跳转至

Python gRPC 从入门到进阶:构建高效微服务通信的全面教程

原文地址: https://88box.top 生成时间: 2026-05-20 01:01:41


Python gRPC 从入门到进阶:高效微服务通信的完整指南 - hey99 知识搜索引擎

精选文章

Python gRPC 从入门到进阶:高效微服务通信的完整指南

摘要 本文介绍了gRPC在微服务架构中的应用,重点讲解了Python环境下gRPC的开发实践。主要内容包括: 环境搭建:安装gRPC核心库和编译工具,验证版本信息。 入门实战:通过定义.proto文件、编译生成Python代码、实现服务端和客户端,构建了一个简单的问候服务示例。 通信模式:详细解析了gRPC支持的四种通信模式,包括一元RPC、服务端流式、客户端流式和双向流式。 文章提供了可直接运行的代码示例,适合新手入门和进阶开发者参考,帮助读者掌握gRPC的核心概念和实际应用技巧。

更新于 2026-05-19 16:47

作者:IT策士

10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在公众号、今日头条持续发布最新文章,助你少走弯路。

前言

在微服务架构中,服务间的通信效率直接影响着整个系统的性能。想象一下,如果你的电商订单系统每秒需要调用上千次库存服务,那种传统的 JSON 文本传输会带来多大的性能损耗?这正是 gRPC 要解决的核心问题。

gRPC 由 Google 开发,是一个高性能、开源的远程过程调用框架。与传统的 REST API 不同,gRPC 基于 HTTP/2 协议,使用 Protocol Buffers 作为序列化格式,将数据体积压缩 3-10 倍,同时支持双向流通信、连接多路复用等高级特性

-42

。Netflix、Square、Google 等顶级公司已大规模用 gRPC 替代 REST,用于内部微服务通信

-42

本文将带你从零开始掌握 Python 中的 gRPC 开发技术,从环境搭建到生产级实践,每一个示例都可以直接运行,适合新手入门和进阶开发者参考。

一、环境准备:从零搭建 gRPC 开发环境

首先安装核心依赖:

安装 gRPC 核心库和编译工具

pip

install

grpcio grpcio-tools protobuf

安装完成后,验证版本:

python

-c

"import grpc; print(f'gRPC 版本:{grpc.version}')"

控制台输出:

新手常见坑

import grpc

时不能写成

import grpcio

。Python 中没有叫

grpc

的官方包,

grpcio

安装后暴露的顶层模块名是

grpc

,这个细节经常让新手困惑-。

二、入门实战:构建第一个 gRPC 服务

2.1 定义 .proto 文件

gRPC 的核心思想是

契约优先

——先定义好服务接口,再由框架自动生成代码。接口定义使用 Protocol Buffers 语法。

// hello.proto

syntax

=

"proto3"

;

package hello

;

// 定义问候服务

service

Greeter

{

// 一元 RPC:客户端发送一个请求,服务端返回一个响应

rpc SayHello

(

HelloRequest

)

returns

(

HelloReply

)

;

}

// 请求消息

message HelloRequest

{

string name

=

1

;

}

// 响应消息

message HelloReply

{

string message

=

1

;

}

语法要点解析

syntax = "proto3"

:使用 Proto3 语法,这是目前推荐的标准版本

-38

message

:定义数据结构,每个字段后有唯一的编号(tag),用于二进制编码时识别字段

-38

service

:定义 RPC 方法签名,指定输入输出类型

-39

字段编号从 1 开始递增,

不可重复

,建议预留部分编号便于后续扩展

2.2 编译生成 Python 代码

使用

protoc

编译器将

.proto

文件编译为 Python 代码:

python

-m

grpc_tools.protoc -I.

--python_out

=

.

--grpc_python_out

=

. hello.proto

控制台输出

(成功时无报错):

这个命令会在当前目录下生成两个文件

-7

hello_pb2.py

:包含消息类的序列化/反序列化代码

hello_pb2_grpc.py

:包含服务基类(Servicer)和客户端桩代码(Stub)

组件分工表

2.3 实现服务端

服务端的核心任务是

实现 .proto 中定义的服务接口并启动 gRPC 服务器

-7

server.py

import

