观察者模式
问题
Python 中如何实现观察者模式(发布-订阅)?
答案
基础实现
from typing import Callable, Any
class EventEmitter:
def __init__(self):
self._listeners: dict[str, list[Callable]] = {}
def on(self, event: str, callback: Callable):
self._listeners.setdefault(event, []).append(callback)
def off(self, event: str, callback: Callable):
if event in self._listeners:
self._listeners[event].remove(callback)
def emit(self, event: str, *args: Any, **kwargs: Any):
for callback in self._listeners.get(event, []):
callback(*args, **kwargs)
# 使用
emitter = EventEmitter()
def on_user_created(user):
print(f"Send welcome email to {user['email']}")
def on_user_created_log(user):
print(f"Log: user {user['name']} created")
emitter.on("user_created", on_user_created)
emitter.on("user_created", on_user_created_log)
emitter.emit("user_created", {"name": "Alice", "email": "alice@example.com"})
装饰器风格
class EventBus:
_handlers: dict[str, list[Callable]] = {}
@classmethod
def subscribe(cls, event: str):
def decorator(func):
cls._handlers.setdefault(event, []).append(func)
return func
return decorator
@classmethod
def publish(cls, event: str, *args, **kwargs):
for handler in cls._handlers.get(event, []):
handler(*args, **kwargs)
@EventBus.subscribe("order_paid")
def send_receipt(order):
print(f"Send receipt for order {order['id']}")
@EventBus.subscribe("order_paid")
def update_inventory(order):
print(f"Update inventory for order {order['id']}")
EventBus.publish("order_paid", {"id": 123, "total": 99.9})
Django Signals
Django 内置了观察者模式:
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=User)
def create_profile(sender, instance, created, **kwargs):
if created:
Profile.objects.create(user=instance)
常见面试问题
Q1: 观察者模式的优缺点?
答案:
- 优点:解耦发布者和订阅者,便于扩展
- 缺点:调试困难(事件链不直观)、可能导致循环触发、内存泄漏(忘记取消订阅)
Q2: 观察者模式和发布-订阅的区别?
答案:
- 观察者模式:Subject 直接通知 Observer,两者互相知道
- 发布-订阅:通过中间的 EventBus/Broker 解耦,发布者和订阅者互不知道
Python 中两者界限模糊,Django Signals 更接近发布-订阅。