跳到主要内容

观察者模式

问题

Python 中如何实现观察者模式(发布-订阅)?

答案

基础实现

from typing import Callable, Any

class EventEmitter:
def __init__(self):
self._listeners: dict[str, list[Callable]] = {}

def on(self, event: str, callback: Callable):
self._listeners.setdefault(event, []).append(callback)

def off(self, event: str, callback: Callable):
if event in self._listeners:
self._listeners[event].remove(callback)

def emit(self, event: str, *args: Any, **kwargs: Any):
for callback in self._listeners.get(event, []):
callback(*args, **kwargs)

# 使用
emitter = EventEmitter()

def on_user_created(user):
print(f"Send welcome email to {user['email']}")

def on_user_created_log(user):
print(f"Log: user {user['name']} created")

emitter.on("user_created", on_user_created)
emitter.on("user_created", on_user_created_log)
emitter.emit("user_created", {"name": "Alice", "email": "alice@example.com"})

装饰器风格

class EventBus:
_handlers: dict[str, list[Callable]] = {}

@classmethod
def subscribe(cls, event: str):
def decorator(func):
cls._handlers.setdefault(event, []).append(func)
return func
return decorator

@classmethod
def publish(cls, event: str, *args, **kwargs):
for handler in cls._handlers.get(event, []):
handler(*args, **kwargs)

@EventBus.subscribe("order_paid")
def send_receipt(order):
print(f"Send receipt for order {order['id']}")

@EventBus.subscribe("order_paid")
def update_inventory(order):
print(f"Update inventory for order {order['id']}")

EventBus.publish("order_paid", {"id": 123, "total": 99.9})

Django Signals

Django 内置了观察者模式:

from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=User)
def create_profile(sender, instance, created, **kwargs):
if created:
Profile.objects.create(user=instance)

常见面试问题

Q1: 观察者模式的优缺点?

答案

  • 优点:解耦发布者和订阅者,便于扩展
  • 缺点:调试困难(事件链不直观)、可能导致循环触发、内存泄漏(忘记取消订阅)

Q2: 观察者模式和发布-订阅的区别?

答案

  • 观察者模式:Subject 直接通知 Observer,两者互相知道
  • 发布-订阅:通过中间的 EventBus/Broker 解耦,发布者和订阅者互不知道

Python 中两者界限模糊,Django Signals 更接近发布-订阅。

相关链接