grpc

from concurrent

import

futures

import

hello_pb2

import

hello_pb2_grpc

class GreeterServicer

(

hello_pb2_grpc.GreeterServicer

)

:

""

"实现 .proto 中定义的 Greeter 服务

""

"

def SayHello

(

self, request, context

)

:

request.name 来自客户端发送的 HelloRequest

print

(

f

"[服务端] 收到请求,name={request.name}"

)

return

hello_pb2.HelloReply

(

message

=

f

"你好,{request.name}!这里是 gRPC 服务端 🎉"

)

def serve

(

)

:

创建 gRPC 服务器,最大工作线程数 10

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

))

将服务实现注册到服务器

hello_pb2_grpc.add_GreeterServicer_to_server

(

GreeterServicer

(

)

, server

)

监听端口(insecure 表示不使用 TLS,仅供开发测试)

server.add_insecure_port

(

'[::]:50051'

)

print

(

"[服务端] gRPC 服务已启动,监听端口 50051..."

)

server.start

(

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

2.4 实现客户端

客户端的任务是

通过 Stub(桩)调用远程服务方法

,就像调用本地函数一样自然

-7

client.py

import

grpc

import

hello_pb2

import

hello_pb2_grpc

def run

(

)

:

建立与服务器的连接(insecure 开发模式)

with grpc.insecure_channel

(

'localhost:50051'

)

as channel:

创建客户端桩(Stub)

stub

=

hello_pb2_grpc.GreeterStub

(

channel

)

构造请求并调用远程方法

response

=

stub.SayHello

(

hello_pb2.HelloRequest

(

name

=

"小明"

))

print

(

f

"[客户端] 收到响应: {response.message}"

)

if

name

==

'main'

:

run

(

)

2.5 运行测试

打开

两个终端

,分别运行服务端和客户端。

终端 1 — 服务端:

终端 1 输出:

[

服务端

]

gRPC 服务已启动,监听端口

50051

..

.

[

服务端

]

收到请求,name

=

小明

终端 2 — 客户端:

终端 2 输出:

[

客户端

]

收到响应: 你好,小明!这里是 gRPC 服务端 🎉

恭喜!你已经成功构建了第一个 gRPC 服务 🎉。

三、深入理解:四种通信模式全解析

gRPC 基于 HTTP/2,支持四种通信模式

-16

。下面逐一实战演示。

3.1 定义流式 .proto 文件

// streaming.proto

syntax

=

"proto3"

;

package streaming

;

service

DataService

{

//

1

. 一元 RPC:一问一答(上面已讲)

rpc UnaryCall

(

DataRequest

)

returns

(

DataResponse

)

;

//

2

. 服务端流式:客户端发一次,服务端持续推送

rpc ServerStreaming

(

DataRequest

)

returns

(

stream DataResponse

)

;

//

3

. 客户端流式:客户端持续发送,服务端汇总一次

rpc ClientStreaming

(

stream DataRequest

)

returns

(

DataResponse

)

;

//

4

. 双向流式:双方都可随时发送

rpc BidirectionalStreaming

(

stream DataRequest

)

returns

(

stream DataResponse

)

;

}

message DataRequest

{

string message

=

1

;

int32 sequence

=

2

;

// 序号,用于追踪数据流顺序

}

message DataResponse

{

string message

=

1

;

int32 sequence

=

2

;

}

语法要点

stream

关键字是流式模式的核心声明。出现在返回类型前表示服务端流式,出现在参数类型前表示客户端流式,两边都有则是双向流式

-19

首先编译生成代码:

python

-m

grpc_tools.protoc -I.

--python_out

=

.

--grpc_python_out

=

. streaming.proto

3.2 模式一:服务端流式(Server Streaming)

适用场景

:股票实时行情推送、日志监控、大文件下载。服务端逐个发送数据块,客户端逐个读取,内存占用极低

-19

server_streaming_server.py

import

grpc

import

time

from concurrent

import

futures

import

streaming_pb2

import

streaming_pb2_grpc

class DataServiceServicer

(

streaming_pb2_grpc.DataServiceServicer

)

:

def ServerStreaming

(

self, request, context

)

:

""

"服务端流式:收到一个请求后,持续推送多条数据

""

"

print

(

f

"[服务端] 收到流式请求: {request.message},开始推送数据..."

)

for

i

in

range

(

5

)

:

response

=

streaming_pb2.DataResponse

(

message

=

f

"服务端推送第 {i+1} 条数据"

,

sequence

=

i +

1

)

print

(

f

"[服务端] 推送 #{i+1}"

)

yield response

time.sleep

(

0.5

)

模拟数据生成延迟

print

(

"[服务端] 推送完成"

)

def serve

(

)

:

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

))

