gRPC
问题
Python 中如何使用 gRPC?Protobuf 定义和流式通信怎么实现?
答案
Proto 文件定义
user.proto
syntax = "proto3";
package user;
service UserService {
rpc GetUser (GetUserRequest) returns (UserResponse);
rpc ListUsers (ListUsersRequest) returns (stream UserResponse); // 服务端流
rpc CreateUsers (stream CreateUserRequest) returns (BatchResult); // 客户端流
}
message GetUserRequest {
int32 id = 1;
}
message UserResponse {
int32 id = 1;
string name = 2;
string email = 3;
}
# 生成 Python 代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto
服务端实现
import grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc
class UserServicer(user_pb2_grpc.UserServiceServicer):
async def GetUser(self, request, context):
user = await fetch_user(request.id)
if not user:
context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
return user_pb2.UserResponse(id=user.id, name=user.name, email=user.email)
async def ListUsers(self, request, context):
"""服务端流:逐条返回"""
users = await fetch_users(page=request.page)
for user in users:
yield user_pb2.UserResponse(id=user.id, name=user.name)
async def serve():
server = grpc.aio.server()
user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
客户端调用
async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = user_pb2_grpc.UserServiceStub(channel)
# 一元调用
response = await stub.GetUser(user_pb2.GetUserRequest(id=1))
print(response.name)
# 服务端流
async for user in stub.ListUsers(user_pb2.ListUsersRequest(page=1)):
print(user.name)
常见面试问题
Q1: gRPC 和 REST 的区别?
答案:
| 特性 | gRPC | REST |
|---|---|---|
| 协议 | HTTP/2 | HTTP/1.1 或 2 |
| 序列化 | Protobuf(二进制) | JSON(文本) |
| 性能 | 高 | 中 |
| 流式通信 | 原生支持 | 需 WebSocket/SSE |
| 浏览器支持 | 需 gRPC-Web | 原生 |
| 代码生成 | 强制 | 可选(OpenAPI) |
Q2: gRPC 的四种通信模式?
答案:
- 一元 RPC:客户端发一个请求,服务端返回一个响应
- 服务端流:客户端发一个请求,服务端返回流式响应
- 客户端流:客户端流式发送,服务端返回一个响应
- 双向流:客户端和服务端同时流式通信
Q3: 如何做 gRPC 拦截器(中间件)?
答案:
# 服务端拦截器
class LoggingInterceptor(grpc.aio.ServerInterceptor):
async def intercept_service(self, continuation, handler_call_details):
method = handler_call_details.method
print(f"gRPC call: {method}")
return await continuation(handler_call_details)
server = grpc.aio.server(interceptors=[LoggingInterceptor()])