跳到主要内容

并发安全问题

问题

Python 多线程/多进程环境下有哪些常见的并发安全问题?如何解决?

答案

竞态条件

cases/race_condition.py
import threading

# ❌ 竞态条件:多线程修改共享变量
counter = 0

def increment():
global counter
for _ in range(100_000):
counter += 1 # 不是原子操作!read → modify → write

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 预期 400000,实际 <400000

# ✅ 修复:使用锁
lock = threading.Lock()

def safe_increment():
global counter
for _ in range(100_000):
with lock:
counter += 1

死锁

cases/deadlock.py
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

# ❌ 死锁:两个线程以不同顺序获取锁
def worker_1():
with lock_a:
with lock_b: # 等待 worker_2 释放 lock_b
...

def worker_2():
with lock_b:
with lock_a: # 等待 worker_1 释放 lock_a → 死锁
...

# ✅ 修复:统一获取顺序
def worker_1():
with lock_a:
with lock_b:
...

def worker_2():
with lock_a: # 和 worker_1 相同顺序
with lock_b:
...

多进程共享数据

cases/multiprocess_shared.py
from multiprocessing import Process, Value, Lock

# ❌ 多进程共享变量需要特殊处理
# 子进程有独立内存,普通变量不共享

# ✅ 使用 multiprocessing.Value
counter = Value("i", 0) # 共享整数
lock = Lock()

def worker(shared_counter, lock):
for _ in range(100_000):
with lock:
shared_counter.value += 1

processes = [Process(target=worker, args=(counter, lock)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(counter.value) # 400000

异步并发安全

cases/async_safety.py
import asyncio

# ❌ asyncio 中的竞态
shared_state = {"count": 0}

async def async_increment():
value = shared_state["count"]
await asyncio.sleep(0) # 让出执行权
shared_state["count"] = value + 1 # 可能覆盖

# ✅ 使用 asyncio.Lock
alock = asyncio.Lock()

async def safe_async_increment():
async with alock:
shared_state["count"] += 1

线程安全的数据结构

solutions/thread_safe.py
import queue
import threading
from collections import deque

# ✅ queue.Queue 是线程安全的
task_queue: queue.Queue = queue.Queue(maxsize=100)

# 生产者
def producer():
for i in range(100):
task_queue.put(i)

# 消费者
def consumer():
while True:
item = task_queue.get()
process(item)
task_queue.task_done()

# ✅ 使用 threading.local() 隔离线程数据
thread_local = threading.local()

def get_connection():
if not hasattr(thread_local, "conn"):
thread_local.conn = create_connection()
return thread_local.conn

常见面试问题

Q1: Python 有 GIL,为什么还会有并发安全问题?

答案

GIL 只保证同一时刻只有一个线程执行 Python 字节码,但 counter += 1 不是单条字节码,它包含 LOAD → ADD → STORE 多步操作。线程可能在这几步之间切换,导致竞态。

Q2: 线程安全的 Python 内置操作?

答案

  • list.append():原子操作,线程安全
  • dict[key] = value:原子操作,线程安全
  • list.sort():原子操作
  • x += 1不是原子操作,不安全
警告

虽然某些操作在 CPython 中碰巧是原子的,但不应依赖这个实现细节。明确需要并发安全时,始终使用锁。

Q3: 如何选择锁?

答案

用途
Lock基础互斥锁
RLock可重入锁(同一线程可多次获取)
Semaphore限制并发数
Event线程间信号通知
Condition条件变量(生产者-消费者)

相关链接