streaming_pb2_grpc.add_DataServiceServicer_to_server

(

DataServiceServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50052'

)

server.start

(

)

print

(

"[服务端流式] 服务已启动,监听端口 50052..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

server_streaming_client.py

import

grpc

import

streaming_pb2

import

streaming_pb2_grpc

def run

(

)

:

with grpc.insecure_channel

(

'localhost:50052'

)

as channel:

stub

=

streaming_pb2_grpc.DataServiceStub

(

channel

)

request

=

streaming_pb2.DataRequest

(

message

=

"请推送数据"

)

print

(

"[客户端] 发送请求,等待服务端推送..."

)

responses 是一个迭代器,逐个接收服务端推送的数据

responses

=

stub.ServerStreaming

(

request

)

for

resp

in

responses:

print

(

f

"[客户端] 收到 #{resp.sequence}: {resp.message}"

)

if

name

==

'main'

:

run

(

)

控制台输出 — 服务端:

[

服务端流式

]

服务已启动,监听端口

50052

..

.

[

服务端

]

收到流式请求: 请推送数据,开始推送数据

..

.

[

服务端

]

推送

1

[

服务端

]

推送

2

[

服务端

]

推送

3

[

服务端

]

推送

4

[

服务端

]

推送

5

[

服务端

]

推送完成

控制台输出 — 客户端:

[

客户端

]

发送请求,等待服务端推送

..

.

[

客户端

]

收到

1: 服务端推送第 1 条数据

[

客户端

]

收到

2: 服务端推送第 2 条数据

[

客户端

]

收到

3: 服务端推送第 3 条数据

[

客户端

]

收到

4: 服务端推送第 4 条数据

[

客户端

]

收到

5: 服务端推送第 5 条数据

核心要点

:服务端使用

yield

逐个生成响应,客户端通过

for-in

循环逐条接收,无需等待全部数据到达。

3.3 模式二:客户端流式(Client Streaming)

适用场景

:物联网设备批量上报传感器数据、日志采集、大文件上传

-19

client_streaming_server.py

import

grpc

from concurrent

import

futures

import

streaming_pb2

import

streaming_pb2_grpc

class DataServiceServicer

(

streaming_pb2_grpc.DataServiceServicer

)

:

def ClientStreaming

(

self, request_iterator, context

)

:

""

"客户端流式:接收客户端持续发送的多条数据,汇总后返回一条结果

""

"

total

=

0

messages

=

[

]

for

request

in

request_iterator:

total

+=

1

messages.append

(

request.message

)

print

(

f

"[服务端] 收到 #{request.sequence}: {request.message}"

)

汇总返回

result

=

streaming_pb2.DataResponse

(

message

=

f

"共收到 {total} 条数据: {'; '.join(messages)}"

,

sequence

=

total

)

print

(

f

"[服务端] 汇总完成,返回结果"

)

return

result

def serve

(

)

:

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

))

streaming_pb2_grpc.add_DataServiceServicer_to_server

(

DataServiceServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50053'

)

server.start

(

)

print

