Spring Boot RAG文档上传:为何采用分布式信号量控制并发?
原文地址: https://88box.top 生成时间: 2026-05-20 09:40:55
Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发? - hey99 知识搜索引擎
精选文章
Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发?
摘要 Spring Boot实现RAG文档上传时,需要采用分布式信号量控制并发。原因在于文档上传后还需进行文本解析、分片、向量生成等后续处理,这些操作会传递压力到整个系统。虽然文件大小限制能控制单个请求,但无法限制并发请求数。在多实例部署环境下,本地信号量会导致整体并发失控,因此需使用基于Redis的分布式信号量统一控制。实现方案包括:初始化信号量配置、请求时获取许可、超时拒绝及请求完成释放许可,确保无论部署多少实例都能维持可控的并发上传量。
更新于 2026-05-20 00:57
spring
spring boot
java
后端
分布式
Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发?
做 RAG 系统时,文档上传不是简单地把文件收下来。
用户上传一个 PDF、Word 或 Markdown 后,系统后面通常还要做:
保存文件 -> 解析文本 -> 文本分片 -> 生成向量 -> 写入向量库
所以文档上传的压力不只在上传接口本身,还会继续传递到解析、embedding 和向量库。
文件大小限制只能解决单个请求的问题:
spring
:
servlet
:
multipart
:
max-file-size
:
50MB
max-request-size
:
100MB
它能限制“单个文件多大”,但不能限制“同时有多少人在上传”。
比如单个文件都没有超过 50MB,但同时来了 100 个上传请求,系统仍然可能出现:
临时文件变多
磁盘 IO 增高
对象存储写入变慢
解析任务堆积
embedding 请求堆积
向量库写入压力变大
这时就需要并发控制。
为什么是分布式信号量?
如果服务只有一个实例,用本地
Semaphore
就可以限制并发。
但生产环境通常是多实例部署:
实例 A
实例 B
实例 C
如果每个实例本地限制 10 个上传,整体并发就可能变成 30。
所以 RAG 文档上传更适合用分布式信号量,把并发数量放到 Redis 这类共享存储里统一控制。
核心目标只有一个:
不管部署多少个服务实例,同一时间最多只允许 N 个上传请求继续执行。
配置示例
可以把上传并发控制抽成这样的配置:
app
:
semaphore
:
file-upload
:
name
:
app
:
file
:
upload
:
semaphore
max-concurrent
:
10
max-wait-seconds
:
30
lease-seconds
:
30
含义:
name
:信号量名称,多实例使用同一个名称,才能共享同一个并发池。
max-concurrent
:最大上传并发数。
max-wait-seconds
:获取许可的最大等待时间,超过后直接拒绝。
lease-seconds
:许可租约时间,防止服务异常退出后许可一直不释放。
实现思路
整体流程很简单:
启动时初始化信号量
上传请求进来先抢许可
抢到许可才继续处理
抢不到许可返回 429
请求结束后释放许可
启动时初始化:
package
com
.
example
.
project
.
config
;
import
jakarta
.
validation
.
Valid
;
import
jakarta
.
validation
.
constraints
.
Min
;
import
jakarta
.
validation
.
constraints
.
NotBlank
;
import
lombok
.
Data
;
import
org
.
springframework
.
boot
.
context
.
properties
.
ConfigurationProperties
;
import
org
.
springframework
.
context
.
annotation
.
Configuration
;
import
org
.
springframework
.
validation
.
annotation
.
Validated
;
/**
- 通用分布式信号量配置
*/
@Data
@Validated
@Configuration
@ConfigurationProperties
(
prefix
=
"app.semaphore"
)
public
class
GenericSemaphoreProperties
{
@Valid
private
ExpirableSemaphoreConfig
fileUpload
=
new
ExpirableSemaphoreConfig
(
)
;
@Data
public
static
class
ExpirableSemaphoreConfig
{
/**
- Redisson 信号量名称
*/
@NotBlank
private
String
name
=
"app:file:upload:semaphore"
;
/**
- 最大并发数
*/
@Min
(
1
)
private
Integer
maxConcurrent
=
10
;
/**
- 获取许可最大等待时间,单位:秒
*/
@Min
(
0
)
private
Integer
maxWaitSeconds
=
30
;
/**
- permit 自动释放时间,单位:秒
*/
@Min
(
1
)
private
Integer
leaseSeconds
=
30
;
}
}
设置filter 上传请求进入时:
package
com
.
example
.
project
.
web
.
filter
;
import
jakarta
.
servlet
.
FilterChain
;
import
jakarta
.
servlet
.
ServletException
;
import
jakarta
.
servlet
.
http
.
HttpServletRequest
;
import
jakarta
.
servlet
.
http
.
HttpServletResponse
;
import
lombok
.
RequiredArgsConstructor
;
import
lombok
.
extern
.
slf4j
.
Slf4j
;
import
org
.
jetbrains
.
annotations
.
NotNull
;
import
org
.
redisson
.
api
.
RPermitExpirableSemaphore
;
import
org
.
redisson
.
api
.
RedissonClient
;
import
org
.
springframework
.
core
.
Ordered
;
import
org
.
springframework
.
core
.
annotation
.
Order
;
import
org
.
springframework
.
stereotype
.
Component
;
import
org
.
springframework
.
web
.
filter
.
OncePerRequestFilter
;
import
java
.
io
.
IOException
;
import
java
.
util
.
concurrent
.
TimeUnit
;
/**
- 文件上传并发控制过滤器
*
- 用于限制指定上传接口的同时处理数量。
*/
@Slf4j
@Component
@Order
(
Ordered
.
HIGHEST_PRECEDENCE
)
@RequiredArgsConstructor
public
class
GenericUploadConcurrencyFilter
extends
OncePerRequestFilter
{
private
final
RedissonClient
redissonClient
;
private
final
GenericSemaphoreProperties
semaphoreProperties
;
private
static
final
String
UPLOAD_PATH_KEYWORD
=
"/resource/"
;
private
static
final
String
UPLOAD_PATH_SUFFIX
=
"/files/upload"
;
@Override
protected
void
doFilterInternal
(
@NotNull
HttpServletRequest
request
,
@NotNull
HttpServletResponse
response
,
@NotNull
FilterChain
filterChain
)
throws
ServletException
,
IOException
{
if
(
!
isTargetUploadRequest
(
request
)
)
{
filterChain
.
doFilter
(
request
,
response
)
;
return
;
}
GenericSemaphoreProperties
.
ExpirableSemaphoreConfig
config
=
semaphoreProperties
.
getFileUpload
(
)
;
RPermitExpirableSemaphore
semaphore
=
redissonClient
.
getPermitExpirableSemaphore
(
config
.
getName
(
)
)
;
String
permitId
=
null
;
try
{
permitId
=
semaphore
.
tryAcquire
(
config
.
getMaxWaitSeconds
(
)
,
config
.
getLeaseSeconds
(
)
,
TimeUnit
.
SECONDS
)
;
if
(
permitId
==
null
)
{
writeTooManyRequestsResponse
(
response
)
;
return
;
}
filterChain
.
doFilter
(
request
,
response
)
;
}
catch
(
InterruptedException
ex
)
{
Thread
.
currentThread
(
)
.
interrupt
(
)
;
writeServerErrorResponse
(
response
)
;
}
finally
{
releasePermitIfNecessary
(
semaphore
,
permitId
)
;
}
}
/**
- 判断是否为目标上传请求
*/
private
boolean
isTargetUploadRequest
(
HttpServletRequest
request
)
{
if
(
!
"POST"
.
equalsIgnoreCase
(
request
.
getMethod
(
)
)
)
{
return
false
;
}
String
requestUri
=
request
.
getRequestURI
(
)
;
return
requestUri
!=
null
&&
requestUri
.
contains
(
UPLOAD_PATH_KEYWORD
)
&&
requestUri
.
endsWith
(
UPLOAD_PATH_SUFFIX
)
;
}
/**
- 返回请求过多响应
*/
private
void
writeTooManyRequestsResponse
(
HttpServletResponse
response
)
throws
IOException
{
response
.
setStatus
(
HttpServletResponse
.
SC_TOO_MANY_REQUESTS
)
;
response
.
setContentType
(
"application/json;charset=UTF-8"
)
;
response
.
getWriter
(
)
.
write
(
"{\"code\":\"429\",\"message\":\"当前上传请求过多,请稍后再试\"}"
)
;
}
/**
- 返回服务异常响应
*/
private
void
writeServerErrorResponse
(
HttpServletResponse
response
)
throws
IOException
{
response
.
setStatus
(
HttpServletResponse
.
SC_INTERNAL_SERVER_ERROR
)
;
response
.
setContentType
(
"application/json;charset=UTF-8"
)
;
response
.
getWriter
(
)
.
write
(
"{\"code\":\"500\",\"message\":\"获取上传许可失败\"}"
)
;
}
/**
- 释放信号量许可
*/
private
void
releasePermitIfNecessary
(
RPermitExpirableSemaphore
semaphore
,
String
permitId
)
{
if
(
permitId
==
null
)
{
return
;
}
boolean
released
=
semaphore
.
tryRelease
(
permitId
)
;
if
(
!
released
)
{
log
.
warn
(
"Upload permit has already expired or been released, permitId={}"
,
permitId
)
;
}
}
}
注意,释放许可一定要放在
finally
里。上传可能成功,也可能失败,但只要拿到了许可,请求结束时都要释放。
总结
RAG 文档上传要同时控制两个维度:
文件大小:防止单个请求过大
上传并发:防止太多请求同时进入
文件大小限制靠 context-length
上传并发控制可以用分布式信号量。
比较稳的做法是:
上传入口限制文件大小
上传请求先抢分布式许可
抢不到许可返回 429
抢到许可后再进入后续的业务处理
请求结束释放许可
后续解析、embedding、向量库写入也要单独限流
这样即使服务多实例部署,文档上传入口的总体并发也能保持在可控范围内。
查看原文
🏷 标签: Spring Boot, RAG, 分布式并发控制, Redis, 信号量