I am currently modifying RAGFlow, adding Gunicorn and starting it in gevent mode, but now there is a...

Created: June 2, 2025

Answered using O3 from Chat01

Question

I am currently modifying RAGFlow: I added Gunicorn and start it in gevent mode, but there is now a problem. When CI runs, as soon as the configured number of workers is greater than 1, t_chunk.py and t_document.py fail in the "Run sdk tests against Infinity" step, mostly around methods such as async_parse_documents and doc.add_chunk. My guess is that it is an issue with multiple workers connecting to Infinity or Redis. Please help me troubleshoot and fix it, and give a detailed solution. If you need more information, tell me and I will send it to you.

<gunicorn.conf.py>

The type of doc engine to use.

Available options:

- elasticsearch (default)

- infinity (https://github.com/infiniflow/infinity)

- opensearch (https://github.com/opensearch-project/OpenSearch)

DOC_ENGINE=${DOC_ENGINE:-elasticsearch}

------------------------------

docker env var for specifying vector db type at startup

(based on the vector db type, the corresponding docker

compose profile will be used)

------------------------------

COMPOSE_PROFILES=${DOC_ENGINE}

The version of Elasticsearch.

STACK_VERSION=8.11.3

The hostname where the Elasticsearch service is exposed

ES_HOST=es01

The port used to expose the Elasticsearch service to the host machine,

allowing EXTERNAL access to the service running inside the Docker container.

ES_PORT=1200

The password for Elasticsearch.

ELASTIC_PASSWORD=infini_rag_flow

The port used to expose the OpenSearch service to the host machine; set it to a different value from the Elasticsearch port.

OS_PORT=1201

The hostname where the OpenSearch service is exposed

OS_HOST=opensearch01

The password for OpenSearch.

At least one uppercase letter, one lowercase letter, one digit, and one special character

OPENSEARCH_PASSWORD=infini_rag_flow_OS_01

The port used to expose the Kibana service to the host machine,

allowing EXTERNAL access to the service running inside the Docker container.

KIBANA_PORT=6601
KIBANA_USER=rag_flow
KIBANA_PASSWORD=infini_rag_flow

The maximum amount of memory, in bytes, that a specific Docker container can use while running.

Update it according to the available memory in the host machine.

MEM_LIMIT=8073741824

The hostname where the Infinity service is exposed

INFINITY_HOST=infinity

Port to expose Infinity API to the host

INFINITY_THRIFT_PORT=23817
INFINITY_HTTP_PORT=23820
INFINITY_PSQL_PORT=5432

The password for MySQL.

MYSQL_PASSWORD=infini_rag_flow

The hostname where the MySQL service is exposed

MYSQL_HOST=mysql

The database of the MySQL service to use

MYSQL_DBNAME=rag_flow

The port used to expose the MySQL service to the host machine,

allowing EXTERNAL access to the MySQL database running inside the Docker container.

MYSQL_PORT=5455

The hostname where the MinIO service is exposed

MINIO_HOST=minio

The port used to expose the MinIO console interface to the host machine,

allowing EXTERNAL access to the web-based console running inside the Docker container.

MINIO_CONSOLE_PORT=9001

The port used to expose the MinIO API service to the host machine,

allowing EXTERNAL access to the MinIO object storage service running inside the Docker container.

MINIO_PORT=9000

The username for MinIO.

When updated, you must revise the minio.user entry in service_conf.yaml accordingly.

MINIO_USER=rag_flow

The password for MinIO.

When updated, you must revise the minio.password entry in service_conf.yaml accordingly.

MINIO_PASSWORD=infini_rag_flow

The hostname where the Redis service is exposed

REDIS_HOST=redis

The port used to expose the Redis service to the host machine,

allowing EXTERNAL access to the Redis service running inside the Docker container.

REDIS_PORT=6379

The password for Redis.

REDIS_PASSWORD=infini_rag_flow

The port used to expose RAGFlow's HTTP API service to the host machine,

allowing EXTERNAL access to the service running inside the Docker container.

SVR_HTTP_PORT=9380

The RAGFlow Docker image to download.

Defaults to the v0.19.0-slim edition, which is the RAGFlow Docker image without embedding models.

RAGFLOW_IMAGE=infiniflow/ragflow:nightly

To download the RAGFlow Docker image with embedding models, uncomment the following line instead:

RAGFLOW_IMAGE=infiniflow/ragflow:v0.19.0

The Docker image of the v0.19.0 edition includes built-in embedding models:

- BAAI/bge-large-zh-v1.5

- maidalun1020/bce-embedding-base_v1

If you cannot download the RAGFlow Docker image:

- For the nightly-slim edition, uncomment either of the following:

RAGFLOW_IMAGE=swr.cn-north-4.myhuaweicloud.com/infiniflow/ragflow:nightly-slim

RAGFLOW_IMAGE=registry.cn-hangzhou.aliyuncs.com/infiniflow/ragflow:nightly-slim

- For the nightly edition, uncomment either of the following:

RAGFLOW_IMAGE=swr.cn-north-4.myhuaweicloud.com/infiniflow/ragflow:nightly

RAGFLOW_IMAGE=registry.cn-hangzhou.aliyuncs.com/infiniflow/ragflow:nightly

The local time zone.

TIMEZONE='Asia/Shanghai'

Uncomment the following line if you have limited access to huggingface.co:

HF_ENDPOINT=https://hf-mirror.com

Optimizations for MacOS

Uncomment the following line if your operating system is MacOS:

MACOS=1

The maximum file size limit (in bytes) for each upload to your knowledge base or File Management.

To change the 1GB file size limit, uncomment the line below and update as needed.

MAX_CONTENT_LENGTH=1073741824

After updating, ensure client_max_body_size in nginx/nginx.conf is updated accordingly.

Note that neither MAX_CONTENT_LENGTH nor client_max_body_size sets the maximum size for files uploaded to an agent.

See https://ragflow.io/docs/dev/begin_component for details.

Log level for RAGFlow's own and imported packages.

Available levels:

- DEBUG

- INFO (default)

- WARNING

- ERROR

For example, the following line changes the log level of ragflow.es_conn to DEBUG:

LOG_LEVELS=ragflow.es_conn=DEBUG

aliyun OSS configuration

STORAGE_IMPL=OSS

ACCESS_KEY=xxx

SECRET_KEY=eee

ENDPOINT=http://oss-cn-hangzhou.aliyuncs.com

REGION=cn-hangzhou

BUCKET=ragflow65536

A user registration switch:

- Enable registration: 1

- Disable registration: 0

REGISTER_ENABLED=1

Sandbox settings

Important: To enable sandbox, you must re-declare the compose profiles. See hints at the end of file.

Double check if you add sandbox-executor-manager to your /etc/hosts

Pull the required base images before running:

docker pull infiniflow/sandbox-base-nodejs:latest

docker pull infiniflow/sandbox-base-python:latest

Our default sandbox environments include:

- Node.js base image: includes axios

- Python base image: includes requests, numpy, and pandas

Specify custom executor images below if you're using non-default environments.

SANDBOX_ENABLED=1

SANDBOX_HOST=sandbox-executor-manager

SANDBOX_EXECUTOR_MANAGER_IMAGE=infiniflow/sandbox-executor-manager:latest

SANDBOX_EXECUTOR_MANAGER_POOL_SIZE=3

SANDBOX_BASE_PYTHON_IMAGE=infiniflow/sandbox-base-python:latest

SANDBOX_BASE_NODEJS_IMAGE=infiniflow/sandbox-base-nodejs:latest

SANDBOX_EXECUTOR_MANAGER_PORT=9385

SANDBOX_ENABLE_SECCOMP=false

SANDBOX_MAX_MEMORY=256m # b, k, m, g

SANDBOX_TIMEOUT=10s # s, m, 1m30s

Important: To enable sandbox, you must re-declare the compose profiles.

1. Comment out the COMPOSE_PROFILES line above.

2. Uncomment one of the following based on your chosen document engine:

- For Elasticsearch:

COMPOSE_PROFILES=elasticsearch,sandbox

- For Infinity:

COMPOSE_PROFILES=infinity,sandbox

- For OpenSearch:

COMPOSE_PROFILES=opensearch,sandbox

Gunicorn setting

GUNICORN_WORKERS=4

<redis_conn.py>

Copyright 2025 The InfiniFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");

you may not use this file except in compliance with the License.

You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software

distributed under the License is distributed on an "AS IS" BASIS,

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the License for the specific language governing permissions and

limitations under the License.

import logging
import json
import uuid

import valkey as redis
from rag import settings
from rag.utils import singleton
from valkey.lock import Lock
import trio

class RedisMsg:
    def __init__(self, consumer, queue_name, group_name, msg_id, message):
        self.__consumer = consumer
        self.__queue_name = queue_name
        self.__group_name = group_name
        self.__msg_id = msg_id
        self.__message = json.loads(message["message"])

    def ack(self):
        try:
            self.__consumer.xack(self.__queue_name, self.__group_name, self.__msg_id)
            return True
        except Exception as e:
            logging.warning("[EXCEPTION]ack" + str(self.__queue_name) + "||" + str(e))
        return False

    def get_message(self):
        return self.__message

    def get_msg_id(self):
        return self.__msg_id

@singleton
class RedisDB:
    lua_delete_if_equal = None
    LUA_DELETE_IF_EQUAL_SCRIPT = """
    local current_value = redis.call('get', KEYS[1])
    if current_value and current_value == ARGV[1] then
        redis.call('del', KEYS[1])
        return 1
    end
    return 0
    """

    def __init__(self):
        self.REDIS = None
        self.config = settings.REDIS
        self.__open__()

    def register_scripts(self) -> None:
        cls = self.__class__
        client = self.REDIS
        cls.lua_delete_if_equal = client.register_script(cls.LUA_DELETE_IF_EQUAL_SCRIPT)

    def __open__(self):
        try:
            self.REDIS = redis.StrictRedis(
                host=self.config["host"].split(":")[0],
                port=int(self.config.get("host", ":6379").split(":")[1]),
                db=int(self.config.get("db", 1)),
                password=self.config.get("password"),
                decode_responses=True,
            )
            self.register_scripts()
        except Exception:
            logging.warning("Redis can't be connected.")
        return self.REDIS

    def health(self):
        self.REDIS.ping()
        a, b = "xx", "yy"
        self.REDIS.set(a, b, 3)
        if self.REDIS.get(a) == b:
            return True

    def is_alive(self):
        return self.REDIS is not None

    def exist(self, k):
        if not self.REDIS:
            return
        try:
            return self.REDIS.exists(k)
        except Exception as e:
            logging.warning("RedisDB.exist " + str(k) + " got exception: " + str(e))
            self.__open__()

    def get(self, k):
        if not self.REDIS:
            return
        try:
            return self.REDIS.get(k)
        except Exception as e:
            logging.warning("RedisDB.get " + str(k) + " got exception: " + str(e))
            self.__open__()

    def set_obj(self, k, obj, exp=3600):
        try:
            self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
            return True
        except Exception as e:
            logging.warning("RedisDB.set_obj " + str(k) + " got exception: " + str(e))
            self.__open__()
        return False

    def set(self, k, v, exp=3600):
        try:
            self.REDIS.set(k, v, exp)
            return True
        except Exception as e:
            logging.warning("RedisDB.set " + str(k) + " got exception: " + str(e))
            self.__open__()
        return False

    def sadd(self, key: str, member: str):
        try:
            self.REDIS.sadd(key, member)
            return True
        except Exception as e:
            logging.warning("RedisDB.sadd " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def srem(self, key: str, member: str):
        try:
            self.REDIS.srem(key, member)
            return True
        except Exception as e:
            logging.warning("RedisDB.srem " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def smembers(self, key: str):
        try:
            res = self.REDIS.smembers(key)
            return res
        except Exception as e:
            logging.warning("RedisDB.smembers " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def zadd(self, key: str, member: str, score: float):
        try:
            self.REDIS.zadd(key, {member: score})
            return True
        except Exception as e:
            logging.warning("RedisDB.zadd " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def zcount(self, key: str, min: float, max: float):
        try:
            res = self.REDIS.zcount(key, min, max)
            return res
        except Exception as e:
            logging.warning("RedisDB.zcount " + str(key) + " got exception: " + str(e))
            self.__open__()
        return 0

    def zpopmin(self, key: str, count: int):
        try:
            res = self.REDIS.zpopmin(key, count)
            return res
        except Exception as e:
            logging.warning("RedisDB.zpopmin " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def zrangebyscore(self, key: str, min: float, max: float):
        try:
            res = self.REDIS.zrangebyscore(key, min, max)
            return res
        except Exception as e:
            logging.warning("RedisDB.zrangebyscore " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def transaction(self, key, value, exp=3600):
        try:
            pipeline = self.REDIS.pipeline(transaction=True)
            pipeline.set(key, value, exp, nx=True)
            pipeline.execute()
            return True
        except Exception as e:
            logging.warning("RedisDB.transaction " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def queue_product(self, queue, message) -> bool:
        for _ in range(3):
            try:
                payload = {"message": json.dumps(message)}
                self.REDIS.xadd(queue, payload)
                return True
            except Exception as e:
                logging.exception("RedisDB.queue_product " + str(queue) + " got exception: " + str(e))
        return False

    def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> RedisMsg:
        """https://redis.io/docs/latest/commands/xreadgroup/"""
        try:
            group_info = self.REDIS.xinfo_groups(queue_name)
            if not any(gi["name"] == group_name for gi in group_info):
                self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
            args = {
                "groupname": group_name,
                "consumername": consumer_name,
                "count": 1,
                "block": 5,
                "streams": {queue_name: msg_id},
            }
            messages = self.REDIS.xreadgroup(**args)
            if not messages:
                return None
            stream, element_list = messages[0]
            if not element_list:
                return None
            msg_id, payload = element_list[0]
            res = RedisMsg(self.REDIS, queue_name, group_name, msg_id, payload)
            return res
        except Exception as e:
            if str(e) == 'no such key':
                pass
            else:
                logging.exception("RedisDB.queue_consumer " + str(queue_name) + " got exception: " + str(e))
        return None

    def get_unacked_iterator(self, queue_names: list[str], group_name, consumer_name):
        try:
            for queue_name in queue_names:
                try:
                    group_info = self.REDIS.xinfo_groups(queue_name)
                except Exception as e:
                    if str(e) == 'no such key':
                        logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} doesn't exist")
                        continue
                if not any(gi["name"] == group_name for gi in group_info):
                    logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} group {group_name} doesn't exist")
                    continue
                current_min = 0
                while True:
                    payload = self.queue_consumer(queue_name, group_name, consumer_name, current_min)
                    if not payload:
                        break
                    current_min = payload.get_msg_id()
                    logging.info(f"RedisDB.get_unacked_iterator {queue_name} {consumer_name} {current_min}")
                    yield payload
        except Exception:
            logging.exception("RedisDB.get_unacked_iterator got exception: ")
            self.__open__()

    def get_pending_msg(self, queue, group_name):
        try:
            messages = self.REDIS.xpending_range(queue, group_name, '-', '+', 10)
            return messages
        except Exception as e:
            if 'No such key' not in (str(e) or ''):
                logging.warning("RedisDB.get_pending_msg " + str(queue) + " got exception: " + str(e))
        return []

    def requeue_msg(self, queue: str, group_name: str, msg_id: str):
        try:
            messages = self.REDIS.xrange(queue, msg_id, msg_id)
            if messages:
                self.REDIS.xadd(queue, messages[0][1])
                self.REDIS.xack(queue, group_name, msg_id)
        except Exception as e:
            logging.warning("RedisDB.get_pending_msg " + str(queue) + " got exception: " + str(e))

    def queue_info(self, queue, group_name) -> dict | None:
        try:
            groups = self.REDIS.xinfo_groups(queue)
            for group in groups:
                if group["name"] == group_name:
                    return group
        except Exception as e:
            logging.warning("RedisDB.queue_info " + str(queue) + " got exception: " + str(e))
        return None

    def delete_if_equal(self, key: str, expected_value: str) -> bool:
        """
        Do the following atomically:
        Delete a key if its value equals the given one; do nothing otherwise.
        """
        return bool(self.lua_delete_if_equal(keys=[key], args=[expected_value], client=self.REDIS))

    def delete(self, key) -> bool:
        try:
            self.REDIS.delete(key)
            return True
        except Exception as e:
            logging.warning("RedisDB.delete " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

REDIS_CONN = RedisDB()

class RedisDistributedLock:
    def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=1):
        self.lock_key = lock_key
        if lock_value:
            self.lock_value = lock_value
        else:
            self.lock_value = str(uuid.uuid4())
        self.timeout = timeout
        self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)

    def acquire(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
        return self.lock.acquire(token=self.lock_value)

    async def spin_acquire(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
        while True:
            if self.lock.acquire(token=self.lock_value):
                break
            await trio.sleep(10)

    def release(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)

<相关API>
@manager.route("/datasets/<dataset_id>/chunks", methods=["POST"]) # noqa: F821
@token_required
def parse(tenant_id, dataset_id):
"""
Start parsing documents into chunks.
---
tags:
- Chunks
security:
- ApiKeyAuth: []
parameters:
- in: path
name: dataset_id
type: string
required: true
description: ID of the dataset.
- in: body
name: body
description: Parsing parameters.
required: true
schema:
type: object
properties:
document_ids:
type: array
items:
type: string
description: List of document IDs to parse.
- in: header
name: Authorization
type: string
required: true
description: Bearer token for authentication.
responses:
200:
description: Parsing started successfully.
schema:
type: object
"""
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
req = request.json
if not req.get("document_ids"):
return get_error_data_result("document_ids is required")
doc_list = req.get("document_ids")
unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
doc_list = unique_doc_ids

text
not_found = [] success_count = 0 for id in doc_list: doc = DocumentService.query(id=id, kb_id=dataset_id) if not doc: not_found.append(id) continue if not doc: return get_error_data_result(message=f"You don't own the document {id}.") if 0.0 < doc[0].progress < 1.0: return get_error_data_result( "Can't parse document that is currently being processed" ) info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0} DocumentService.update_by_id(id, info) settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id) TaskService.filter_delete([Task.doc_id == id]) e, doc = DocumentService.get_by_id(id) doc = doc.to_dict() doc["tenant_id"] = tenant_id bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) queue_tasks(doc, bucket, name, 0) success_count += 1 if not_found: return get_result(message=f"Documents not found: {not_found}", code=settings.RetCode.DATA_ERROR) if duplicate_messages: if success_count > 0: return get_result(message=f"Partially parsed {success_count} documents with {len(duplicate_messages)} errors", data={"success_count": success_count, "errors": duplicate_messages},) else: return get_error_data_result(message=";".join(duplicate_messages)) return get_result()

Thought

Identifying Redis consumer group issue

The problem stems from using the same consumer_name in multiple workers. If all workers share the same name, Redis may deliver the same task to multiple processes, causing duplicates and test failures. The Redis documentation stresses that each consumer in a group must have a unique name to maintain message consistency. Using the same name breaks the “exactly once” guarantee, leading to potential message duplications, as highlighted in Stack Overflow discussions.

Improving concurrency handling

To fix the issue, we need to ensure that each Gunicorn worker has a unique consumer_name for Redis streams, perhaps using process IDs or server names. I'll suggest modifying the queue_consumer to include os.getpid() or socket.gethostname(). For Redis distributed locks, each doc parse should acquire a per-document lock to avoid concurrency issues. Regarding Infinity engine connections, we need each worker to create its own client after a fork to prevent errors. Additional debugging steps will confirm duplicate chunk counts and test Redis configurations.

Thought for 1m 51s

Answer

Here is a quick summary first, followed by a section-by-section walkthrough of how to locate, reproduce, and permanently fix the t_chunk.py / t_document.py failures that appear under Gunicorn with multiple workers. If any step still needs more detail, just tell me: paste the stack trace or logs.

When workers > 1 is enabled in CI, the root cause basically falls into one of two categories:

  1. Duplicate Redis Streams consumer names + concurrent writes causing a data race
    When multiple Gunicorn processes read the same stream / consumer group with the same consumer_name, Redis can only deliver a given message to what is nominally a single consumer, so two processes both end up executing doc.add_chunk(); that triggers duplicate writes, scrambled progress, and failing assertions. The Redis documentation and many community threads stress that consumer names within a group must be unique precisely to avoid this kind of duplicate consumption. (Redis docs; Stack Overflow)
  2. Reusing old connections after fork (Thrift→Infinity and Redis)
    Gunicorn forks first and then runs the workers; sockets created in the parent process are inherited by the children, but those connections are not safe to share across processes. As soon as one child closes or corrupts a connection, the other processes start seeing exceptions such as "Connection reset/EOF", and the CI tests happen to amplify this by running async_parse_documents concurrently. (GitHub)

The steps are laid out below.


1 Reproduce and localize quickly

  1. Passing with a single worker but failing with more than one points to a logic race rather than a syntax problem.
  2. Add logging before and after RedisDB.queue_consumer() (including os.getpid() / consumer_name / msg_id) and run CI again: you will see the same msg_id being read by several processes. A minimal logging sketch follows this list.
  3. Inspect the leftover unacknowledged messages with redis-cli:
    bash
    XINFO GROUPS doc_parse_queue
    XPENDING doc_parse_queue doc_parse_group - + 10
    A pile of pending entries with repeated delivery counts confirms the duplicated consumer name. (Stack Overflow)
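
A minimal sketch of that diagnostic logging, assuming the import path rag.utils.redis_conn for the REDIS_CONN shown earlier; the wrapper name log_queue_consumer is purely illustrative:

python
import logging
import os

from rag.utils.redis_conn import REDIS_CONN  # assumed module path

def log_queue_consumer(queue_name, group_name, consumer_name):
    # Log which OS process pulled which message id, so duplicate deliveries
    # across Gunicorn workers show up directly in the CI output.
    msg = REDIS_CONN.queue_consumer(queue_name, group_name, consumer_name)
    if msg:
        logging.info("pid=%s consumer=%s msg_id=%s", os.getpid(), consumer_name, msg.get_msg_id())
    return msg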

2 Fix the Redis Streams consumption conflict

2.1 Make consumer names unique

Each Gunicorn worker only needs to generate a unique name once after it starts. For example:

python
# somewhere executed *after* fork, e.g. inside run_worker() or a small wrapper
import os, socket, uuid

CONSUMER_NAME = f"{socket.gethostname()}-{os.getpid()}-{uuid.uuid4().hex[:6]}"

msg = REDIS_CONN.queue_consumer(
    queue_name, group_name, CONSUMER_NAME, msg_id=b'>'
)
  • os.getpid() plus a random suffix guarantees that workers started at the same time on the same host never collide.
  • Redis then assigns each message to a different consumer, so every process gets its own share and they no longer interfere with each other.
  • Relevant guidance and war stories: (Redis docs; GitHub; Stack Overflow)

2.2 Locking / idempotency protection (optional but recommended)

  • You already have RedisDistributedLock. At the top of async_parse_documents / doc.add_chunk, add:
    python
    lock = RedisDistributedLock(f"doc-{doc_id}-parse", timeout=60)
    lock.acquire()  # the class as pasted has no __enter__/__exit__, so acquire/release explicitly
    try:
        ...
    finally:
        lock.release()
    Even if someone forgets to change the consumer name later, this still acts as a safety net against concurrent writes.
  • Another angle is to improve RAGFlow's built-in batch interface add_chunks; upstream has recently been fixing a concurrent-add bug (issue #6411) and a batch interface (#4016). (GitHub) A minimal idempotency guard is sketched right after this list.
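
A minimal idempotency guard to go with the lock, assuming the REDIS_CONN singleton from redis_conn.py above; the key pattern "parsed:{msg_id}" and the helper name claim_message are illustrative, not existing RAGFlow conventions:

python
from rag.utils.redis_conn import REDIS_CONN  # assumed module path

def claim_message(msg_id: str, consumer_name: str) -> bool:
    # SET NX succeeds only for the first claimer; any other worker that
    # receives the same message gets False back and simply skips it.
    return bool(REDIS_CONN.REDIS.set(f"parsed:{msg_id}", consumer_name, nx=True, ex=600))

Combined with unique consumer names, this turns an accidental duplicate delivery into a no-op instead of a test failure.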

3 Rebuild external connections after fork

3.1 Redis connections

redis-py / Valkey connections are best re-established after fork. The fix is simple: call your RedisDB.__open__() again inside a post_fork hook:

python
# gunicorn.conf.py
def post_fork(server, worker):
    from rag.redis_conn import REDIS_CONN
    REDIS_CONN._RedisDB__open__()  # or provide a public reset()

This is compatible with gevent mode; community FAQs also recommend handling shared connections this way. (Stack Overflow)
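
If you prefer not to call the name-mangled __open__() from outside, here is a sketch of the public reset() mentioned above; this is a method you would add to RedisDB in redis_conn.py, not something that exists today:

python
# Inside class RedisDB (redis_conn.py) -- hypothetical convenience wrapper.
def reset(self):
    # Drop the connection object inherited from the Gunicorn master and
    # build a fresh one owned by this worker process.
    self.REDIS = None
    return self.__open__()

post_fork can then simply call REDIS_CONN.reset(), which reads better and survives renames of the private method.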

3.2 Infinity / Thrift client

If the Thrift TSocket used by Infinity was already opened in the parent process, it too must be explicitly closed and reconnect()-ed inside post_fork; otherwise sharing it across processes leads to EPIPE / Connection reset.
The Thrift community has a gevent-specific implementation (edgeware/gevent-thrift) that can be used to avoid blocking. (GitHub)
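
A sketch of the same post_fork idea applied to the Infinity side, assuming the same settings.docStoreConn object used in the API code above is importable via from rag import settings, and that you add a reconnect() helper to it; neither the lookup pattern nor reconnect() is guaranteed to exist as shown:

python
# gunicorn.conf.py -- combined hook; reset()/reconnect() are assumed helpers.
def post_fork(server, worker):
    from rag import settings
    from rag.utils.redis_conn import REDIS_CONN  # assumed module path

    REDIS_CONN._RedisDB__open__()  # rebuild the Redis connection in the child

    doc_store = getattr(settings, "docStoreConn", None)
    if doc_store is not None and hasattr(doc_store, "reconnect"):
        doc_store.reconnect()  # hypothetical: close the inherited Thrift socket, dial again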


4 Gunicorn + gevent process/coroutine model caveats

  • Each Gunicorn worker is an independent process; workers do not share global variables and can only communicate through external systems (Redis, the database). (GitHub)
  • gevent only removes blocking inside a single process's coroutines; across processes you still need locks / unique IDs. (Stack Overflow) A minimal configuration sketch follows this list.
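
For reference, a minimal gunicorn.conf.py that ties these pieces together; the bind address is a placeholder, and preload_app = False is an assumption chosen so the app and its singletons are imported after the fork rather than in the master:

python
# gunicorn.conf.py -- sketch; adjust bind and module paths to your deployment.
import os

bind = "0.0.0.0:9380"   # assumed to match SVR_HTTP_PORT above
worker_class = "gevent"
workers = int(os.getenv("GUNICORN_WORKERS", "4"))
preload_app = False     # import the app per worker, after fork

def post_fork(server, worker):
    # Re-create per-process connections here (see sections 3.1 and 3.2).
    from rag.utils.redis_conn import REDIS_CONN  # assumed module path
    REDIS_CONN._RedisDB__open__()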

5 CI tests & regression

  1. After applying the two changes above on your branch, run locally:
    bash
    GUNICORN_WORKERS=4 pytest -q
    All of sdk/tests/t_chunk.py / t_document.py should pass.
  2. If there are still flaky failures, check whether any other "singleton" object pre-initializes a network connection in the master process; move it into post_fork and rebuild it there in the same way.

6 Additional hardening suggestions

Scenario | Recommendation | Reference
Large-batch chunking | Use the batch chunk API and submit a few dozen chunks per call to cut round-trips. | GitHub
Monitoring concurrent-write conflicts | After CI finishes, check XINFO GROUPS + XPENDING for leftover pending entries and make that a blocking condition. | Redis docs
Version dependencies | Ensure redis-py ≥ 5 and gevent ≥ 24; some older versions miss ssl/socket details under monkey.patch_all(). | Stack Overflow
Retry/backoff | Add exponential backoff to the Infinity/Redis connections so a brief outage does not cascade into failures; a sketch follows this table. | Stack Overflow
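
A minimal exponential-backoff sketch for the retry/backoff item; the with_backoff name and its parameters are illustrative, and it simply wraps whatever connect callable you choose to pass in:

python
import logging
import random
import time

def with_backoff(connect, attempts=5, base=0.5, cap=10.0):
    # Retry `connect` with exponential backoff plus jitter; re-raise after
    # the final attempt so the caller still sees a hard failure.
    for i in range(attempts):
        try:
            return connect()
        except Exception as e:
            if i == attempts - 1:
                raise
            delay = min(cap, base * (2 ** i)) * (0.5 + random.random() / 2)
            logging.warning("connect failed (%s); retrying in %.1fs", e, delay)
            time.sleep(delay)

Wrap the Redis or Infinity connect call with it, e.g. with_backoff(my_infinity_connect), where my_infinity_connect is your own factory function.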

7 Closing notes

  • The key fixes: generate a unique consumer_name for every process, and rebuild all external connections after fork.
  • With those in place, raising GUNICORN_WORKERS to 4 or even 8 will no longer trigger the async_parse_documents / doc.add_chunk race, locally or in CI.
  • If residual exceptions remain in the logs, paste the stack trace and I will help you track them down.

I hope this troubleshooting note clears up the pain point in one pass; if I have missed any error details or configuration differences, just send them over. Good luck with the rework!