(

"[客户端流式] 服务已启动,监听端口 50053..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

client_streaming_client.py

import

grpc

import

time

import

streaming_pb2

import

streaming_pb2_grpc

def generate_requests

(

)

:

""

"生成器函数:逐个产生要发送的请求

""

"

data_list

=

[

"传感器-温度:25.3°C"

,

"传感器-湿度:68%"

,

"传感器-气压:1013hPa"

]

for

i, data

in

enumerate

(

data_list

)

:

yield streaming_pb2.DataRequest

(

message

=

data,

sequence

=

i +

1

)

time.sleep

(

0.3

)

模拟采集间隔

print

(

f

" [生成器] 已产生第 {i+1} 条数据"

)

def run

(

)

:

with grpc.insecure_channel

(

'localhost:50053'

)

as channel:

stub

=

streaming_pb2_grpc.DataServiceStub

(

channel

)

response

=

stub.ClientStreaming

(

generate_requests

(

))

print

(

f

"

\n

[客户端] 收到汇总响应: {response.message}"

)

if

name

==

'main'

:

run

(

)

控制台输出 — 服务端:

[

客户端流式

]

服务已启动,监听端口

50053

..

.

[

服务端

]

收到

1: 传感器-温度:25.3°C

[

服务端

]

收到

2: 传感器-湿度:68%

[

服务端

]

收到

3: 传感器-气压:1013hPa

[

服务端

]

汇总完成,返回结果

控制台输出 — 客户端:

[

生成器

]

已产生第

1

条数据

[

生成器

]

已产生第

2

条数据

[

生成器

]

已产生第

3

条数据

[

客户端

]

收到汇总响应: 共收到

3

条数据: 传感器-温度:25.3°C

;

传感器-湿度:68%

;

传感器-气压:1013hPa

核心要点

:客户端用

yield

生成器逐个发送请求,服务端通过迭代

request_iterator

逐个接收,汇总后一次性返回结果。

3.4 模式三:双向流式(Bidirectional Streaming)

适用场景

:实时聊天、语音识别、在线游戏同步——双方可以随时、独立地发送数据

-19

bidirectional_server.py

import

grpc

import

time

from concurrent

import

futures

import

streaming_pb2

import

streaming_pb2_grpc

class DataServiceServicer

(

streaming_pb2_grpc.DataServiceServicer

)

:

def BidirectionalStreaming

(

self, request_iterator, context

)

:

""

"双向流式:一边接收客户端消息,一边主动推送响应

""

"

print

(

"[服务端] 双向流通信已建立,等待客户端消息..."

)

for

request

in

request_iterator:

print

(

f

"[服务端] 收到 #{request.sequence}: {request.message}"

)

收到一条消息后,立即回复两条

yield streaming_pb2.DataResponse

(

message

=

f

"回声: {request.message}"

,

sequence

=

request.sequence *

100

+

1

)

yield streaming_pb2.DataResponse

(

message

=

f

"确认收到第 {request.sequence} 条消息"

,

sequence

=

request.sequence *

100

+

2

)

def serve

(

)

:

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

))

streaming_pb2_grpc.add_DataServiceServicer_to_server

(

DataServiceServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50054'

)

server.start

(

)

print

