Python gRPC 从入门到进阶:构建高效微服务通信的全面教程
原文地址: https://88box.top 生成时间: 2026-05-20 01:01:41
Python gRPC 从入门到进阶:高效微服务通信的完整指南 - hey99 知识搜索引擎
精选文章
Python gRPC 从入门到进阶:高效微服务通信的完整指南
摘要 本文介绍了gRPC在微服务架构中的应用,重点讲解了Python环境下gRPC的开发实践。主要内容包括: 环境搭建:安装gRPC核心库和编译工具,验证版本信息。 入门实战:通过定义.proto文件、编译生成Python代码、实现服务端和客户端,构建了一个简单的问候服务示例。 通信模式:详细解析了gRPC支持的四种通信模式,包括一元RPC、服务端流式、客户端流式和双向流式。 文章提供了可直接运行的代码示例,适合新手入门和进阶开发者参考,帮助读者掌握gRPC的核心概念和实际应用技巧。
更新于 2026-05-19 16:47
作者:IT策士
10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在公众号、今日头条持续发布最新文章,助你少走弯路。
前言
在微服务架构中,服务间的通信效率直接影响着整个系统的性能。想象一下,如果你的电商订单系统每秒需要调用上千次库存服务,那种传统的 JSON 文本传输会带来多大的性能损耗?这正是 gRPC 要解决的核心问题。
gRPC 由 Google 开发,是一个高性能、开源的远程过程调用框架。与传统的 REST API 不同,gRPC 基于 HTTP/2 协议,使用 Protocol Buffers 作为序列化格式,将数据体积压缩 3-10 倍,同时支持双向流通信、连接多路复用等高级特性
-42
。Netflix、Square、Google 等顶级公司已大规模用 gRPC 替代 REST,用于内部微服务通信
-42
。
本文将带你从零开始掌握 Python 中的 gRPC 开发技术,从环境搭建到生产级实践,每一个示例都可以直接运行,适合新手入门和进阶开发者参考。
一、环境准备:从零搭建 gRPC 开发环境
首先安装核心依赖:
安装 gRPC 核心库和编译工具
pip
install
grpcio grpcio-tools protobuf
安装完成后,验证版本:
python
-c
"import grpc; print(f'gRPC 版本:{grpc.version}')"
控制台输出:
新手常见坑
:
import grpc
时不能写成
import grpcio
。Python 中没有叫
grpc
的官方包,
grpcio
安装后暴露的顶层模块名是
grpc
,这个细节经常让新手困惑-。
二、入门实战:构建第一个 gRPC 服务
2.1 定义 .proto 文件
gRPC 的核心思想是
契约优先
——先定义好服务接口,再由框架自动生成代码。接口定义使用 Protocol Buffers 语法。
// hello.proto
syntax
=
"proto3"
;
package hello
;
// 定义问候服务
service
Greeter
{
// 一元 RPC:客户端发送一个请求,服务端返回一个响应
rpc SayHello
(
HelloRequest
)
returns
(
HelloReply
)
;
}
// 请求消息
message HelloRequest
{
string name
=
1
;
}
// 响应消息
message HelloReply
{
string message
=
1
;
}
语法要点解析
:
syntax = "proto3"
:使用 Proto3 语法,这是目前推荐的标准版本
-38
message
:定义数据结构,每个字段后有唯一的编号(tag),用于二进制编码时识别字段
-38
service
:定义 RPC 方法签名,指定输入输出类型
-39
字段编号从 1 开始递增,
不可重复
,建议预留部分编号便于后续扩展
2.2 编译生成 Python 代码
使用
protoc
编译器将
.proto
文件编译为 Python 代码:
python
-m
grpc_tools.protoc -I.
--python_out
=
.
--grpc_python_out
=
. hello.proto
控制台输出
(成功时无报错):
这个命令会在当前目录下生成两个文件
-7
:
hello_pb2.py
:包含消息类的序列化/反序列化代码
hello_pb2_grpc.py
:包含服务基类(Servicer)和客户端桩代码(Stub)
组件分工表
:
2.3 实现服务端
服务端的核心任务是
实现 .proto 中定义的服务接口并启动 gRPC 服务器
-7
。
server.py
import
grpc
from concurrent
import
futures
import
hello_pb2
import
hello_pb2_grpc
class GreeterServicer
(
hello_pb2_grpc.GreeterServicer
)
:
""
"实现 .proto 中定义的 Greeter 服务
""
"
def SayHello
(
self, request, context
)
:
request.name 来自客户端发送的 HelloRequest
(
f
"[服务端] 收到请求,name={request.name}"
)
return
hello_pb2.HelloReply
(
message
=
f
"你好,{request.name}!这里是 gRPC 服务端 🎉"
)
def serve
(
)
:
创建 gRPC 服务器,最大工作线程数 10
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
))
将服务实现注册到服务器
hello_pb2_grpc.add_GreeterServicer_to_server
(
GreeterServicer
(
)
, server
)
监听端口(insecure 表示不使用 TLS,仅供开发测试)
server.add_insecure_port
(
'[::]:50051'
)
(
"[服务端] gRPC 服务已启动,监听端口 50051..."
)
server.start
(
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
2.4 实现客户端
客户端的任务是
通过 Stub(桩)调用远程服务方法
,就像调用本地函数一样自然
-7
。
client.py
import
grpc
import
hello_pb2
import
hello_pb2_grpc
def run
(
)
:
建立与服务器的连接(insecure 开发模式)
with grpc.insecure_channel
(
'localhost:50051'
)
as channel:
创建客户端桩(Stub)
stub
=
hello_pb2_grpc.GreeterStub
(
channel
)
构造请求并调用远程方法
response
=
stub.SayHello
(
hello_pb2.HelloRequest
(
name
=
"小明"
))
(
f
"[客户端] 收到响应: {response.message}"
)
if
name
==
'main'
:
run
(
)
2.5 运行测试
打开
两个终端
,分别运行服务端和客户端。
终端 1 — 服务端:
终端 1 输出:
[
服务端
]
gRPC 服务已启动,监听端口
50051
..
.
[
服务端
]
收到请求,name
=
小明
终端 2 — 客户端:
终端 2 输出:
[
客户端
]
收到响应: 你好,小明!这里是 gRPC 服务端 🎉
恭喜!你已经成功构建了第一个 gRPC 服务 🎉。
三、深入理解:四种通信模式全解析
gRPC 基于 HTTP/2,支持四种通信模式
-16
。下面逐一实战演示。
3.1 定义流式 .proto 文件
// streaming.proto
syntax
=
"proto3"
;
package streaming
;
service
DataService
{
//
1
. 一元 RPC:一问一答(上面已讲)
rpc UnaryCall
(
DataRequest
)
returns
(
DataResponse
)
;
//
2
. 服务端流式:客户端发一次,服务端持续推送
rpc ServerStreaming
(
DataRequest
)
returns
(
stream DataResponse
)
;
//
3
. 客户端流式:客户端持续发送,服务端汇总一次
rpc ClientStreaming
(
stream DataRequest
)
returns
(
DataResponse
)
;
//
4
. 双向流式:双方都可随时发送
rpc BidirectionalStreaming
(
stream DataRequest
)
returns
(
stream DataResponse
)
;
}
message DataRequest
{
string message
=
1
;
int32 sequence
=
2
;
// 序号,用于追踪数据流顺序
}
message DataResponse
{
string message
=
1
;
int32 sequence
=
2
;
}
语法要点
:
stream
关键字是流式模式的核心声明。出现在返回类型前表示服务端流式,出现在参数类型前表示客户端流式,两边都有则是双向流式
-19
。
首先编译生成代码:
python
-m
grpc_tools.protoc -I.
--python_out
=
.
--grpc_python_out
=
. streaming.proto
3.2 模式一:服务端流式(Server Streaming)
适用场景
:股票实时行情推送、日志监控、大文件下载。服务端逐个发送数据块,客户端逐个读取,内存占用极低
-19
。
server_streaming_server.py
import
grpc
import
time
from concurrent
import
futures
import
streaming_pb2
import
streaming_pb2_grpc
class DataServiceServicer
(
streaming_pb2_grpc.DataServiceServicer
)
:
def ServerStreaming
(
self, request, context
)
:
""
"服务端流式:收到一个请求后,持续推送多条数据
""
"
(
f
"[服务端] 收到流式请求: {request.message},开始推送数据..."
)
for
i
in
range
(
5
)
:
response
=
streaming_pb2.DataResponse
(
message
=
f
"服务端推送第 {i+1} 条数据"
,
sequence
=
i +
1
)
(
f
"[服务端] 推送 #{i+1}"
)
yield response
time.sleep
(
0.5
)
模拟数据生成延迟
(
"[服务端] 推送完成"
)
def serve
(
)
:
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
))
streaming_pb2_grpc.add_DataServiceServicer_to_server
(
DataServiceServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50052'
)
server.start
(
)
(
"[服务端流式] 服务已启动,监听端口 50052..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
server_streaming_client.py
import
grpc
import
streaming_pb2
import
streaming_pb2_grpc
def run
(
)
:
with grpc.insecure_channel
(
'localhost:50052'
)
as channel:
stub
=
streaming_pb2_grpc.DataServiceStub
(
channel
)
request
=
streaming_pb2.DataRequest
(
message
=
"请推送数据"
)
(
"[客户端] 发送请求,等待服务端推送..."
)
responses 是一个迭代器,逐个接收服务端推送的数据
responses
=
stub.ServerStreaming
(
request
)
for
resp
in
responses:
(
f
"[客户端] 收到 #{resp.sequence}: {resp.message}"
)
if
name
==
'main'
:
run
(
)
控制台输出 — 服务端:
[
服务端流式
]
服务已启动,监听端口
50052
..
.
[
服务端
]
收到流式请求: 请推送数据,开始推送数据
..
.
[
服务端
]
推送
1
[
服务端
]
推送
2
[
服务端
]
推送
3
[
服务端
]
推送
4
[
服务端
]
推送
5
[
服务端
]
推送完成
控制台输出 — 客户端:
[
客户端
]
发送请求,等待服务端推送
..
.
[
客户端
]
收到
1: 服务端推送第 1 条数据
[
客户端
]
收到
2: 服务端推送第 2 条数据
[
客户端
]
收到
3: 服务端推送第 3 条数据
[
客户端
]
收到
4: 服务端推送第 4 条数据
[
客户端
]
收到
5: 服务端推送第 5 条数据
核心要点
:服务端使用
yield
逐个生成响应,客户端通过
for-in
循环逐条接收,无需等待全部数据到达。
3.3 模式二:客户端流式(Client Streaming)
适用场景
:物联网设备批量上报传感器数据、日志采集、大文件上传
-19
。
client_streaming_server.py
import
grpc
from concurrent
import
futures
import
streaming_pb2
import
streaming_pb2_grpc
class DataServiceServicer
(
streaming_pb2_grpc.DataServiceServicer
)
:
def ClientStreaming
(
self, request_iterator, context
)
:
""
"客户端流式:接收客户端持续发送的多条数据,汇总后返回一条结果
""
"
total
=
0
messages
=
[
]
for
request
in
request_iterator:
total
+=
1
messages.append
(
request.message
)
(
f
"[服务端] 收到 #{request.sequence}: {request.message}"
)
汇总返回
result
=
streaming_pb2.DataResponse
(
message
=
f
"共收到 {total} 条数据: {'; '.join(messages)}"
,
sequence
=
total
)
(
f
"[服务端] 汇总完成,返回结果"
)
return
result
def serve
(
)
:
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
))
streaming_pb2_grpc.add_DataServiceServicer_to_server
(
DataServiceServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50053'
)
server.start
(
)
(
"[客户端流式] 服务已启动,监听端口 50053..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
client_streaming_client.py
import
grpc
import
time
import
streaming_pb2
import
streaming_pb2_grpc
def generate_requests
(
)
:
""
"生成器函数:逐个产生要发送的请求
""
"
data_list
=
[
"传感器-温度:25.3°C"
,
"传感器-湿度:68%"
,
"传感器-气压:1013hPa"
]
for
i, data
in
enumerate
(
data_list
)
:
yield streaming_pb2.DataRequest
(
message
=
data,
sequence
=
i +
1
)
time.sleep
(
0.3
)
模拟采集间隔
(
f
" [生成器] 已产生第 {i+1} 条数据"
)
def run
(
)
:
with grpc.insecure_channel
(
'localhost:50053'
)
as channel:
stub
=
streaming_pb2_grpc.DataServiceStub
(
channel
)
response
=
stub.ClientStreaming
(
generate_requests
(
))
(
f
"
\n
[客户端] 收到汇总响应: {response.message}"
)
if
name
==
'main'
:
run
(
)
控制台输出 — 服务端:
[
客户端流式
]
服务已启动,监听端口
50053
..
.
[
服务端
]
收到
1: 传感器-温度:25.3°C
[
服务端
]
收到
2: 传感器-湿度:68%
[
服务端
]
收到
3: 传感器-气压:1013hPa
[
服务端
]
汇总完成,返回结果
控制台输出 — 客户端:
[
生成器
]
已产生第
1
条数据
[
生成器
]
已产生第
2
条数据
[
生成器
]
已产生第
3
条数据
[
客户端
]
收到汇总响应: 共收到
3
条数据: 传感器-温度:25.3°C
;
传感器-湿度:68%
;
传感器-气压:1013hPa
核心要点
:客户端用
yield
生成器逐个发送请求,服务端通过迭代
request_iterator
逐个接收,汇总后一次性返回结果。
3.4 模式三:双向流式(Bidirectional Streaming)
适用场景
:实时聊天、语音识别、在线游戏同步——双方可以随时、独立地发送数据
-19
。
bidirectional_server.py
import
grpc
import
time
from concurrent
import
futures
import
streaming_pb2
import
streaming_pb2_grpc
class DataServiceServicer
(
streaming_pb2_grpc.DataServiceServicer
)
:
def BidirectionalStreaming
(
self, request_iterator, context
)
:
""
"双向流式:一边接收客户端消息,一边主动推送响应
""
"
(
"[服务端] 双向流通信已建立,等待客户端消息..."
)
for
request
in
request_iterator:
(
f
"[服务端] 收到 #{request.sequence}: {request.message}"
)
收到一条消息后,立即回复两条
yield streaming_pb2.DataResponse
(
message
=
f
"回声: {request.message}"
,
sequence
=
request.sequence *
100
+
1
)
yield streaming_pb2.DataResponse
(
message
=
f
"确认收到第 {request.sequence} 条消息"
,
sequence
=
request.sequence *
100
+
2
)
def serve
(
)
:
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
))
streaming_pb2_grpc.add_DataServiceServicer_to_server
(
DataServiceServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50054'
)
server.start
(
)
(
"[双向流式] 服务已启动,监听端口 50054..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
bidirectional_client.py
import
grpc
import
time
import
threading
import
streaming_pb2
import
streaming_pb2_grpc
def run
(
)
:
with grpc.insecure_channel
(
'localhost:50054'
)
as channel:
stub
=
streaming_pb2_grpc.DataServiceStub
(
channel
)
def generate_messages
(
)
:
""
"在另一个线程中发送消息
""
"
messages
=
[
"你好服务端!"
,
"这是第二条消息"
,
"再见!"
]
for
i, msg
in
enumerate
(
messages
)
:
yield streaming_pb2.DataRequest
(
message
=
msg,
sequence
=
i +
1
)
time.sleep
(
1
)
(
f
" [发送线程] 已发送第 {i+1} 条: {msg}"
)
调用双向流 RPC
responses
=
stub.BidirectionalStreaming
(
generate_messages
(
))
(
"[客户端] 开始接收服务端响应..."
)
for
resp
in
responses:
(
f
"[客户端] 收到 #{resp.sequence}: {resp.message}"
)
if
name
==
'main'
:
run
(
)
控制台输出 — 服务端:
[
双向流式
]
服务已启动,监听端口
50054
..
.
[
服务端
]
双向流通信已建立,等待客户端消息
..
.
[
服务端
]
收到
1: 你好服务端!
[
服务端
]
收到
2: 这是第二条消息
[
服务端
]
收到
3: 再见!
控制台输出 — 客户端:
[
客户端
]
开始接收服务端响应
..
.
[
客户端
]
收到
101: 回声: 你好服务端!
[
客户端
]
收到
102: 确认收到第 1 条消息
[
发送线程
]
已发送第
1
条: 你好服务端!
[
客户端
]
收到
201: 回声: 这是第二条消息
[
客户端
]
收到
202: 确认收到第 2 条消息
[
发送线程
]
已发送第
2
条: 这是第二条消息
[
客户端
]
收到
301: 回声: 再见!
[
客户端
]
收到
302: 确认收到第 3 条消息
[
发送线程
]
已发送第
3
条: 再见!
核心要点
:双向流式模式下,客户端和服务端可以
同时
发送和接收数据,实现了真正的全双工通信。收到一条消息后,可以回复多条(本例中回复了 2 条)。
3.5 四种模式对比总结
四、进阶应用:生产级 gRPC 服务的关键技术
前面已经掌握了 gRPC 的基本用法,但要构建生产级服务,还需要掌握以下几个关键技术。
4.1 拦截器:横切逻辑的优雅实现
拦截器是 gRPC 的"中间件"机制,可以在不修改业务代码的情况下,统一处理日志、认证、监控等横切关注点-。
interceptor_server.py
import
grpc
from concurrent
import
futures
import
time
import
hello_pb2
import
hello_pb2_grpc
class LoggingInterceptor
(
grpc.ServerInterceptor
)
:
""
"自定义日志拦截器:记录每次 RPC 调用的耗时
""
"
def intercept_service
(
self, continuation, handler_call_details
)
:
start_time
=
time.time
(
)
调用实际的 RPC 处理方法
response
=
continuation
(
handler_call_details
)
elapsed
=
(
time.time
(
)
- start_time
)
*
1000
method
=
handler_call_details.method
(
f
"[拦截器] {method} 调用完成,耗时 {elapsed:.2f}ms"
)
return
response
class GreeterServicer
(
hello_pb2_grpc.GreeterServicer
)
:
def SayHello
(
self, request, context
)
:
return
hello_pb2.HelloReply
(
message
=
f
"你好,{request.name}!"
)
def serve
(
)
:
在创建服务器时传入拦截器
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
)
,
interceptors
=
[
LoggingInterceptor
(
)
]
注入拦截器
)
hello_pb2_grpc.add_GreeterServicer_to_server
(
GreeterServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50055'
)
server.start
(
)
(
"[拦截器示例] 服务已启动..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
客户端(使用之前的
client.py
修改端口为 50055):
interceptor_client.py
import
grpc
import
hello_pb2
import
hello_pb2_grpc
def run
(
)
:
with grpc.insecure_channel
(
'localhost:50055'
)
as channel:
stub
=
hello_pb2_grpc.GreeterStub
(
channel
)
response
=
stub.SayHello
(
hello_pb2.HelloRequest
(
name
=
"小明"
))
(
f
"[客户端] 收到: {response.message}"
)
if
name
==
'main'
:
run
(
)
控制台输出 — 服务端:
[
拦截器示例
]
服务已启动
..
.
[
拦截器
]
/hello.Greeter/SayHello 调用完成,耗时
0
.03ms
4.2 错误处理与状态码
gRPC 提供了丰富的状态码体系,远比 HTTP 状态码更精确
-58
。不应一律返回
INTERNAL
错误,而应使用语义明确的状态码。
error_handling_server.py
import
grpc
from concurrent
import
futures
import
hello_pb2
import
hello_pb2_grpc
class GreeterServicer
(
hello_pb2_grpc.GreeterServicer
)
:
def SayHello
(
self, request, context
)
:
参数校验
if
not request.name:
context.set_code
(
grpc.StatusCode.INVALID_ARGUMENT
)
context.set_details
(
'name 参数不能为空'
)
return
hello_pb2.HelloReply
(
)
模拟权限检查
if
request.name
==
"admin"
:
context.set_code
(
grpc.StatusCode.PERMISSION_DENIED
)
context.set_details
(
'不允许使用 admin 身份'
)
return
hello_pb2.HelloReply
(
)
模拟资源不存在
if
request.name
==
"notfound"
:
context.abort
(
grpc.StatusCode.NOT_FOUND,
'该用户不存在'
)
return
hello_pb2.HelloReply
(
message
=
f
"你好,{request.name}!调用成功"
)
def serve
(
)
:
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
))
hello_pb2_grpc.add_GreeterServicer_to_server
(
GreeterServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50056'
)
server.start
(
)
(
"[错误处理示例] 服务已启动..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
error_handling_client.py
import
grpc
import
hello_pb2
import
hello_pb2_grpc
def call_with_name
(
stub, name
)
:
""
"封装调用逻辑,统一处理 RPC 错误
""
"
try:
response
=
stub.SayHello
(
hello_pb2.HelloRequest
(
name
=
name
)
,
timeout
=
2.0
设置 2 秒超时
)
(
f
"[客户端] name='{name}' -> {response.message}"
)
except grpc.RpcError as e:
(
f
"[客户端] name='{name}' -> "
f
"状态码: {e.code()}, 详情: {e.details()}"
)
def run
(
)
:
with grpc.insecure_channel
(
'localhost:50056'
)
as channel:
stub
=
hello_pb2_grpc.GreeterStub
(
channel
)
call_with_name
(
stub,
"小明"
)
正常
call_with_name
(
stub,
""
)
参数为空
call_with_name
(
stub,
"admin"
)
权限不足
call_with_name
(
stub,
"notfound"
)
资源不存在
if
name
==
'main'
:
run
(
)
控制台输出 — 客户端:
[
客户端
]
name
=
'小明'
-
你好,小明!调用成功
[
客户端
]
name
=
''
-
状态码: StatusCode.INVALID_ARGUMENT, 详情: name 参数不能为空
[
客户端
]
name
=
'admin'
-
状态码: StatusCode.PERMISSION_DENIED, 详情: 不允许使用 admin 身份
[
客户端
]
name
=
'notfound'
-
状态码: StatusCode.NOT_FOUND, 详情: 该用户不存在
常用状态码速查
:
4.3 HTTP/2 连接复用(Keep-Alive)与超时配置
生产环境中,长连接的管理直接影响资源利用率和故障恢复速度。
keepalive_server.py
import
grpc
from concurrent
import
futures
import
hello_pb2
import
hello_pb2_grpc
class GreeterServicer
(
hello_pb2_grpc.GreeterServicer
)
:
def SayHello
(
self, request, context
)
:
获取请求超时时间
deadline
=
context.time_remaining
(
)
if
deadline:
(
f
"[服务端] 距离超时还有 {deadline:.2f} 秒"
)
return
hello_pb2.HelloReply
(
message
=
f
"你好,{request.name}!"
)
def serve
(
)
:
server
=
grpc.server
(
futures.ThreadPoolExecutor
(
max_workers
=
10
)
,
options
=
[
HTTP/2 Keep-Alive:定期发送 PING 帧保持连接
(
'grpc.keepalive_time_ms'
,
10000
)
,
每 10 秒发送 PING
(
'grpc.keepalive_timeout_ms'
,
5000
)
,
PING 超时 5 秒则断开
(
'grpc.keepalive_permit_without_calls'
, True
)
,
(
'grpc.http2.max_pings_without_data'
,
0
)
,
]
)
hello_pb2_grpc.add_GreeterServicer_to_server
(
GreeterServicer
(
)
, server
)
server.add_insecure_port
(
'[::]:50057'
)
server.start
(
)
(
"[Keep-Alive 示例] 服务已启动..."
)
server.wait_for_termination
(
)
if
name
==
'main'
:
serve
(
)
keepalive_client.py
import
grpc
import
hello_pb2
import
hello_pb2_grpc
def run
(
)
:
配置客户端侧 HTTP/2 Keep-Alive
channel
=
grpc.insecure_channel
(
'localhost:50057'
,
options
=
[
(
'grpc.keepalive_time_ms'
,
10000
)
,
(
'grpc.keepalive_timeout_ms'
,
5000
)
,
(
'grpc.http2.min_time_between_pings_ms'
,
5000
)
,
(
'grpc.keepalive_permit_without_calls'
, True
)
,
]
)
stub
=
hello_pb2_grpc.GreeterStub
(
channel
)
response
=
stub.SayHello
(
hello_pb2.HelloRequest
(
name
=
"小明"
)
,
timeout
=
1.0
单次调用超时 1 秒
)
(
f
"[客户端] 收到: {response.message}"
)
channel.close
(
)
if
name
==
'main'
:
run
(
)
控制台输出 — 服务端:
[
Keep-Alive 示例
]
服务已启动
..
.
[
服务端
]
距离超时还有
0.99
秒
五、gRPC vs REST:何时选择 gRPC?
这个话题是新手最容易纠结的。核心区分点在于
使用场景
-41
-42
:
选择 gRPC 的场景
:
微服务间高并发、低延迟通信
实时数据推送(股票、监控、聊天)
多语言技术栈的服务间调用
需要严格 API 契约管理的大型分布式系统
选择 REST 的场景
:
浏览器直连(无 gRPC-Web 中间件时)
对外的公共 API(兼容性优先)
中小型项目,简单 CRUD 操作
团队缺乏 gRPC 经验的学习过渡期
六、常见问题与避坑指南
6.1
AttributeError: module 'grpc' has no attribute 'server'
错误原因
:把
grpc
和
grpcio
搞混了。Python 里没有叫
grpc
的官方包,你需要
pip install grpcio
,但
import
时写的是
import grpc
——
grpc
是
grpcio
安装后暴露的顶层模块名-。
解决方案
:
pip
install
grpcio
正确安装
import
grpc
正确导入——grpc 是 grpcio 安装后暴露的顶层模块名
6.2 端口冲突
检查端口是否被占用
lsof
-i
:50051
kill
-9
<
PID
6.3 .proto 文件不生效
每次修改
.proto
文件后,记得重新编译:
先删除旧的生成文件
rm
_pb2.py _pb2_grpc.py
再重新编译
python
-m
grpc_tools.protoc -I.
--python_out
=
.
--grpc_python_out
=
. hello.proto
6.4 生产环境使用 TLS
生产环境
务必
使用 TLS 加密通信,替换
add_insecure_port
为:
读取证书
with open
(
'server.key'
,
'rb'
)
as f:
private_key
=
f.read
(
)
with open
(
'server.crt'
,
'rb'
)
as f:
certificate_chain
=
f.read
(
)
创建 SSL 凭证
server_credentials
=
grpc.ssl_server_credentials
(
[
(
private_key, certificate_chain
)
]
)
使用安全端口
server.add_secure_port
(
'[::]:50051'
, server_credentials
)
七、总结与学习路线图
本文核心要点回顾
gRPC 的优势
:HTTP/2 + Protobuf 带来高性能、双向流通信和跨语言支持
开发流程
:定义 .proto → 编译生成代码 → 实现服务端 → 编写客户端
四种通信模式
:一元 RPC、服务端流式、客户端流式、双向流式
生产级实践
:拦截器、错误处理、超时控制、TLS 加密
学习路径建议
入门
:跑通文中的 Hello gRPC 示例(第 2 节),理解协议栈与编译流程
巩固
:实现四种流式模式(第 3 节),掌握
yield
和迭代器模式
进阶
:添加拦截器(日志/认证/限流)、配置健康检查和服务配置
-56
生产
:集成 TLS 加密、实现智能重试策略和负载均衡
-56
深化
:异步 gRPC(
grpc.aio
)、与 FastAPI 混合部署、Service Mesh 集成
延伸阅读
gRPC 官方文档
:
https://grpc.io/docs/languages/python/
— 最权威的参考资料
Protocol Buffers 规范
:
https://protobuf.dev/programming-guides/proto3/
— Proto3 完整语法
gRPC 服务配置指南
:
https://grpc.org.cn/docs/guides/service-config/
— 超时、重试、负载均衡配置
gRPC 的学习曲线比 REST 稍高,但它在性能上的回报是巨大的。掌握 gRPC,你将能够构建出真正高性能的分布式系统。如果你在实践过程中遇到任何问题,欢迎在评论区留言交流!
还可以去公众号、今日头条搜索「IT策士」,一起升级 IT 思维 !
查看原文
🏷 标签: Python, gRPC, Protocol Buffers, 微服务, HTTP/2