跳到主要内容

多线程

问题

Python 多线程怎么用?有哪些同步原语?线程池如何使用?

答案

创建线程

import threading

def worker(name: str, count: int):
for i in range(count):
print(f"{name}: {i}")

# 方式 1:直接创建
t = threading.Thread(target=worker, args=("Thread-1", 3))
t.start()
t.join() # 等待线程结束

# 方式 2:继承 Thread
class MyThread(threading.Thread):
def run(self):
worker(self.name, 3)

同步原语

Lock(互斥锁)

import threading

counter = 0
lock = threading.Lock()

def increment():
global counter
for _ in range(100_000):
with lock: # 推荐使用 with 自动释放
counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 500000(没有 lock 则结果不确定)

其他同步原语

原语用途
Lock互斥锁,同一时刻只有一个线程持有
RLock可重入锁,同一线程可多次获取
Event线程间信号通知
Condition条件变量,等待/通知
Semaphore信号量,限制并发数
Barrier屏障,等待所有线程到达
# Event 示例:等待信号
event = threading.Event()

def waiter():
print("等待信号...")
event.wait() # 阻塞直到 event.set()
print("收到信号!")

def sender():
import time
time.sleep(1)
event.set() # 发送信号

threading.Thread(target=waiter).start()
threading.Thread(target=sender).start()

线程池

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch(url: str) -> str:
return requests.get(url).text[:100]

urls = [f"https://example.com/{i}" for i in range(20)]

with ThreadPoolExecutor(max_workers=5) as executor:
# map:保序返回
results = list(executor.map(fetch, urls))

# submit:返回 Future,更灵活
futures = [executor.submit(fetch, url) for url in urls]
for future in futures:
print(future.result())

线程安全的数据结构

import queue

# queue.Queue — 线程安全的队列
q = queue.Queue(maxsize=10)
q.put("item") # 阻塞式放入
item = q.get() # 阻塞式取出
q.task_done() # 标记任务完成

常见面试问题

Q1: 守护线程是什么?

答案

守护线程(daemon thread)在主线程退出时自动终止:

t = threading.Thread(target=worker, daemon=True)
t.start()
# 主线程结束后,守护线程自动被杀死

Q2: 死锁是什么?如何避免?

答案

死锁是两个线程互相等待对方持有的锁:

# ❌ 死锁示例
lock_a, lock_b = threading.Lock(), threading.Lock()

def thread_1():
with lock_a:
with lock_b: pass # 等待 lock_b

def thread_2():
with lock_b:
with lock_a: pass # 等待 lock_a

# ✅ 避免方案:固定锁的获取顺序
def safe_thread():
with lock_a: # 总是先获取 lock_a
with lock_b: pass

Q3: threading.local() 的作用?

答案

threading.local() 为每个线程提供独立的变量副本:

local_data = threading.local()

def worker(name):
local_data.name = name # 每个线程有自己的 name
print(local_data.name)

# 常见应用:数据库连接、Flask 的请求上下文

相关链接