(

"[双向流式] 服务已启动,监听端口 50054..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

bidirectional_client.py

import

grpc

import

time

import

threading

import

streaming_pb2

import

streaming_pb2_grpc

def run

(

)

:

with grpc.insecure_channel

(

'localhost:50054'

)

as channel:

stub

=

streaming_pb2_grpc.DataServiceStub

(

channel

)

def generate_messages

(

)

:

""

"在另一个线程中发送消息

""

"

messages

=

[

"你好服务端!"

,

"这是第二条消息"

,

"再见!"

]

for

i, msg

in

enumerate

(

messages

)

:

yield streaming_pb2.DataRequest

(

message

=

msg,

sequence

=

i +

1

)

time.sleep

(

1

)

print

(

f

" [发送线程] 已发送第 {i+1} 条: {msg}"

)

调用双向流 RPC

responses

=

stub.BidirectionalStreaming

(

generate_messages

(

))

print

(

"[客户端] 开始接收服务端响应..."

)

for

resp

in

responses:

print

(

f

"[客户端] 收到 #{resp.sequence}: {resp.message}"

)

if

name

==

'main'

:

run

(

)

控制台输出 — 服务端:

[

双向流式

]

服务已启动,监听端口

50054

..

.

[

服务端

]

双向流通信已建立,等待客户端消息

..

.

[

服务端

]

收到

1: 你好服务端!

[

服务端

]

收到

2: 这是第二条消息

[

服务端

]

收到

3: 再见!

控制台输出 — 客户端:

[

客户端

]

开始接收服务端响应

..

.

[

客户端

]

收到

101: 回声: 你好服务端!

[

客户端

]

收到

102: 确认收到第 1 条消息

[

发送线程

]

已发送第

1

条: 你好服务端!

[

客户端

]

收到

201: 回声: 这是第二条消息

[

客户端

]

收到

202: 确认收到第 2 条消息

[

发送线程

]

已发送第

2

条: 这是第二条消息

[

客户端

]

收到

301: 回声: 再见!

[

客户端

]

收到

302: 确认收到第 3 条消息

[

发送线程

]

已发送第

3

条: 再见!

核心要点

:双向流式模式下,客户端和服务端可以

同时

发送和接收数据,实现了真正的全双工通信。收到一条消息后,可以回复多条(本例中回复了 2 条)。

3.5 四种模式对比总结

四、进阶应用:生产级 gRPC 服务的关键技术

前面已经掌握了 gRPC 的基本用法,但要构建生产级服务,还需要掌握以下几个关键技术。

4.1 拦截器:横切逻辑的优雅实现

拦截器是 gRPC 的"中间件"机制,可以在不修改业务代码的情况下,统一处理日志、认证、监控等横切关注点-。

interceptor_server.py

import

grpc

from concurrent

import

futures

import

time

import

hello_pb2

import

hello_pb2_grpc

class LoggingInterceptor

(

grpc.ServerInterceptor

)

:

""

"自定义日志拦截器:记录每次 RPC 调用的耗时

""

"

def intercept_service

(

self, continuation, handler_call_details

)

:

start_time

=

time.time

(

)

调用实际的 RPC 处理方法

response

=

continuation

(

handler_call_details

)

elapsed

=

(

time.time

(

)

  • start_time

)

*

1000

method

=

handler_call_details.method

print

(

f

"[拦截器] {method} 调用完成,耗时 {elapsed:.2f}ms"

)

return

response

class GreeterServicer

(

hello_pb2_grpc.GreeterServicer

)

:

def SayHello

(

self, request, context

)

:

return

hello_pb2.HelloReply

(

message

=

f

"你好,{request.name}!"

)

def serve

(

)

:

在创建服务器时传入拦截器

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

)

,

interceptors

=

[

LoggingInterceptor

(

)

]

注入拦截器

)

hello_pb2_grpc.add_GreeterServicer_to_server

(

GreeterServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50055'

)

server.start

(

)

print

(

"[拦截器示例] 服务已启动..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

客户端(使用之前的

client.py

修改端口为 50055):

interceptor_client.py

import

grpc

import

hello_pb2

import

hello_pb2_grpc

def run

(

)

:

with grpc.insecure_channel

(

'localhost:50055'

)

as channel:

stub

=

hello_pb2_grpc.GreeterStub

(

channel

)

response

=

stub.SayHello

(

hello_pb2.HelloRequest

(

name

=

"小明"

))

print

(

f

"[客户端] 收到: {response.message}"

)

if

name

==

'main'

:

run

(

)

控制台输出 — 服务端:

[

拦截器示例

]

服务已启动

..

.

[

拦截器

]

/hello.Greeter/SayHello 调用完成,耗时

0

.03ms

4.2 错误处理与状态码

gRPC 提供了丰富的状态码体系,远比 HTTP 状态码更精确

-58

。不应一律返回

INTERNAL

错误,而应使用语义明确的状态码。

error_handling_server.py

import

grpc

from concurrent

import

futures

import

hello_pb2

import

hello_pb2_grpc

class GreeterServicer

(

hello_pb2_grpc.GreeterServicer

)

:

def SayHello

(

self, request, context

)

:

参数校验

if

not request.name:

context.set_code

(

grpc.StatusCode.INVALID_ARGUMENT

)

context.set_details

(

'name 参数不能为空'

)

return

hello_pb2.HelloReply

(

)

模拟权限检查

if

request.name

==

"admin"

:

context.set_code

(

grpc.StatusCode.PERMISSION_DENIED

)

context.set_details

(

'不允许使用 admin 身份'

)

return

hello_pb2.HelloReply

(

)

模拟资源不存在

if

request.name

==

"notfound"

:

context.abort

(

grpc.StatusCode.NOT_FOUND,

'该用户不存在'

)

return

hello_pb2.HelloReply

(

message

=

f

"你好,{request.name}!调用成功"

)

def serve

(

)

:

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

))

