Created on: June 2, 2025
I'm currently reworking RAGFlow: I added Gunicorn and start it in gevent mode, but I've hit a problem. In CI, whenever the configured worker count is greater than 1, the "Run sdk tests against Infinity" step fails in t_chunk.py and t_document.py, mostly around methods like async_parse_documents and doc.add_chunk. My guess is it's a multi-worker issue with the Infinity or Redis connections. Please help me track it down and give a detailed fix. If you need more information, let me know and I'll send it.
<gunicorn.conf.py>
# elasticsearch (default)
# infinity (https://github.com/infiniflow/infinity)
# opensearch (https://github.com/opensearch-project/OpenSearch)
DOC_ENGINE=${DOC_ENGINE:-elasticsearch}
COMPOSE_PROFILES=${DOC_ENGINE}
STACK_VERSION=8.11.3
ES_HOST=es01
ES_PORT=1200
ELASTIC_PASSWORD=infini_rag_flow
OS_PORT=1201
OS_HOST=opensearch01
OPENSEARCH_PASSWORD=infini_rag_flow_OS_01
KIBANA_PORT=6601
KIBANA_USER=rag_flow
KIBANA_PASSWORD=infini_rag_flow
MEM_LIMIT=8073741824
INFINITY_HOST=infinity
INFINITY_THRIFT_PORT=23817
INFINITY_HTTP_PORT=23820
INFINITY_PSQL_PORT=5432
MYSQL_PASSWORD=infini_rag_flow
MYSQL_HOST=mysql
MYSQL_DBNAME=rag_flow
MYSQL_PORT=5455
MINIO_HOST=minio
MINIO_CONSOLE_PORT=9001
MINIO_PORT=9000
# If changed, update the minio.user entry in service_conf.yaml accordingly.
MINIO_USER=rag_flow
# If changed, update the minio.password entry in service_conf.yaml accordingly.
MINIO_PASSWORD=infini_rag_flow
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=infini_rag_flow
SVR_HTTP_PORT=9380
RAGFLOW_IMAGE=infiniflow/ragflow:nightly
# For the nightly-slim edition, uncomment either of the following:
# For the nightly edition, uncomment either of the following:
TIMEZONE='Asia/Shanghai'
# Make sure client_max_body_size in nginx/nginx.conf is updated accordingly.
# Neither MAX_CONTENT_LENGTH nor client_max_body_size sets the maximum size for files uploaded to an agent.
# Available log levels: DEBUG, INFO (default), WARNING, ERROR
# For example, the log level of ragflow.es_conn can be set to DEBUG.
REGISTER_ENABLED=1
# Add sandbox-executor-manager to your /etc/hosts
GUNICORN_WORKERS=4
<redis_conn.py>
```python
import logging
import json
import uuid

import valkey as redis
from rag import settings
from rag.utils import singleton
from valkey.lock import Lock
import trio


class RedisMsg:
    def __init__(self, consumer, queue_name, group_name, msg_id, message):
        self.__consumer = consumer
        self.__queue_name = queue_name
        self.__group_name = group_name
        self.__msg_id = msg_id
        self.__message = json.loads(message["message"])

    def ack(self):
        try:
            self.__consumer.xack(self.__queue_name, self.__group_name, self.__msg_id)
            return True
        except Exception as e:
            logging.warning("[EXCEPTION]ack" + str(self.__queue_name) + "||" + str(e))
        return False

    def get_message(self):
        return self.__message

    def get_msg_id(self):
        return self.__msg_id


@singleton
class RedisDB:
    lua_delete_if_equal = None
    LUA_DELETE_IF_EQUAL_SCRIPT = """
    local current_value = redis.call('get', KEYS[1])
    if current_value and current_value == ARGV[1] then
        redis.call('del', KEYS[1])
        return 1
    end
    return 0
    """

    def __init__(self):
        self.REDIS = None
        self.config = settings.REDIS
        self.__open__()

    def register_scripts(self) -> None:
        cls = self.__class__
        client = self.REDIS
        cls.lua_delete_if_equal = client.register_script(cls.LUA_DELETE_IF_EQUAL_SCRIPT)

    def __open__(self):
        try:
            self.REDIS = redis.StrictRedis(
                host=self.config["host"].split(":")[0],
                port=int(self.config.get("host", ":6379").split(":")[1]),
                db=int(self.config.get("db", 1)),
                password=self.config.get("password"),
                decode_responses=True,
            )
            self.register_scripts()
        except Exception:
            logging.warning("Redis can't be connected.")
        return self.REDIS

    def health(self):
        self.REDIS.ping()
        a, b = "xx", "yy"
        self.REDIS.set(a, b, 3)
        if self.REDIS.get(a) == b:
            return True

    def is_alive(self):
        return self.REDIS is not None

    def exist(self, k):
        if not self.REDIS:
            return
        try:
            return self.REDIS.exists(k)
        except Exception as e:
            logging.warning("RedisDB.exist " + str(k) + " got exception: " + str(e))
            self.__open__()

    def get(self, k):
        if not self.REDIS:
            return
        try:
            return self.REDIS.get(k)
        except Exception as e:
            logging.warning("RedisDB.get " + str(k) + " got exception: " + str(e))
            self.__open__()

    def set_obj(self, k, obj, exp=3600):
        try:
            self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
            return True
        except Exception as e:
            logging.warning("RedisDB.set_obj " + str(k) + " got exception: " + str(e))
            self.__open__()
        return False

    def set(self, k, v, exp=3600):
        try:
            self.REDIS.set(k, v, exp)
            return True
        except Exception as e:
            logging.warning("RedisDB.set " + str(k) + " got exception: " + str(e))
            self.__open__()
        return False

    def sadd(self, key: str, member: str):
        try:
            self.REDIS.sadd(key, member)
            return True
        except Exception as e:
            logging.warning("RedisDB.sadd " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def srem(self, key: str, member: str):
        try:
            self.REDIS.srem(key, member)
            return True
        except Exception as e:
            logging.warning("RedisDB.srem " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def smembers(self, key: str):
        try:
            res = self.REDIS.smembers(key)
            return res
        except Exception as e:
            logging.warning("RedisDB.smembers " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def zadd(self, key: str, member: str, score: float):
        try:
            self.REDIS.zadd(key, {member: score})
            return True
        except Exception as e:
            logging.warning("RedisDB.zadd " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def zcount(self, key: str, min: float, max: float):
        try:
            res = self.REDIS.zcount(key, min, max)
            return res
        except Exception as e:
            logging.warning("RedisDB.zcount " + str(key) + " got exception: " + str(e))
            self.__open__()
        return 0

    def zpopmin(self, key: str, count: int):
        try:
            res = self.REDIS.zpopmin(key, count)
            return res
        except Exception as e:
            logging.warning("RedisDB.zpopmin " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def zrangebyscore(self, key: str, min: float, max: float):
        try:
            res = self.REDIS.zrangebyscore(key, min, max)
            return res
        except Exception as e:
            logging.warning("RedisDB.zrangebyscore " + str(key) + " got exception: " + str(e))
            self.__open__()
        return None

    def transaction(self, key, value, exp=3600):
        try:
            pipeline = self.REDIS.pipeline(transaction=True)
            pipeline.set(key, value, exp, nx=True)
            pipeline.execute()
            return True
        except Exception as e:
            logging.warning("RedisDB.transaction " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False

    def queue_product(self, queue, message) -> bool:
        for _ in range(3):
            try:
                payload = {"message": json.dumps(message)}
                self.REDIS.xadd(queue, payload)
                return True
            except Exception as e:
                logging.exception("RedisDB.queue_product " + str(queue) + " got exception: " + str(e))
        return False

    def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> RedisMsg:
        """https://redis.io/docs/latest/commands/xreadgroup/"""
        try:
            group_info = self.REDIS.xinfo_groups(queue_name)
            if not any(gi["name"] == group_name for gi in group_info):
                self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
            args = {
                "groupname": group_name,
                "consumername": consumer_name,
                "count": 1,
                "block": 5,
                "streams": {queue_name: msg_id},
            }
            messages = self.REDIS.xreadgroup(**args)
            if not messages:
                return None
            stream, element_list = messages[0]
            if not element_list:
                return None
            msg_id, payload = element_list[0]
            res = RedisMsg(self.REDIS, queue_name, group_name, msg_id, payload)
            return res
        except Exception as e:
            if str(e) == 'no such key':
                pass
            else:
                logging.exception("RedisDB.queue_consumer " + str(queue_name) + " got exception: " + str(e))
        return None

    def get_unacked_iterator(self, queue_names: list[str], group_name, consumer_name):
        try:
            for queue_name in queue_names:
                try:
                    group_info = self.REDIS.xinfo_groups(queue_name)
                except Exception as e:
                    if str(e) == 'no such key':
                        logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} doesn't exist")
                        continue
                if not any(gi["name"] == group_name for gi in group_info):
                    logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} group {group_name} doesn't exist")
                    continue
                current_min = 0
                while True:
                    payload = self.queue_consumer(queue_name, group_name, consumer_name, current_min)
                    if not payload:
                        break
                    current_min = payload.get_msg_id()
                    logging.info(f"RedisDB.get_unacked_iterator {queue_name} {consumer_name} {current_min}")
                    yield payload
        except Exception:
            logging.exception("RedisDB.get_unacked_iterator got exception: ")
            self.__open__()

    def get_pending_msg(self, queue, group_name):
        try:
            messages = self.REDIS.xpending_range(queue, group_name, '-', '+', 10)
            return messages
        except Exception as e:
            if 'No such key' not in (str(e) or ''):
                logging.warning("RedisDB.get_pending_msg " + str(queue) + " got exception: " + str(e))
        return []

    def requeue_msg(self, queue: str, group_name: str, msg_id: str):
        try:
            messages = self.REDIS.xrange(queue, msg_id, msg_id)
            if messages:
                self.REDIS.xadd(queue, messages[0][1])
                self.REDIS.xack(queue, group_name, msg_id)
        except Exception as e:
            logging.warning("RedisDB.get_pending_msg " + str(queue) + " got exception: " + str(e))

    def queue_info(self, queue, group_name) -> dict | None:
        try:
            groups = self.REDIS.xinfo_groups(queue)
            for group in groups:
                if group["name"] == group_name:
                    return group
        except Exception as e:
            logging.warning("RedisDB.queue_info " + str(queue) + " got exception: " + str(e))
        return None

    def delete_if_equal(self, key: str, expected_value: str) -> bool:
        """
        Do the following atomically:
        Delete a key if its value equals the given one, do nothing otherwise.
        """
        return bool(self.lua_delete_if_equal(keys=[key], args=[expected_value], client=self.REDIS))

    def delete(self, key) -> bool:
        try:
            self.REDIS.delete(key)
            return True
        except Exception as e:
            logging.warning("RedisDB.delete " + str(key) + " got exception: " + str(e))
            self.__open__()
        return False


REDIS_CONN = RedisDB()


class RedisDistributedLock:
    def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=1):
        self.lock_key = lock_key
        if lock_value:
            self.lock_value = lock_value
        else:
            self.lock_value = str(uuid.uuid4())
        self.timeout = timeout
        self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)

    def acquire(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
        return self.lock.acquire(token=self.lock_value)

    async def spin_acquire(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
        while True:
            if self.lock.acquire(token=self.lock_value):
                break
            await trio.sleep(10)

    def release(self):
        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
```
<Relevant API>
```python
@manager.route("/datasets/<dataset_id>/chunks", methods=["POST"])  # noqa: F821
@token_required
def parse(tenant_id, dataset_id):
    """
    Start parsing documents into chunks.
    ---
    tags:
      - Chunks
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: body
        name: body
        description: Parsing parameters.
        required: true
        schema:
          type: object
          properties:
            document_ids:
              type: array
              items:
                type: string
              description: List of document IDs to parse.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Parsing started successfully.
        schema:
          type: object
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
    req = request.json
    if not req.get("document_ids"):
        return get_error_data_result("`document_ids` is required")
    doc_list = req.get("document_ids")
    unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
    doc_list = unique_doc_ids

    not_found = []
    success_count = 0
    for id in doc_list:
        doc = DocumentService.query(id=id, kb_id=dataset_id)
        if not doc:
            not_found.append(id)
            continue
        if not doc:
            return get_error_data_result(message=f"You don't own the document {id}.")
        if 0.0 < doc[0].progress < 1.0:
            return get_error_data_result("Can't parse document that is currently being processed")
        info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0}
        DocumentService.update_by_id(id, info)
        settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id)
        TaskService.filter_delete([Task.doc_id == id])
        e, doc = DocumentService.get_by_id(id)
        doc = doc.to_dict()
        doc["tenant_id"] = tenant_id
        bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
        queue_tasks(doc, bucket, name, 0)
        success_count += 1
    if not_found:
        return get_result(message=f"Documents not found: {not_found}", code=settings.RetCode.DATA_ERROR)
    if duplicate_messages:
        if success_count > 0:
            return get_result(
                message=f"Partially parsed {success_count} documents with {len(duplicate_messages)} errors",
                data={"success_count": success_count, "errors": duplicate_messages},
            )
        else:
            return get_error_data_result(message=";".join(duplicate_messages))
    return get_result()
```
Here is a quick at-a-glance summary first, followed by a section-by-section walkthrough of how to locate, reproduce, and permanently fix the t_chunk.py / t_document.py failures under Gunicorn with multiple workers; if any step needs more detail, just say so — paste the stack trace or logs and we'll dig in.
With workers > 1 enabled in CI, the failures boil down to two root causes:

1. Multiple workers read the same Stream / consumer group with the same consumer_name. Redis can only deliver a given message to what it regards as one and the same consumer, so two processes both end up executing doc.add_chunk(), which triggers duplicate writes, scrambled progress, and the assertion failures. Both the Redis docs and plenty of community threads stress that consumer names within a group must be unique precisely to avoid this kind of duplicate consumption. (Redis, Stack Overflow)
2. Redis/Valkey and Infinity connections opened in the master process before the fork end up shared by every worker, and the problem is amplified whenever async_parse_documents kicks off parsing. (GitHub)

Let's go through it step by step.
Add logging before and after RedisDB.queue_consumer() (including os.getpid() / consumer_name / msg_id) and run CI again: you will see the same msg_id being read by more than one process. Then use redis-cli to inspect the leftover unacknowledged messages:

```bash
XINFO GROUPS doc_parse_queue
XPENDING doc_parse_queue doc_parse_group - + 10
```

A large number of pending entries plus repeated delivery-count values confirms that the consumer names collide. (Stack Overflow)

Generating the unique name once, after each Gunicorn worker starts, is enough. For example:
```python
# somewhere executed *after* fork, e.g. inside run_worker() or a small wrapper
import os, socket, uuid

CONSUMER_NAME = f"{socket.gethostname()}-{os.getpid()}-{uuid.uuid4().hex[:6]}"

msg = REDIS_CONN.queue_consumer(
    queue_name, group_name, CONSUMER_NAME, msg_id=b'>'
)
```
os.getpid() plus a random suffix guarantees that even workers launched at the same moment on the same host never collide. As a second line of defence there is the existing RedisDistributedLock: add it at the first line of async_parse_documents or doc.add_chunk — even if someone later forgets to keep the consumer names unique, it still acts as a safety net against concurrent writes:

```python
with RedisDistributedLock(f"doc-{doc_id}-parse", timeout=60):
    ...
```
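One caveat: the RedisDistributedLock pasted above only defines acquire()/release(), not __enter__/__exit__, so the `with` form needs a small addition. A minimal sketch against the class shown earlier — only the two new methods matter, everything else stays unchanged:

```python
# Sketch: context-manager support for the RedisDistributedLock defined in
# redis_conn.py above, so the `with` statement works. acquire() and release()
# are the class's existing methods.
class RedisDistributedLock:
    # ... existing __init__ / acquire / spin_acquire / release unchanged ...

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"could not acquire lock {self.lock_key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False  # don't swallow exceptions raised inside the with-block
```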
For bulk writes there is also add_chunks; upstream has recently been working on a concurrent-add bug (issue #6411) and a batch interface (#4016). (GitHub)

redis-py / Valkey clients are best re-created after a fork. The fix is simple: call your RedisDB.__open__() once more inside a post_fork hook:
```python
# gunicorn.conf.py
def post_fork(server, worker):
    from rag.redis_conn import REDIS_CONN
    REDIS_CONN.__open__()  # not name-mangled (leading *and* trailing underscores); or provide a public reset()
```
This is compatible with gevent mode; community FAQs also recommend handling shared connections exactly this way. (Stack Overflow)
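If you'd rather not call the dunder from the config file, a public reset() along these lines keeps the hook tidy. This is only a sketch — the method does not exist in the pasted class, and close() is the standard redis-py/valkey client call for releasing the connection:

```python
import logging

class RedisDB:  # sketch: only the method to add to the existing RedisDB
    def reset(self):
        """Drop the client inherited from the master process and reconnect."""
        try:
            if self.REDIS is not None:
                self.REDIS.close()  # release the stale connection
        except Exception:
            logging.warning("RedisDB.reset: closing the stale connection failed.")
        self.REDIS = None
        return self.__open__()
```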
If the Thrift TSocket that Infinity uses was already opened in the parent process, you must likewise explicitly close the old socket and reconnect() inside post_fork; otherwise several processes reusing it will trigger EPIPE / connection-reset errors.
The Thrift community also has a gevent-specific implementation (edgeware/gevent-thrift) that can be used to avoid blocking. (GitHub)
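Here is a sketch of how the Infinity side could be folded into the same post_fork hook. The module path rag.utils.infinity_conn and the INFINITY_CONN.reconnect() helper are placeholders for wherever your Infinity Thrift client actually lives, not real RAGFlow identifiers:

```python
# gunicorn.conf.py (sketch, extending the post_fork hook above)
import logging

def post_fork(server, worker):
    # Fresh Redis/Valkey connection per worker process.
    from rag.redis_conn import REDIS_CONN
    REDIS_CONN.__open__()

    # Drop the Thrift socket inherited from the master and reconnect, so each
    # worker talks to Infinity over its own TCP connection.
    try:
        from rag.utils import infinity_conn          # assumed module path
        infinity_conn.INFINITY_CONN.reconnect()       # hypothetical close-and-reopen helper
    except Exception:
        logging.warning("post_fork: could not re-establish the Infinity connection")
```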
Re-run the suite:

```bash
GUNICORN_WORKERS=4 pytest -q
```

sdk/tests/t_chunk.py and t_document.py should now pass; anything else that still trips over a stale shared connection can be fixed the same way, by rebuilding it in post_fork.
| Scenario | Recommendation | Reference |
|---|---|---|
| Large-batch chunking | Use the batch chunk API and submit a few dozen chunks per request to cut round-trips. | GitHub |
| Monitoring concurrent-write conflicts | After CI finishes, check for leftover pending entries with XINFO GROUPS + XPENDING and treat any as a blocking condition. | Redis |
| Version dependencies | Make sure redis-py ≥ 5 and gevent ≥ 24; some older versions miss ssl/socket details under monkey.patch_all(). | Stack Overflow |
| Retry/backoff | Add exponential backoff to the Infinity/Redis connections so a brief outage doesn't cascade into failures (see the sketch below). | Stack Overflow |
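For the retry/backoff row, a minimal full-jitter sketch — nothing here is RAGFlow-specific, just wrap whatever connect call you already have:

```python
import random
import time

def call_with_backoff(fn, attempts=5, base=0.5, cap=10.0):
    """Retry fn() with full-jitter exponential backoff; re-raise on the final failure."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            time.sleep(random.uniform(0, min(cap, base * 2 ** i)))

# e.g. call_with_backoff(REDIS_CONN.__open__) right after a worker forks
```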
One-line summary: give every worker its own consumer_name, and rebuild all external connections after the fork. With those two changes in place, raising GUNICORN_WORKERS to 4 or even 8 will no longer trigger the async_parse_documents / doc.add_chunk race. I hope these troubleshooting notes clear the blocker in one go; if there are error details or configuration differences I haven't seen, just send them over. Good luck with the refactor!