跳转至

Spring Boot RAG文档上传:为何采用分布式信号量控制并发?

原文地址: https://88box.top 生成时间: 2026-05-20 09:40:55


Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发? - hey99 知识搜索引擎

精选文章

Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发?

摘要 Spring Boot实现RAG文档上传时,需要采用分布式信号量控制并发。原因在于文档上传后还需进行文本解析、分片、向量生成等后续处理,这些操作会传递压力到整个系统。虽然文件大小限制能控制单个请求,但无法限制并发请求数。在多实例部署环境下,本地信号量会导致整体并发失控,因此需使用基于Redis的分布式信号量统一控制。实现方案包括:初始化信号量配置、请求时获取许可、超时拒绝及请求完成释放许可,确保无论部署多少实例都能维持可控的并发上传量。

更新于 2026-05-20 00:57

spring

spring boot

java

后端

分布式

Spring Boot 做 RAG 文档上传:为什么要用分布式信号量控制并发?

做 RAG 系统时,文档上传不是简单地把文件收下来。

用户上传一个 PDF、Word 或 Markdown 后,系统后面通常还要做:

保存文件 -> 解析文本 -> 文本分片 -> 生成向量 -> 写入向量库

所以文档上传的压力不只在上传接口本身,还会继续传递到解析、embedding 和向量库。

文件大小限制只能解决单个请求的问题:

spring

:

servlet

:

multipart

:

max-file-size

:

50MB

max-request-size

:

100MB

它能限制“单个文件多大”,但不能限制“同时有多少人在上传”。

比如单个文件都没有超过 50MB,但同时来了 100 个上传请求,系统仍然可能出现:

临时文件变多

磁盘 IO 增高

对象存储写入变慢

解析任务堆积

embedding 请求堆积

向量库写入压力变大

这时就需要并发控制。

为什么是分布式信号量?

如果服务只有一个实例,用本地

Semaphore

就可以限制并发。

但生产环境通常是多实例部署:

实例 A

实例 B

实例 C

如果每个实例本地限制 10 个上传,整体并发就可能变成 30。

所以 RAG 文档上传更适合用分布式信号量,把并发数量放到 Redis 这类共享存储里统一控制。

核心目标只有一个:

不管部署多少个服务实例,同一时间最多只允许 N 个上传请求继续执行。

配置示例

可以把上传并发控制抽成这样的配置:

app

:

semaphore

:

file-upload

:

name

:

app

:

file

:

upload

:

semaphore

max-concurrent

:

10

max-wait-seconds

:

30

lease-seconds

:

30

含义:

name

:信号量名称,多实例使用同一个名称,才能共享同一个并发池。

max-concurrent

:最大上传并发数。

max-wait-seconds

:获取许可的最大等待时间,超过后直接拒绝。

lease-seconds

:许可租约时间,防止服务异常退出后许可一直不释放。

实现思路

整体流程很简单:

启动时初始化信号量

上传请求进来先抢许可

抢到许可才继续处理

抢不到许可返回 429

请求结束后释放许可

启动时初始化:

package

com

.

example

.

project

.

config

;

import

jakarta

.

validation

.

Valid

;

import

jakarta

.

validation

.

constraints

.

Min

;

import

jakarta

.

validation

.

constraints

.

NotBlank

;

import

lombok

.

Data

;

import

org

.

springframework

.

boot

.

context

.

properties

.

ConfigurationProperties

;

import

org

.

springframework

.

context

.

annotation

.

Configuration

;

import

org

.

springframework

.

validation

.

annotation

.

Validated

;

/**

  • 通用分布式信号量配置

*/

@Data

@Validated

@Configuration

@ConfigurationProperties

(

prefix

=

"app.semaphore"

)

public

class

GenericSemaphoreProperties

{

@Valid

private

ExpirableSemaphoreConfig

fileUpload

=

new

ExpirableSemaphoreConfig

(

)

;

@Data

public

static

class

ExpirableSemaphoreConfig

{

/**

  • Redisson 信号量名称

*/

@NotBlank

private

String

name

=

"app:file:upload:semaphore"

;

/**

  • 最大并发数

*/

@Min

(

1

)

private

Integer

maxConcurrent

=

10

;

/**

  • 获取许可最大等待时间,单位:秒

*/

@Min

(

0

)

private

Integer

maxWaitSeconds

=

30

;

/**

  • permit 自动释放时间,单位:秒

*/

@Min

(

1

)

private

Integer

leaseSeconds

=

30

;

}

}

设置filter 上传请求进入时:

package

com

.

example

.

project

.

web

.

filter

;

import

jakarta

.

servlet

.

FilterChain

;

import

jakarta

.

servlet

.

ServletException

;

import

jakarta

.

servlet

.

http

.

HttpServletRequest

;

import

jakarta

.

servlet

.

http

.

HttpServletResponse

;

import

lombok

.

RequiredArgsConstructor

;

import

lombok

.

extern

.

slf4j

.

Slf4j

;

import

org

.

jetbrains

.

annotations

.

NotNull

;

import

org

.

redisson

.

api

.

RPermitExpirableSemaphore

;

import

org

.

redisson

.

api

.