hello_pb2_grpc.add_GreeterServicer_to_server

(

GreeterServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50056'

)

server.start

(

)

print

(

"[错误处理示例] 服务已启动..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

error_handling_client.py

import

grpc

import

hello_pb2

import

hello_pb2_grpc

def call_with_name

(

stub, name

)

:

""

"封装调用逻辑,统一处理 RPC 错误

""

"

try:

response

=

stub.SayHello

(

hello_pb2.HelloRequest

(

name

=

name

)

,

timeout

=

2.0

设置 2 秒超时

)

print

(

f

"[客户端] name='{name}' -> {response.message}"

)

except grpc.RpcError as e:

print

(

f

"[客户端] name='{name}' -> "

f

"状态码: {e.code()}, 详情: {e.details()}"

)

def run

(

)

:

with grpc.insecure_channel

(

'localhost:50056'

)

as channel:

stub

=

hello_pb2_grpc.GreeterStub

(

channel

)

call_with_name

(

stub,

"小明"

)

正常

call_with_name

(

stub,

""

)

参数为空

call_with_name

(

stub,

"admin"

)

权限不足

call_with_name

(

stub,

"notfound"

)

资源不存在

if

name

==

'main'

:

run

(

)

控制台输出 — 客户端:

[

客户端

]

name

=

'小明'

-

你好,小明!调用成功

[

客户端

]

name

=

''

-

状态码: StatusCode.INVALID_ARGUMENT, 详情: name 参数不能为空

[

客户端

]

name

=

'admin'

-

状态码: StatusCode.PERMISSION_DENIED, 详情: 不允许使用 admin 身份

[

客户端

]

name

=

'notfound'

-

状态码: StatusCode.NOT_FOUND, 详情: 该用户不存在

常用状态码速查

4.3 HTTP/2 连接复用(Keep-Alive)与超时配置

生产环境中,长连接的管理直接影响资源利用率和故障恢复速度。

keepalive_server.py

import

grpc

from concurrent

import

futures

import

hello_pb2

import

hello_pb2_grpc

class GreeterServicer

(

hello_pb2_grpc.GreeterServicer

)

:

def SayHello

(

self, request, context

)

:

获取请求超时时间

deadline

=

context.time_remaining

(

)

if

deadline:

print

(

f

"[服务端] 距离超时还有 {deadline:.2f} 秒"

)

return

hello_pb2.HelloReply

(

message

=

f

"你好,{request.name}!"

)

def serve

(

)

:

server

=

grpc.server

(

futures.ThreadPoolExecutor

(

max_workers

=

10

)

,

options

=

[

HTTP/2 Keep-Alive:定期发送 PING 帧保持连接

(

'grpc.keepalive_time_ms'

,

10000

)

,

每 10 秒发送 PING

(

'grpc.keepalive_timeout_ms'

,

5000

)

,

PING 超时 5 秒则断开

(

'grpc.keepalive_permit_without_calls'

, True

)

,

(

'grpc.http2.max_pings_without_data'

,

0

)

,

]

)

hello_pb2_grpc.add_GreeterServicer_to_server

(

GreeterServicer

(

)

, server

)

server.add_insecure_port

(

'[::]:50057'

)

server.start

(

)

print

(

"[Keep-Alive 示例] 服务已启动..."

)

server.wait_for_termination

(

)

if

name

==

'main'

:

serve

(

)

keepalive_client.py

import

grpc

import

hello_pb2

import

hello_pb2_grpc

def run

(

)

:

配置客户端侧 HTTP/2 Keep-Alive

channel

=

grpc.insecure_channel

(

'localhost:50057'

,

options

=

[

(

'grpc.keepalive_time_ms'

,

10000

)

,

(

'grpc.keepalive_timeout_ms'

,

5000

)

,

(

'grpc.http2.min_time_between_pings_ms'

,

5000

)

,

(

'grpc.keepalive_permit_without_calls'

, True

)

,

]

)

stub

=

hello_pb2_grpc.GreeterStub

(

channel

)

response

=

stub.SayHello

(

hello_pb2.HelloRequest

(

name

=

"小明"

)

,

timeout

=

1.0

单次调用超时 1 秒

)

print

(

f

"[客户端] 收到: {response.message}"

)

channel.close

(

)

if

name

==

'main'

:

run

(

)

控制台输出 — 服务端:

[

Keep-Alive 示例

]

服务已启动

..

.

[

服务端

]

距离超时还有

0.99

五、gRPC vs REST:何时选择 gRPC?

这个话题是新手最容易纠结的。核心区分点在于

使用场景

-41

-42

选择 gRPC 的场景

微服务间高并发、低延迟通信

实时数据推送(股票、监控、聊天)

多语言技术栈的服务间调用

需要严格 API 契约管理的大型分布式系统

选择 REST 的场景

浏览器直连(无 gRPC-Web 中间件时)

对外的公共 API(兼容性优先)

中小型项目,简单 CRUD 操作

团队缺乏 gRPC 经验的学习过渡期

六、常见问题与避坑指南

6.1

AttributeError: module 'grpc' has no attribute 'server'

错误原因

:把

grpc

grpcio

搞混了。Python 里没有叫

grpc

的官方包,你需要

pip install grpcio

,但

import

时写的是

import grpc

——

grpc

grpcio

安装后暴露的顶层模块名-。

解决方案

pip

install

grpcio

正确安装

import

grpc

正确导入——grpc 是 grpcio 安装后暴露的顶层模块名

6.2 端口冲突

检查端口是否被占用

lsof

-i

:50051

kill

-9

<

PID

6.3 .proto 文件不生效

每次修改

.proto

文件后,记得重新编译:

先删除旧的生成文件

rm

_pb2.py _pb2_grpc.py

再重新编译

python

-m

grpc_tools.protoc -I.

--python_out

=

.

--grpc_python_out

=

. hello.proto

6.4 生产环境使用 TLS

生产环境

务必

使用 TLS 加密通信,替换

add_insecure_port

为:

读取证书

with open

(

'server.key'

,

'rb'

)

as f:

private_key

=

f.read

(

)

with open

(

'server.crt'

,

'rb'

)

as f:

certificate_chain

=

f.read

(

)

创建 SSL 凭证

server_credentials

=

grpc.ssl_server_credentials

(

[

(

private_key, certificate_chain

)

]

)

使用安全端口

server.add_secure_port

(

'[::]:50051'

, server_credentials

)

七、总结与学习路线图

本文核心要点回顾

gRPC 的优势

:HTTP/2 + Protobuf 带来高性能、双向流通信和跨语言支持

开发流程

:定义 .proto → 编译生成代码 → 实现服务端 → 编写客户端

四种通信模式

:一元 RPC、服务端流式、客户端流式、双向流式

生产级实践

:拦截器、错误处理、超时控制、TLS 加密

学习路径建议

入门

:跑通文中的 Hello gRPC 示例(第 2 节),理解协议栈与编译流程

巩固

:实现四种流式模式(第 3 节),掌握

yield

和迭代器模式

进阶

:添加拦截器(日志/认证/限流)、配置健康检查和服务配置

-56

生产

:集成 TLS 加密、实现智能重试策略和负载均衡

-56

深化

:异步 gRPC(

grpc.aio

)、与 FastAPI 混合部署、Service Mesh 集成

延伸阅读

gRPC 官方文档

https://grpc.io/docs/languages/python/

— 最权威的参考资料

Protocol Buffers 规范

https://protobuf.dev/programming-guides/proto3/

— Proto3 完整语法

gRPC 服务配置指南

https://grpc.org.cn/docs/guides/service-config/

— 超时、重试、负载均衡配置

gRPC 的学习曲线比 REST 稍高,但它在性能上的回报是巨大的。掌握 gRPC,你将能够构建出真正高性能的分布式系统。如果你在实践过程中遇到任何问题,欢迎在评论区留言交流!

还可以去公众号、今日头条搜索「IT策士」,一起升级 IT 思维 !

查看原文


🏷 标签: Python, gRPC, Protocol Buffers, 微服务, HTTP/2