跳到主要内容

gRPC

问题

Python 中如何使用 gRPC?Protobuf 定义和流式通信怎么实现?

答案

Proto 文件定义

user.proto
syntax = "proto3";

package user;

service UserService {
rpc GetUser (GetUserRequest) returns (UserResponse);
rpc ListUsers (ListUsersRequest) returns (stream UserResponse); // 服务端流
rpc CreateUsers (stream CreateUserRequest) returns (BatchResult); // 客户端流
}

message GetUserRequest {
int32 id = 1;
}

message UserResponse {
int32 id = 1;
string name = 2;
string email = 3;
}
# 生成 Python 代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto

服务端实现

import grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc

class UserServicer(user_pb2_grpc.UserServiceServicer):
async def GetUser(self, request, context):
user = await fetch_user(request.id)
if not user:
context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
return user_pb2.UserResponse(id=user.id, name=user.name, email=user.email)

async def ListUsers(self, request, context):
"""服务端流:逐条返回"""
users = await fetch_users(page=request.page)
for user in users:
yield user_pb2.UserResponse(id=user.id, name=user.name)

async def serve():
server = grpc.aio.server()
user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()

客户端调用

async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = user_pb2_grpc.UserServiceStub(channel)

# 一元调用
response = await stub.GetUser(user_pb2.GetUserRequest(id=1))
print(response.name)

# 服务端流
async for user in stub.ListUsers(user_pb2.ListUsersRequest(page=1)):
print(user.name)

常见面试问题

Q1: gRPC 和 REST 的区别?

答案

特性gRPCREST
协议HTTP/2HTTP/1.1 或 2
序列化Protobuf(二进制)JSON(文本)
性能
流式通信原生支持需 WebSocket/SSE
浏览器支持需 gRPC-Web原生
代码生成强制可选(OpenAPI)

Q2: gRPC 的四种通信模式?

答案

  1. 一元 RPC:客户端发一个请求,服务端返回一个响应
  2. 服务端流:客户端发一个请求,服务端返回流式响应
  3. 客户端流:客户端流式发送,服务端返回一个响应
  4. 双向流:客户端和服务端同时流式通信

Q3: 如何做 gRPC 拦截器(中间件)?

答案

# 服务端拦截器
class LoggingInterceptor(grpc.aio.ServerInterceptor):
async def intercept_service(self, continuation, handler_call_details):
method = handler_call_details.method
print(f"gRPC call: {method}")
return await continuation(handler_call_details)

server = grpc.aio.server(interceptors=[LoggingInterceptor()])

相关链接