RedissonClient

;

import

org

.

springframework

.

core

.

Ordered

;

import

org

.

springframework

.

core

.

annotation

.

Order

;

import

org

.

springframework

.

stereotype

.

Component

;

import

org

.

springframework

.

web

.

filter

.

OncePerRequestFilter

;

import

java

.

io

.

IOException

;

import

java

.

util

.

concurrent

.

TimeUnit

;

/**

  • 文件上传并发控制过滤器

*

  • 用于限制指定上传接口的同时处理数量。

*/

@Slf4j

@Component

@Order

(

Ordered

.

HIGHEST_PRECEDENCE

)

@RequiredArgsConstructor

public

class

GenericUploadConcurrencyFilter

extends

OncePerRequestFilter

{

private

final

RedissonClient

redissonClient

;

private

final

GenericSemaphoreProperties

semaphoreProperties

;

private

static

final

String

UPLOAD_PATH_KEYWORD

=

"/resource/"

;

private

static

final

String

UPLOAD_PATH_SUFFIX

=

"/files/upload"

;

@Override

protected

void

doFilterInternal

(

@NotNull

HttpServletRequest

request

,

@NotNull

HttpServletResponse

response

,

@NotNull

FilterChain

filterChain

)

throws

ServletException

,

IOException

{

if

(

!

isTargetUploadRequest

(

request

)

)

{

filterChain

.

doFilter

(

request

,

response

)

;

return

;

}

GenericSemaphoreProperties

.

ExpirableSemaphoreConfig

config

=

semaphoreProperties

.

getFileUpload

(

)

;

RPermitExpirableSemaphore

semaphore

=

redissonClient

.

getPermitExpirableSemaphore

(

config

.

getName

(

)

)

;

String

permitId

=

null

;

try

{

permitId

=

semaphore

.

tryAcquire

(

config

.

getMaxWaitSeconds

(

)

,

config

.

getLeaseSeconds

(

)

,

TimeUnit

.

SECONDS

)

;

if

(

permitId

==

null

)

{

writeTooManyRequestsResponse

(

response

)

;

return

;

}

filterChain

.

doFilter

(

request

,

response

)

;

}

catch

(

InterruptedException

ex

)

{

Thread

.

currentThread

(

)

.

interrupt

(

)

;

writeServerErrorResponse

(

response

)

;

}

finally

{

releasePermitIfNecessary

(

semaphore

,

permitId

)

;

}

}

/**

  • 判断是否为目标上传请求

*/

private

boolean

isTargetUploadRequest

(

HttpServletRequest

request

)

{

if

(

!

"POST"

.

equalsIgnoreCase

(

request

.

getMethod

(

)

)

)

{

return

false

;

}

String

requestUri

=

request

.

getRequestURI

(

)

;

return

requestUri

!=

null

&&

requestUri

.

contains

(

UPLOAD_PATH_KEYWORD

)

&&

requestUri

.

endsWith

(

UPLOAD_PATH_SUFFIX

)

;

}

/**

  • 返回请求过多响应

*/

private

void

writeTooManyRequestsResponse

(

HttpServletResponse

response

)

throws

IOException

{

response

.

setStatus

(

HttpServletResponse

.

SC_TOO_MANY_REQUESTS

)

;

response

.

setContentType

(

"application/json;charset=UTF-8"

)

;

response

.

getWriter

(

)

.

write

(

"{\"code\":\"429\",\"message\":\"当前上传请求过多,请稍后再试\"}"

)

;

}

/**

  • 返回服务异常响应

*/

private

void

writeServerErrorResponse

(

HttpServletResponse

response

)

throws

IOException

{

response

.

setStatus

(

HttpServletResponse

.

SC_INTERNAL_SERVER_ERROR

)

;

response

.

setContentType

(

"application/json;charset=UTF-8"

)

;

response

.

getWriter

(

)

.

write

(

"{\"code\":\"500\",\"message\":\"获取上传许可失败\"}"

)

;

}

/**

  • 释放信号量许可

*/

private

void

releasePermitIfNecessary

(

RPermitExpirableSemaphore

semaphore

,

String

permitId

)

{

if

(

permitId

==

null

)

{

return

;

}

boolean

released

=

semaphore

.

tryRelease

(

permitId

)

;

if

(

!

released

)

{

log

.

warn

(

"Upload permit has already expired or been released, permitId={}"

,

permitId

)

;

}

}

}

注意,释放许可一定要放在

finally

里。上传可能成功,也可能失败,但只要拿到了许可,请求结束时都要释放。

总结

RAG 文档上传要同时控制两个维度:

文件大小:防止单个请求过大

上传并发:防止太多请求同时进入

文件大小限制靠 context-length

上传并发控制可以用分布式信号量。

比较稳的做法是:

上传入口限制文件大小

上传请求先抢分布式许可

抢不到许可返回 429

抢到许可后再进入后续的业务处理

请求结束释放许可

后续解析、embedding、向量库写入也要单独限流

这样即使服务多实例部署,文档上传入口的总体并发也能保持在可控范围内。

查看原文


🏷 标签: Spring Boot, RAG, 分布式并发控制, Redis, 信号量