并发安全问题
问题
Python 多线程/多进程环境下有哪些常见的并发安全问题?如何解决?
答案
竞态条件
cases/race_condition.py
import threading
# ❌ 竞态条件:多线程修改共享变量
counter = 0
def increment():
global counter
for _ in range(100_000):
counter += 1 # 不是原子操作!read → modify → write
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 预期 400000,实际 <400000
# ✅ 修复:使用锁
lock = threading.Lock()
def safe_increment():
global counter
for _ in range(100_000):
with lock:
counter += 1
死锁
cases/deadlock.py
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
# ❌ 死锁:两个线程以不同顺序获取锁
def worker_1():
with lock_a:
with lock_b: # 等待 worker_2 释放 lock_b
...
def worker_2():
with lock_b:
with lock_a: # 等待 worker_1 释放 lock_a → 死锁
...
# ✅ 修复:统一获取顺序
def worker_1():
with lock_a:
with lock_b:
...
def worker_2():
with lock_a: # 和 worker_1 相同顺序
with lock_b:
...
多进程共享数据
cases/multiprocess_shared.py
from multiprocessing import Process, Value, Lock
# ❌ 多进程共享变量需要特殊处理
# 子进程有独立内存,普通变量不共享
# ✅ 使用 multiprocessing.Value
counter = Value("i", 0) # 共享整数
lock = Lock()
def worker(shared_counter, lock):
for _ in range(100_000):
with lock:
shared_counter.value += 1
processes = [Process(target=worker, args=(counter, lock)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(counter.value) # 400000
异步并发安全
cases/async_safety.py
import asyncio
# ❌ asyncio 中的竞态
shared_state = {"count": 0}
async def async_increment():
value = shared_state["count"]
await asyncio.sleep(0) # 让出执行权
shared_state["count"] = value + 1 # 可能覆盖
# ✅ 使用 asyncio.Lock
alock = asyncio.Lock()
async def safe_async_increment():
async with alock:
shared_state["count"] += 1
线程安全的数据结构
solutions/thread_safe.py
import queue
import threading
from collections import deque
# ✅ queue.Queue 是线程安全的
task_queue: queue.Queue = queue.Queue(maxsize=100)
# 生产者
def producer():
for i in range(100):
task_queue.put(i)
# 消费者
def consumer():
while True:
item = task_queue.get()
process(item)
task_queue.task_done()
# ✅ 使用 threading.local() 隔离线程数据
thread_local = threading.local()
def get_connection():
if not hasattr(thread_local, "conn"):
thread_local.conn = create_connection()
return thread_local.conn
常见面试问题
Q1: Python 有 GIL,为什么还会有并发安全问题?
答案:
GIL 只保证同一时刻只有一个线程执行 Python 字节码,但 counter += 1 不是单条字节码,它包含 LOAD → ADD → STORE 多步操作。线程可能在这几步之间切换,导致竞态。
Q2: 线程安全的 Python 内置操作?
答案:
list.append():原子操作,线程安全dict[key] = value:原子操作,线程安全list.sort():原子操作x += 1:不是原子操作,不安全
警告
虽然某些操作在 CPython 中碰巧是原子的,但不应依赖这个实现细节。明确需要并发安全时,始终使用锁。
Q3: 如何选择锁?
答案:
| 锁 | 用途 |
|---|---|
Lock | 基础互斥锁 |
RLock | 可重入锁(同一线程可多次获取) |
Semaphore | 限制并发数 |
Event | 线程间信号通知 |
Condition | 条件变量(生产者-消费者) |