# -*- coding: utf-8 -*- # This file is auto-genera...
Creato il: 25 dicembre 2025
Risposto usando GPT-5.2 Thinking da Chat01
Creato il: 25 dicembre 2025
Risposto usando GPT-5.2 Thinking da Chat01
import os
import sys
import json
from typing import List
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Demo: list the shards of one hard-coded SLS logstore."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Sls20201230Client:
        """
        Build an SLS client from the default credential chain.

        Prefer AK-free credentials in production; see
        https://help.aliyun.com/document_detail/378659.html
        @return: Client
        @throws Exception
        """
        credential = CredentialClient()
        config = open_api_models.Config(
            credential=credential
        )
        # Endpoint reference: https://api.aliyun.com/product/Sls
        config.endpoint = 'cn-hangzhou.log.aliyuncs.com'
        return Sls20201230Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        runtime = util_models.RuntimeOptions()
        headers = {}
        try:
            resp = client.list_shards_with_options(
                'k8s-log-c080313797e524f98b37f23ea71dcd134',
                'zhmle-registry-server',
                headers,
                runtime,
            )
            print(json.dumps(resp, default=str, indent=2))
        except Exception as error:
            # Demo-only handling; never silently swallow exceptions in real code.
            # Not every exception carries .message/.data, so fall back safely
            # instead of raising AttributeError while reporting the error.
            print(getattr(error, 'message', None) or str(error))
            data = getattr(error, 'data', None)
            if isinstance(data, dict):
                # Diagnostic URL suggested by the SDK.
                print(data.get("Recommend"))

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        runtime = util_models.RuntimeOptions()
        headers = {}
        try:
            resp = await client.list_shards_with_options_async(
                'k8s-log-c080313797e524f98b37f23ea71dcd134',
                'zhmle-registry-server',
                headers,
                runtime,
            )
            print(json.dumps(resp, default=str, indent=2))
        except Exception as error:
            # Demo-only handling; never silently swallow exceptions in real code.
            print(getattr(error, 'message', None) or str(error))
            data = getattr(error, 'data', None)
            if isinstance(data, dict):
                print(data.get("Recommend"))
if __name__ == '__main__':
    # Entry point: forward CLI args (minus the script name) to Sample.main.
    # Original had "if name == 'main'" — the dunder underscores were lost
    # in a markdown round-trip; the guard never fired as written.
    Sample.main(sys.argv[1:])
这个是shard查看的脚本,帮我写为批量查看project中的所有logstore,然后输出shard>1的logstore名称和shard数
python# Install package if missing import pkgutil, subprocess, sys, importlib, inspect, re, json, os pkg_name = "alibabacloud_sls20201230" if pkgutil.find_loader(pkg_name) is None: print("installing...") subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "alibabacloud-sls20201230"]) import alibabacloud_sls20201230 from alibabacloud_sls20201230.client import Client as SlsClient print("version", alibabacloud_sls20201230.__version__ if hasattr(alibabacloud_sls20201230, "__version__") else "unknown")
pythonimport inspect, re methods = [m for m in dir(SlsClient) if "logstore" in m.lower() and "list" in m.lower()] methods[:50], len(methods) ([], 0)
python[m for m in dir(SlsClient) if m.startswith("list_")][:50] ['list_agent_instance_configs', 'list_agent_instance_configs_async', 'list_agent_instance_configs_with_options', 'list_agent_instance_configs_with_options_async', 'list_ai_tools', 'list_ai_tools_async', 'list_ai_tools_with_options', 'list_ai_tools_with_options_async', 'list_alerts', 'list_alerts_async', 'list_alerts_with_options', 'list_alerts_with_options_async', 'list_annotation_data', 'list_annotation_data_async', 'list_annotation_data_sets', 'list_annotation_data_sets_async', 'list_annotation_data_sets_with_options', 'list_annotation_data_sets_with_options_async', 'list_annotation_data_with_options', 'list_annotation_data_with_options_async', 'list_annotation_labels', 'list_annotation_labels_async', 'list_annotation_labels_with_options', 'list_annotation_labels_with_options_async', 'list_azure_blob_ingestion', 'list_azure_blob_ingestion_async', 'list_azure_blob_ingestion_with_options', 'list_azure_blob_ingestion_with_options_async', 'list_collection_policies', 'list_collection_policies_async', 'list_collection_policies_with_options', 'list_collection_policies_with_options_async', 'list_config', 'list_config_async', 'list_config_with_options', 'list_config_with_options_async', 'list_consume_processors', 'list_consume_processors_async', 'list_consume_processors_with_options', 'list_consume_processors_with_options_async', 'list_consumer_group', 'list_consumer_group_async', 'list_consumer_group_with_options', 'list_consumer_group_with_options_async', 'list_dashboard', 'list_dashboard_async', 'list_dashboard_with_options', 'list_dashboard_with_options_async', 'list_domains', 'list_domains_async']
python[m for m in dir(SlsClient) if "log_store" in m.lower() or "logstore" in m.lower()][:50], len([m for m in dir(SlsClient) if "log_store" in m.lower() or "logstore" in m.lower()]) (['create_log_store', 'create_log_store_async', 'create_log_store_with_options', 'create_log_store_with_options_async', 'delete_log_store', 'delete_log_store_async', 'delete_log_store_with_options', 'delete_log_store_with_options_async', 'get_log_store', 'get_log_store_async', 'get_log_store_metering_mode', 'get_log_store_metering_mode_async', 'get_log_store_metering_mode_with_options', 'get_log_store_metering_mode_with_options_async', 'get_log_store_with_options', 'get_log_store_with_options_async', 'list_log_stores', 'list_log_stores_async', 'list_log_stores_with_options', 'list_log_stores_with_options_async', 'update_log_store', 'update_log_store_async', 'update_log_store_encryption', 'update_log_store_encryption_async', 'update_log_store_encryption_with_options', 'update_log_store_encryption_with_options_async', 'update_log_store_metering_mode', 'update_log_store_metering_mode_async', 'update_log_store_metering_mode_with_options', 'update_log_store_metering_mode_with_options_async', 'update_log_store_processor', 'update_log_store_processor_async', 'update_log_store_processor_with_options', 'update_log_store_processor_with_options_async', 'update_log_store_with_options', 'update_log_store_with_options_async'], 36)
python[m for m in dir(SlsClient) if "log_stores" in m.lower()] ['list_log_stores', 'list_log_stores_async', 'list_log_stores_with_options', 'list_log_stores_with_options_async']
pythonimport inspect sig = inspect.signature(SlsClient.list_log_stores) sig <Signature (self, project: str, request: alibabacloud_sls20201230.models.ListLogStoresRequest) -> alibabacloud_sls20201230.models.ListLogStoresResponse>
pythoninspect.signature(SlsClient.list_shards_with_options), inspect.signature(SlsClient.list_shards) (<Signature (self, project: str, logstore: str, headers: Dict[str, str], runtime: alibabacloud_tea_util.models.RuntimeOptions) -> alibabacloud_sls20201230.models.ListShardsResponse>, <Signature (self, project: str, logstore: str) -> alibabacloud_sls20201230.models.ListShardsResponse>)
pythonfrom alibabacloud_sls20201230 import models req_cls = models.ListLogStoresRequest inspect.signature(req_cls.__init__), [a for a in dir(req_cls) if a.startswith("set_")][:20] (<Signature (self, logstore_name: str = None, mode: str = None, offset: int = None, size: int = None, telemetry_type: str = None)>, [])
pythonreq = models.ListLogStoresRequest() req.to_map() if hasattr(req, "to_map") else None, req.__dict__ ({}, {'logstore_name': None, 'mode': None, 'offset': None, 'size': None, 'telemetry_type': None})
pythonresp_cls = models.ListLogStoresResponse inspect.signature(resp_cls.__init__) <Signature (self, headers: Dict[str, str] = None, status_code: int = None, body: alibabacloud_sls20201230.models.ListLogStoresResponseBody = None)>
pythonbody_cls = models.ListLogStoresResponseBody inspect.signature(body_cls.__init__), body_cls().__dict__ (<Signature (self, count: int = None, logstores: List[str] = None, total: int = None)>, {'count': None, 'logstores': None, 'total': None})
pythonimport re shard_models = [n for n in dir(models) if "Shard" in n] shard_models[:50], len(shard_models) (['ListShardsResponse', 'MergeShardResponse', 'Shard', 'ShardingPolicy', 'ShardingPolicyShardGroup', 'ShardingPolicyShardHash', 'SplitShardRequest', 'SplitShardResponse'], 8)
pythoninspect.signature(models.ListShardsResponse.__init__), models.ListShardsResponse().__dict__ (<Signature (self, headers: Dict[str, str] = None, status_code: int = None, body: List[alibabacloud_sls20201230.models.Shard] = None)>, {'headers': None, 'status_code': None, 'body': None})
pythonimport inspect inspect.signature(SlsClient.list_log_stores_with_options) <Signature (self, project: str, request: alibabacloud_sls20201230.models.ListLogStoresRequest, headers: Dict[str, str], runtime: alibabacloud_tea_util.models.RuntimeOptions) -> alibabacloud_sls20201230.models.ListLogStoresResponse>
下面这个脚本会分页拉取某个 project 下的全部 Logstore(ListLogStores 的 size 最大 500,配合 offset 翻页),然后对每个 Logstore 调用 ListShards 统计 shard 数,只输出 shard > 1 的 logstore 名称和 shard 数。 (Alibaba Cloud)
# -*- coding: utf-8 -*-
"""List every logstore in an SLS project whose shard count exceeds a threshold.

Pages through ListLogStores (size max 500, offset-based paging), then calls
ListShards per logstore and prints name + shard count for those at or above
--min-shards (default 2, i.e. "shard > 1").
"""
import sys
import json
import argparse
from typing import List, Dict, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


def create_client(endpoint: str) -> Sls20201230Client:
    """Build an SLS client via the default credential chain (env vars /
    instance role / STS are resolved automatically by CredentialClient)."""
    credential = CredentialClient()
    config = open_api_models.Config(credential=credential)
    config.endpoint = endpoint
    return Sls20201230Client(config)


def list_all_logstores(
    client: Sls20201230Client,
    project: str,
    page_size: int = 500,
) -> List[str]:
    """Return every logstore name in *project*, following offset pagination."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    offset = 0
    all_logstores: List[str] = []
    while True:
        req = models.ListLogStoresRequest(offset=offset, size=page_size)
        resp = client.list_log_stores_with_options(project, req, headers, runtime)
        body = resp.body
        batch = (body.logstores or []) if body else []
        all_logstores.extend(batch)
        # body.total may be absent; fall back to what we have accumulated.
        total = body.total if (body and body.total is not None) else len(all_logstores)
        if not batch or len(all_logstores) >= total:
            break
        offset += len(batch)
    return all_logstores


def shard_count_of_logstore(client: Sls20201230Client, project: str, logstore: str) -> int:
    """Shard count of *logstore* (ListShards returns the shard array as body)."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    resp = client.list_shards_with_options(project, logstore, headers, runtime)
    return len(resp.body or [])


def main():
    parser = argparse.ArgumentParser(
        description="List logstores in a project whose shard count > 1, and print logstore name + shard count."
    )
    parser.add_argument("project", help="SLS project name, e.g. k8s-log-xxxx")
    parser.add_argument(
        "--endpoint",
        default="cn-hangzhou.log.aliyuncs.com",
        help="SLS endpoint, default: cn-hangzhou.log.aliyuncs.com",
    )
    parser.add_argument(
        "--min-shards",
        type=int,
        default=2,
        help="Only output logstores with shard_count >= min_shards (default: 2, i.e. shard > 1)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON array instead of TSV lines",
    )
    args = parser.parse_args()

    client = create_client(args.endpoint)
    logstores = list_all_logstores(client, args.project)

    results: List[Tuple[str, int]] = []
    for ls in logstores:
        try:
            cnt = shard_count_of_logstore(client, args.project, ls)
            if cnt >= args.min_shards:
                results.append((ls, cnt))
        except Exception as e:
            # Skip the failing logstore; extend here for richer diagnostics.
            print(f"[WARN] list_shards failed for logstore={ls}: {e}", file=sys.stderr)

    if args.json:
        payload = [{"logstore": name, "shards": cnt} for name, cnt in results]
        print(json.dumps(payload, ensure_ascii=False, indent=2))
    else:
        # TSV output: logstore<TAB>shards
        for name, cnt in results:
            print(f"{name}\t{cnt}")


if __name__ == "__main__":
    main()
用法示例(按你脚本里的 region 来改 endpoint):
python list_shards_gt1.py k8s-log-c080313797e524f98b37f23ea71dcd134 --endpoint cn-hangzhou.log.aliyuncs.com
python list_shards_gt1.py k8s-log-... --json
如果你们 project 里 logstore 很多、调用速度慢,我也可以再给你加一个 ThreadPoolExecutor 并发版本(带限速/重试),避免触发 API 频控。
脚本如何使用
你把我给的脚本保存成一个文件,比如 list_shards_gt1.py,然后按下面方式运行就行。
安装依赖(和你原脚本一致的 SDK):
bashpip install alibabacloud-sls20201230 alibabacloud-credentials alibabacloud-tea-openapi alibabacloud-tea-util
认证方式:脚本里用的是 CredentialClient()(凭据链)。常见用法是先在环境变量里配置 AK(或使用更安全的方式如实例角色/STS)。
最简单(不推荐长期使用,但方便测试):
bashexport ALIBABA_CLOUD_ACCESS_KEY_ID="你的AK" export ALIBABA_CLOUD_ACCESS_KEY_SECRET="你的SK" export ALIBABA_CLOUD_REGION_ID="cn-hangzhou" # 可选
如果你是 ECS/容器里跑,建议用 RAM Role / OIDC / STS 等无 AK 方式,
CredentialClient()也能自动识别。
脚本参数里 project 是必填,endpoint 可选(默认 cn-hangzhou.log.aliyuncs.com)。
bashpython list_shards_gt1.py k8s-log-c080313797e524f98b37f23ea71dcd134 --endpoint cn-hangzhou.log.aliyuncs.com
输出示例(TSV):
textzhmle-registry-server 2 some-logstore 4
bashpython list_shards_gt1.py k8s-log-c080313797e524f98b37f23ea71dcd134 --endpoint cn-hangzhou.log.aliyuncs.com --json
bashpython list_shards_gt1.py k8s-log-xxx --endpoint cn-hangzhou.log.aliyuncs.com --min-shards 3
bashpython list_shards_gt1.py -h
endpoint 写哪个?
和你 project 所在 region 一致,比如杭州就是 cn-hangzhou.log.aliyuncs.com。其它 region 也类似:cn-shanghai.log.aliyuncs.com、ap-southeast-1.log.aliyuncs.com 等。
报权限错误(403 / AccessDenied)
你的 RAM 用户/角色需要对 SLS 的 ListLogStores 和 ListShards 之类 API 有权限(通常给项目级别权限即可)。
如果你把报错贴出来(尤其是 error message / Recommend 链接内容),我可以帮你快速定位是 endpoint、凭据链还是权限策略问题。
把代码修改为只输出读写shard>1的logstore,提供下完整代码
下面是完整脚本:会批量列出指定 project 下所有 logstore,然后调用 ListShards,只统计 status == "readwrite" 的 shard 数,并且只输出读写 shard > 1 的 logstore 名称和读写 shard 数。(ListShards 的返回示例里就有 status: readwrite 字段 (Alibaba Cloud);ListLogStores 支持 offset/size 分页,size 最大 500 (Alibaba Cloud))
# -*- coding: utf-8 -*-
"""List logstores whose READWRITE shard count exceeds a threshold.

Enumerates every logstore in a project (ListLogStores, offset/size paging),
counts shards with status == "readwrite" per logstore (ListShards), and
prints those at or above --min-readwrite-shards (default 2).
"""
import sys
import json
import argparse
from typing import List, Dict, Any, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


class Sample:
    @staticmethod
    def create_client(endpoint: str) -> Sls20201230Client:
        """Build an SLS client via the credential chain (env / instance role / STS)."""
        credential = CredentialClient()
        config = open_api_models.Config(credential=credential)
        config.endpoint = endpoint
        return Sls20201230Client(config)

    @staticmethod
    def _get_error_message(e: Exception) -> str:
        # SDK errors expose .message; plain exceptions do not.
        return getattr(e, "message", None) or str(e)

    @staticmethod
    def _get_error_recommend(e: Exception) -> str:
        # SDK errors may carry a diagnostic URL under data["Recommend"].
        data = getattr(e, "data", None)
        if isinstance(data, dict):
            return str(data.get("Recommend") or "")
        return ""

    @staticmethod
    def list_all_logstores(client: Sls20201230Client, project: str, page_size: int = 500) -> List[str]:
        """Page through ListLogStores and return every logstore name in *project*."""
        runtime = util_models.RuntimeOptions()
        headers: Dict[str, str] = {}
        offset = 0
        all_logstores: List[str] = []
        while True:
            req = models.ListLogStoresRequest(offset=offset, size=page_size)
            resp = client.list_log_stores_with_options(project, req, headers, runtime)
            body = resp.body
            batch = (body.logstores or []) if body else []
            all_logstores.extend(batch)
            # total may be missing; fall back to the accumulated count.
            total = body.total if (body and body.total is not None) else len(all_logstores)
            if not batch or len(all_logstores) >= total:
                break
            offset += len(batch)
        return all_logstores

    @staticmethod
    def count_readwrite_shards(client: Sls20201230Client, project: str, logstore: str) -> int:
        """Count shards of *logstore* whose status == "readwrite".

        ListShards returns a shard array; each shard has a status field
        (e.g. readwrite / readonly).
        """
        runtime = util_models.RuntimeOptions()
        headers: Dict[str, str] = {}
        resp = client.list_shards_with_options(project, logstore, headers, runtime)
        shards = resp.body or []
        cnt = 0
        for s in shards:
            # The SDK may hand back dicts or model objects; support both.
            if isinstance(s, dict):
                status = s.get("status")
            else:
                status = getattr(s, "status", None)
            if isinstance(status, str) and status.lower() == "readwrite":
                cnt += 1
        return cnt

    @staticmethod
    def main(argv: List[str]) -> None:
        parser = argparse.ArgumentParser(
            description="List logstores whose READWRITE shard count > 1, and print logstore name + readwrite shard count."
        )
        parser.add_argument("project", help="SLS project name, e.g. k8s-log-xxxx")
        parser.add_argument(
            "--endpoint",
            default="cn-hangzhou.log.aliyuncs.com",
            help="SLS endpoint, default: cn-hangzhou.log.aliyuncs.com",
        )
        parser.add_argument(
            "--min-readwrite-shards",
            type=int,
            default=2,
            help='Only output logstores with readwrite_shards >= N (default 2 means "readwrite shard > 1")',
        )
        parser.add_argument(
            "--json",
            action="store_true",
            help="Output as JSON array instead of TSV lines",
        )
        args = parser.parse_args(argv)

        client = Sample.create_client(args.endpoint)
        try:
            logstores = Sample.list_all_logstores(client, args.project)
        except Exception as e:
            msg = Sample._get_error_message(e)
            rec = Sample._get_error_recommend(e)
            print(f"[ERROR] ListLogStores failed: {msg}", file=sys.stderr)
            if rec:
                print(f"[ERROR] Recommend: {rec}", file=sys.stderr)
            sys.exit(1)

        results: List[Tuple[str, int]] = []
        for ls in logstores:
            try:
                rw_cnt = Sample.count_readwrite_shards(client, args.project, ls)
                if rw_cnt >= args.min_readwrite_shards:
                    results.append((ls, rw_cnt))
            except Exception as e:
                msg = Sample._get_error_message(e)
                rec = Sample._get_error_recommend(e)
                print(f"[WARN] ListShards failed for logstore={ls}: {msg}", file=sys.stderr)
                if rec:
                    print(f"[WARN] Recommend: {rec}", file=sys.stderr)

        if args.json:
            payload = [{"logstore": name, "readwrite_shards": cnt} for name, cnt in results]
            print(json.dumps(payload, ensure_ascii=False, indent=2))
        else:
            # TSV: logstore<TAB>readwrite_shards
            for name, cnt in results:
                print(f"{name}\t{cnt}")


if __name__ == "__main__":
    Sample.main(sys.argv[1:])
运行示例:
bashpython list_readwrite_shards_gt1.py k8s-log-c080313797e524f98b37f23ea71dcd134 --endpoint cn-hangzhou.log.aliyuncs.com
bashpython list_readwrite_shards_gt1.py k8s-log-xxx --json
如果你希望“读写 shard”之外还把 readonly 的数量也一起打印出来(用于排查正在 split/merge 的 logstore),我也可以给你加一列。
import os
import sys
import json
from typing import List
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Demo: merge one pair of shards in a hard-coded SLS logstore."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Sls20201230Client:
        """
        Build an SLS client from the default credential chain.

        Prefer AK-free credentials in production; see
        https://help.aliyun.com/document_detail/378659.html
        @return: Client
        @throws Exception
        """
        credential = CredentialClient()
        config = open_api_models.Config(
            credential=credential
        )
        # Endpoint reference: https://api.aliyun.com/product/Sls
        config.endpoint = 'ap-southeast-1.log.aliyuncs.com'
        return Sls20201230Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        runtime = util_models.RuntimeOptions()
        headers = {}
        try:
            # NOTE(review): the shard id is still the template placeholder ''
            # — fill in the id of the FIRST shard of an adjacent readwrite
            # pair before running this against a real project.
            resp = client.merge_shard_with_options(
                'k8s-log-cf5fd9866be3d44f7bc86aca1950ca478',
                'analysis-service',
                '',
                headers,
                runtime,
            )
            print(json.dumps(resp, default=str, indent=2))
        except Exception as error:
            # Demo-only handling; never silently swallow exceptions in real code.
            # Not every exception carries .message/.data, so fall back safely.
            print(getattr(error, 'message', None) or str(error))
            data = getattr(error, 'data', None)
            if isinstance(data, dict):
                # Diagnostic URL suggested by the SDK.
                print(data.get("Recommend"))

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        runtime = util_models.RuntimeOptions()
        headers = {}
        try:
            resp = await client.merge_shard_with_options_async(
                'k8s-log-cf5fd9866be3d44f7bc86aca1950ca478',
                'analysis-service',
                '',
                headers,
                runtime,
            )
            print(json.dumps(resp, default=str, indent=2))
        except Exception as error:
            # Demo-only handling; never silently swallow exceptions in real code.
            print(getattr(error, 'message', None) or str(error))
            data = getattr(error, 'data', None)
            if isinstance(data, dict):
                print(data.get("Recommend"))
if __name__ == '__main__':
    # Entry point: forward CLI args (minus the script name) to Sample.main.
    # Original had "if name == 'main'" — dunders lost in markdown; fixed.
    Sample.main(sys.argv[1:])
这是合并shard的脚本,帮我把上面筛出的所有shard不为1的logstore全部合并为1
下面给你一份完整可直接运行的脚本:会在指定 project 下批量遍历所有 logstore,找出 readwrite shard 数 > 1 的 logstore,然后对每个 logstore 循环 merge,直到 readwrite shard 只剩 1 个(每次 merge 会合并一对相邻的 readwrite shard,参数传“第一块 shard 的 shardId”)。
说明:
MergeShard只能合并相邻且都是 readwrite 状态的两块 shard,所以脚本会先ListShards,再从 shard 列表里找“相邻 readwrite + readwrite”的一对来 merge。
# -*- coding: utf-8 -*-
"""Merge shards project-wide: for every logstore with readwrite shards > 1,
merge adjacent readwrite shard pairs until only --target-rw (default 1) remain.

NOTE(review): MergeShard flips the two source shards to readonly
asynchronously; this version only sleeps a fixed --sleep between merges, so a
fast second merge can hit "shard not exist or readonly" — the follow-up
version of this script polls until the readwrite count actually drops.
"""
import sys
import json
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


def create_client(endpoint: str) -> Sls20201230Client:
    """Build an SLS client via the default credential chain."""
    credential = CredentialClient()
    config = open_api_models.Config(credential=credential)
    config.endpoint = endpoint
    return Sls20201230Client(config)


def _get(obj: Any, *names: str) -> Any:
    """Read a field from either a dict or a model object (first name that hits)."""
    if isinstance(obj, dict):
        for n in names:
            if n in obj:
                return obj.get(n)
        # Case-insensitive fallback over the dict keys.
        lower_map = {str(k).lower(): k for k in obj.keys()}
        for n in names:
            k = lower_map.get(n.lower())
            if k is not None:
                return obj.get(k)
        return None
    for n in names:
        if hasattr(obj, n):
            return getattr(obj, n)
    # snake_case fallback (rarely needed).
    for n in names:
        sn = []
        for ch in n:
            if ch.isupper():
                sn.append("_")
                sn.append(ch.lower())
            else:
                sn.append(ch)
        sn = "".join(sn).lstrip("_")
        if hasattr(obj, sn):
            return getattr(obj, sn)
    return None


def list_all_logstores(client: Sls20201230Client, project: str, page_size: int = 500) -> List[str]:
    """Page through ListLogStores and return every logstore name in *project*."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    offset = 0
    all_ls: List[str] = []
    while True:
        req = models.ListLogStoresRequest(offset=offset, size=page_size)
        resp = client.list_log_stores_with_options(project, req, headers, runtime)
        body = resp.body
        batch = (body.logstores or []) if body else []
        all_ls.extend(batch)
        total = body.total if (body and body.total is not None) else len(all_ls)
        if not batch or len(all_ls) >= total:
            break
        offset += len(batch)
    return all_ls


def list_shards(client: Sls20201230Client, project: str, logstore: str) -> List[Any]:
    """Return the shard array of *logstore* (ListShards body)."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    resp = client.list_shards_with_options(project, logstore, headers, runtime)
    return resp.body or []


def count_readwrite_shards(shards: List[Any]) -> int:
    """Count shards whose status is "readwrite" (case-insensitive)."""
    cnt = 0
    for s in shards:
        status = _get(s, "status")
        if isinstance(status, str) and status.lower() == "readwrite":
            cnt += 1
    return cnt


def _begin_key(s: Any) -> str:
    # inclusiveBeginKey is the start of the shard's MD5 hash range.
    bk = _get(s, "inclusiveBeginKey", "inclusive_begin_key", "inclusiveBeginkey")
    return str(bk or "")


def _shard_id(s: Any) -> Optional[int]:
    sid = _get(s, "shardID", "shardId", "shard_id", "shard")
    if sid is None:
        return None
    try:
        return int(sid)
    except Exception:
        return None


def find_adjacent_readwrite_pair(sorted_shards: List[Any]) -> Optional[int]:
    """Given shards already sorted by inclusiveBeginKey, find an adjacent
    readwrite+readwrite pair and return the FIRST shard's id
    (MergeShard expects the first of the pair)."""
    for i in range(len(sorted_shards) - 1):
        s1 = sorted_shards[i]
        s2 = sorted_shards[i + 1]
        st1 = _get(s1, "status")
        st2 = _get(s2, "status")
        if isinstance(st1, str) and isinstance(st2, str) and st1.lower() == "readwrite" and st2.lower() == "readwrite":
            sid1 = _shard_id(s1)
            if sid1 is not None:
                return sid1
    return None


def merge_once(client: Sls20201230Client, project: str, logstore: str, shard_id_first: int) -> Any:
    """Issue one MergeShard call for the pair starting at *shard_id_first*."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    return client.merge_shard_with_options(project, logstore, shard_id_first, headers, runtime)


def merge_logstore_to_one_rw(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    target_rw: int = 1,
    max_rounds: int = 200,
    sleep_sec: float = 0.3,
    verbose: bool = True,
) -> Tuple[int, int]:
    """Merge repeatedly until readwrite shards <= target_rw.

    Returns (initial_rw, final_rw).
    """
    shards = list_shards(client, project, logstore)
    initial_rw = count_readwrite_shards(shards)
    if initial_rw <= target_rw:
        return initial_rw, initial_rw
    rounds = 0
    while rounds < max_rounds:
        shards = list_shards(client, project, logstore)
        # Only the readwrite count decides whether to continue.
        rw_cnt = count_readwrite_shards(shards)
        if rw_cnt <= target_rw:
            return initial_rw, rw_cnt
        # Sort ALL shards (readonly included) by beginKey before pairing.
        sorted_all = sorted(shards, key=_begin_key)
        first_id = find_adjacent_readwrite_pair(sorted_all)
        if first_id is None:
            # Possibly mid split/merge, or an abnormal state.
            if verbose:
                print(f"[WARN] {logstore}: 找不到相邻 readwrite shard 对,无法继续 merge(当前 readwrite={rw_cnt})", file=sys.stderr)
            return initial_rw, rw_cnt
        if verbose:
            print(f"[INFO] {logstore}: merging shard {first_id} with its adjacent readwrite shard... (readwrite={rw_cnt})")
        merge_once(client, project, logstore, first_id)
        rounds += 1
        if sleep_sec > 0:
            time.sleep(sleep_sec)
    # Hit max_rounds without converging.
    shards = list_shards(client, project, logstore)
    return initial_rw, count_readwrite_shards(shards)


def main(argv: List[str]) -> None:
    parser = argparse.ArgumentParser(
        description="Merge all logstores in a project so that READWRITE shards become 1 (only for those with readwrite_shards > 1)."
    )
    parser.add_argument("project", help="SLS project name")
    parser.add_argument("--endpoint", default="ap-southeast-1.log.aliyuncs.com", help="SLS endpoint")
    parser.add_argument("--min-rw", type=int, default=2, help="Only process logstores with readwrite_shards >= min-rw (default: 2)")
    parser.add_argument("--target-rw", type=int, default=1, help="Target readwrite shard count (default: 1)")
    parser.add_argument("--max-rounds", type=int, default=200, help="Max merge rounds per logstore")
    parser.add_argument("--sleep", type=float, default=0.3, help="Sleep seconds between merges (default: 0.3)")
    parser.add_argument("--json", action="store_true", help="Output summary as JSON")
    parser.add_argument("--quiet", action="store_true", help="Less logs")
    args = parser.parse_args(argv)

    client = create_client(args.endpoint)
    logstores = list_all_logstores(client, args.project)

    summary: List[Dict[str, Any]] = []
    for ls in logstores:
        try:
            shards = list_shards(client, args.project, ls)
            rw_cnt = count_readwrite_shards(shards)
            if rw_cnt < args.min_rw:
                continue
            init_rw, final_rw = merge_logstore_to_one_rw(
                client=client,
                project=args.project,
                logstore=ls,
                target_rw=args.target_rw,
                max_rounds=args.max_rounds,
                sleep_sec=args.sleep,
                verbose=(not args.quiet),
            )
            summary.append({"logstore": ls, "initial_readwrite_shards": init_rw, "final_readwrite_shards": final_rw})
        except Exception as e:
            msg = getattr(e, "message", None) or str(e)
            recommend = ""
            data = getattr(e, "data", None)
            if isinstance(data, dict):
                recommend = str(data.get("Recommend") or "")
            print(f"[ERROR] {ls}: {msg}", file=sys.stderr)
            if recommend:
                print(f"[ERROR] Recommend: {recommend}", file=sys.stderr)

    if args.json:
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        # TSV: logstore initial_rw final_rw
        for item in summary:
            print(f"{item['logstore']}\t{item['initial_readwrite_shards']}\t{item['final_readwrite_shards']}")


if __name__ == "__main__":
    main(sys.argv[1:])
bashpython merge_project_logstores_to_1_rw_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com
输出(TSV)示例:
textanalysis-service 4 1 xxx-logstore 2 1
bashpython merge_project_logstores_to_1_rw_shard.py k8s-log-xxx --endpoint ap-southeast-1.log.aliyuncs.com --json
bashpython merge_project_logstores_to_1_rw_shard.py k8s-log-xxx --endpoint ap-southeast-1.log.aliyuncs.com --sleep 1
如果你运行时遇到 [WARN] 找不到相邻 readwrite shard 对,把该 logstore 的 ListShards 输出贴我一下(或告诉我 shard 的 status 分布),我可以帮你把策略改成更“强力”的:比如先等待状态稳定/重试、或针对正在 merge/split 的情况做更稳的轮询逻辑。
(chohotech) ananas@su:~/code/合并shard$ python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com
[INFO] api-knowledgebase: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[INFO] api-knowledgebase: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[ERROR] api-knowledgebase: shard not exist or readonly
[INFO] api-knowledgebase-en: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[INFO] api-knowledgebase-en: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[ERROR] api-knowledgebase-en: shard not exist or readonly
[INFO] audit-cf5fd9866be3d44f7bc86aca1950ca478: merging shard 32 with its adjacent readwrite shard... (readwrite=31)
[INFO] audit-cf5fd9866be3d44f7bc86aca1950ca478: merging shard 32 with its adjacent readwrite shard... (readwrite=31)
[ERROR] audit-cf5fd9866be3d44f7bc86aca1950ca478: shard not exist or readonly
[INFO] auth-service: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[INFO] auth-service: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[ERROR] auth-service: shard not exist or readonly
使用你的脚本后出现这个情况
你这个报错本质原因是:MergeShard 触发后,原来参与 merge 的两块 shard 会很快变成 readonly(或被替换掉),而合并生成的新 shard 需要一点时间才稳定到 readwrite。你脚本里在同一个 logstore 上连续发了两次 merge,第二次就会命中 “shard not exist or readonly”。官方说明里也明确:merge 需要“两块相邻且都是 readwrite”,merge 后原 shard 变 readonly。 (AlibabaCloud)
下面是修复后的完整脚本:每次对某个 logstore 发起一次 merge 后,会轮询等待直到 readwrite shard 数量确实减少(或达到目标 1)才继续下一次 merge,因此不会再“连打两次”导致你这个错误。
# -*- coding: utf-8 -*-
"""Merge shards project-wide (fixed version): after each MergeShard, poll
until the readwrite shard count actually drops before issuing the next merge.

This avoids the "shard not exist or readonly" failure seen when a second
merge is issued while the previous one is still settling (the two source
shards flip to readonly and the new shard takes a moment to become
readwrite).
"""
import sys
import json
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


def create_client(endpoint: str) -> Sls20201230Client:
    """Build an SLS client via the default credential chain."""
    credential = CredentialClient()
    config = open_api_models.Config(credential=credential)
    config.endpoint = endpoint
    return Sls20201230Client(config)


def _get(obj: Any, name: str, *aliases: str) -> Any:
    """Read a field from either a dict or a model object, trying *name* then
    each alias; dict lookup also falls back to a case-insensitive match."""
    keys = (name,) + aliases
    if isinstance(obj, dict):
        for k in keys:
            if k in obj:
                return obj.get(k)
        lower_map = {str(k).lower(): k for k in obj.keys()}
        for k in keys:
            kk = lower_map.get(str(k).lower())
            if kk is not None:
                return obj.get(kk)
        return None
    for k in keys:
        if hasattr(obj, k):
            return getattr(obj, k)
    return None


def list_all_logstores(client: Sls20201230Client, project: str, page_size: int = 500) -> List[str]:
    """Page through ListLogStores and return every logstore name in *project*."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    offset = 0
    all_ls: List[str] = []
    while True:
        req = models.ListLogStoresRequest(offset=offset, size=page_size)
        resp = client.list_log_stores_with_options(project, req, headers, runtime)
        body = resp.body
        batch = (body.logstores or []) if body else []
        all_ls.extend(batch)
        total = body.total if (body and body.total is not None) else len(all_ls)
        if not batch or len(all_ls) >= total:
            break
        offset += len(batch)
    return all_ls


def list_shards(client: Sls20201230Client, project: str, logstore: str) -> List[Any]:
    """Return the shard array of *logstore* (ListShards body)."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    resp = client.list_shards_with_options(project, logstore, headers, runtime)
    return resp.body or []


def count_readwrite_shards(shards: List[Any]) -> int:
    """Count shards whose status is "readwrite" (case-insensitive)."""
    cnt = 0
    for s in shards:
        status = _get(s, "status")
        if isinstance(status, str) and status.lower() == "readwrite":
            cnt += 1
    return cnt


def _begin_key(s: Any) -> str:
    bk = _get(s, "inclusiveBeginKey", "inclusive_begin_key")
    return str(bk or "")


def _shard_id(s: Any) -> Optional[int]:
    sid = _get(s, "shardID", "shardId", "shard_id")
    if sid is None:
        return None
    try:
        return int(sid)
    except Exception:
        return None


def find_adjacent_readwrite_pair(shards: List[Any]) -> Optional[int]:
    """Sort by beginKey, find an adjacent readwrite+readwrite pair, and
    return the FIRST shard's id (what MergeShard expects)."""
    sorted_all = sorted(shards, key=_begin_key)
    for i in range(len(sorted_all) - 1):
        s1, s2 = sorted_all[i], sorted_all[i + 1]
        st1 = _get(s1, "status")
        st2 = _get(s2, "status")
        if isinstance(st1, str) and isinstance(st2, str) and st1.lower() == "readwrite" and st2.lower() == "readwrite":
            sid1 = _shard_id(s1)
            if sid1 is not None:
                return sid1
    return None


def merge_once(client: Sls20201230Client, project: str, logstore: str, shard_id_first: int) -> None:
    """Issue one MergeShard call for the pair starting at *shard_id_first*."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    client.merge_shard_with_options(project, logstore, shard_id_first, headers, runtime)


def is_shard_not_exist_or_readonly(err: Exception) -> bool:
    """Heuristic: does this error message indicate a mid-merge state flip?"""
    msg = getattr(err, "message", None) or str(err)
    msg_l = msg.lower()
    return ("shard not exist" in msg_l) or ("readonly" in msg_l)


def wait_until_rw_decreases(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    rw_before: int,
    target_rw: int,
    timeout_sec: float,
    poll_start: float = 0.6,
    poll_max: float = 5.0,
) -> int:
    """Poll (with exponential backoff, capped at *poll_max*) until the
    readwrite shard count drops below *rw_before* — i.e. the merge took
    effect — or reaches *target_rw*, or the timeout elapses.

    Returns the last observed readwrite count.
    """
    start = time.time()
    interval = poll_start
    while True:
        shards = list_shards(client, project, logstore)
        rw_now = count_readwrite_shards(shards)
        if rw_now <= target_rw or rw_now < rw_before:
            return rw_now
        if time.time() - start >= timeout_sec:
            return rw_now
        time.sleep(interval)
        interval = min(poll_max, interval * 1.4)


def merge_logstore_to_one_rw(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    target_rw: int,
    max_rounds: int,
    wait_timeout_sec: float,
    quiet: bool,
) -> Tuple[int, int]:
    """Merge repeatedly, waiting for each merge to take effect.

    Returns (initial_rw, final_rw).
    """
    shards = list_shards(client, project, logstore)
    initial_rw = count_readwrite_shards(shards)
    if initial_rw <= target_rw:
        return initial_rw, initial_rw
    rounds = 0
    rw_now = initial_rw
    while rounds < max_rounds and rw_now > target_rw:
        shards = list_shards(client, project, logstore)
        rw_now = count_readwrite_shards(shards)
        if rw_now <= target_rw:
            break
        shard_first = find_adjacent_readwrite_pair(shards)
        if shard_first is None:
            if not quiet:
                print(f"[WARN] {logstore}: 找不到相邻 readwrite shard 对(当前 readwrite={rw_now}),停止。", file=sys.stderr)
            break
        if not quiet:
            print(f"[INFO] {logstore}: merging shard {shard_first} ... (readwrite={rw_now})")
        # Issue the merge. A "not exist or readonly" reply usually means a
        # previous merge is still settling — treat it as "wait for effect".
        try:
            merge_once(client, project, logstore, shard_first)
        except Exception as e:
            if is_shard_not_exist_or_readonly(e):
                if not quiet:
                    print(f"[WARN] {logstore}: merge 返回 '{getattr(e,'message',str(e))}',可能处于状态切换/merge进行中,等待生效…", file=sys.stderr)
            else:
                raise
        # Key fix: wait until THIS merge actually lowers the readwrite count
        # (or the target is reached) before the next round.
        rw_after = wait_until_rw_decreases(
            client=client,
            project=project,
            logstore=logstore,
            rw_before=rw_now,
            target_rw=target_rw,
            timeout_sec=wait_timeout_sec,
        )
        if not quiet:
            print(f"[INFO] {logstore}: readwrite {rw_now} -> {rw_after}")
        # No drop means the merge is blocked or still pending — bail out of
        # this logstore instead of spinning forever.
        if rw_after >= rw_now:
            if not quiet:
                print(f"[WARN] {logstore}: 等待超时仍未下降(readwrite={rw_after}),先停止该 logstore。", file=sys.stderr)
            break
        rw_now = rw_after
        rounds += 1
    return initial_rw, rw_now


def main(argv: List[str]) -> None:
    parser = argparse.ArgumentParser(
        description="Merge all logstores (with readwrite shards > 1) to make readwrite shards become 1."
    )
    parser.add_argument("project", help="SLS project name")
    parser.add_argument("--endpoint", default="ap-southeast-1.log.aliyuncs.com", help="SLS endpoint")
    parser.add_argument("--min-rw", type=int, default=2, help="Only process logstores with readwrite_shards >= min-rw (default 2)")
    parser.add_argument("--target-rw", type=int, default=1, help="Target readwrite shard count (default 1)")
    parser.add_argument("--max-rounds", type=int, default=200, help="Max merge rounds per logstore")
    parser.add_argument("--wait-timeout", type=float, default=90.0, help="Wait timeout seconds after each merge (default 90s)")
    parser.add_argument("--json", action="store_true", help="Output summary as JSON")
    parser.add_argument("--quiet", action="store_true", help="Less logs")
    args = parser.parse_args(argv)

    client = create_client(args.endpoint)
    logstores = list_all_logstores(client, args.project)

    summary: List[Dict[str, Any]] = []
    for ls in logstores:
        try:
            shards = list_shards(client, args.project, ls)
            rw_cnt = count_readwrite_shards(shards)
            if rw_cnt < args.min_rw:
                continue
            init_rw, final_rw = merge_logstore_to_one_rw(
                client=client,
                project=args.project,
                logstore=ls,
                target_rw=args.target_rw,
                max_rounds=args.max_rounds,
                wait_timeout_sec=args.wait_timeout,
                quiet=args.quiet,
            )
            summary.append({"logstore": ls, "initial_readwrite_shards": init_rw, "final_readwrite_shards": final_rw})
        except Exception as e:
            msg = getattr(e, "message", None) or str(e)
            recommend = ""
            data = getattr(e, "data", None)
            if isinstance(data, dict):
                recommend = str(data.get("Recommend") or "")
            print(f"[ERROR] {ls}: {msg}", file=sys.stderr)
            if recommend:
                print(f"[ERROR] Recommend: {recommend}", file=sys.stderr)

    if args.json:
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        # TSV: logstore initial_rw final_rw
        for item in summary:
            print(f"{item['logstore']}\t{item['initial_readwrite_shards']}\t{item['final_readwrite_shards']}")


if __name__ == "__main__":
    main(sys.argv[1:])
python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com
如果某些 logstore shard 很多(比如你那个 readwrite=31),建议把等待时间稍微加大一点,避免 merge 还没落稳就进入下一轮:
python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com --wait-timeout 180
如果你愿意把某个出问题 logstore 的 ListShards 原始输出(json)贴一份,我还能进一步把“找相邻 shard 对”的策略做得更稳(比如跳过正在 merge 的 readonly 链、从头/从尾合并等)。
(chohotech) ananas@su:~/code/合并shard$ python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com
[INFO] api-knowledgebase: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[INFO] api-knowledgebase: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[ERROR] api-knowledgebase: shard not exist or readonly
[INFO] api-knowledgebase-en: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[INFO] api-knowledgebase-en: merging shard 0 with its adjacent readwrite shard... (readwrite=2)
[ERROR] api-knowledgebase-en: shard not exist or readonly
[INFO] audit-cf5fd9866be3d44f7bc86aca1950ca478: merging shard 32 with its adjacent readwrite shard... (readwrite=31)
[INFO] audit-cf5fd9866be3d44f7bc86aca1950ca478: merging shard 32 with its adjacent readwrite shard... (readwrite=31)
[ERROR] audit-cf5fd9866be3d44f7bc86aca1950ca478: shard not exist or readonly
还是不能用啊
从你贴的日志看,脚本还是在同一个 logstore 上连续打了两次 merge(两条一模一样的 [INFO] ... merging shard ... (readwrite=...) 紧挨着),这说明你现在运行的 merge_shard.py 并不是我后面给你的“等待生效再继续”的版本(那个版本每次 merge 后一定会打印 readwrite X -> Y,不会立刻再来一条相同的 merging)。
下面给你一个更硬的稳健版,特征是:
每次 merge 后轮询 ListShards,直到 merge 生效(readwrite 数下降,或被 merge 的 shard 变为 readonly/消失):
merge_shard 的 shardId 强制转成字符串(SDK 有些版本对 path 参数更稳)你把下面代码整段覆盖到 merge_shard.py,然后再运行(日志会出现明显的 WAIT ... / rw A -> B)。
# -*- coding: utf-8 -*-
"""Merge shards of every logstore in an SLS project down to one readwrite shard.

Each merge is followed by a WAIT phase that polls ListShards until the merge
has visibly taken effect, so the script never fires two merges back to back.
"""
import sys
import json
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


def create_client(endpoint: str) -> Sls20201230Client:
    """Create an SLS client backed by the default credential provider chain."""
    cfg = open_api_models.Config(credential=CredentialClient())
    cfg.endpoint = endpoint
    return Sls20201230Client(cfg)


def _get(obj: Any, *names: str) -> Any:
    """Fetch the first matching field from a dict or an SDK model object.

    Dict lookup falls back to case-insensitive key matching because SDK
    versions differ in key casing.
    """
    if isinstance(obj, dict):
        for n in names:
            if n in obj:
                return obj.get(n)
        by_lower = {str(key).lower(): key for key in obj.keys()}
        for n in names:
            real = by_lower.get(str(n).lower())
            if real is not None:
                return obj.get(real)
        return None
    for n in names:
        if hasattr(obj, n):
            return getattr(obj, n)
    return None


def list_all_logstores(client: Sls20201230Client, project: str, page_size: int = 500) -> List[str]:
    """Collect every logstore name via paged ListLogStores calls."""
    rt = util_models.RuntimeOptions()
    hdrs: Dict[str, str] = {}
    names: List[str] = []
    offset = 0
    while True:
        page = client.list_log_stores_with_options(
            project, models.ListLogStoresRequest(offset=offset, size=page_size), hdrs, rt
        )
        body = page.body
        chunk = (body.logstores or []) if body else []
        names.extend(chunk)
        known_total = body.total if (body and body.total is not None) else len(names)
        if not chunk or len(names) >= known_total:
            return names
        offset += len(chunk)


def list_shards(client: Sls20201230Client, project: str, logstore: str) -> List[Any]:
    """Return the ListShards response body (list of shard dicts/models)."""
    rt = util_models.RuntimeOptions()
    hdrs: Dict[str, str] = {}
    return client.list_shards_with_options(project, logstore, hdrs, rt).body or []


def shard_id(s: Any) -> Optional[int]:
    """Shard id as an int, or None when absent or unparsable."""
    raw = _get(s, "shardID", "shardId", "shard_id")
    if raw is None:
        return None
    try:
        return int(raw)
    except Exception:
        return None


def shard_status(s: Any) -> str:
    """Lower-cased shard status; "" when the field is missing or not a string."""
    raw = _get(s, "status")
    if not isinstance(raw, str):
        return ""
    return (raw or "").lower()


def begin_key(s: Any) -> str:
    """Shard's inclusive begin key as a string ("" when missing)."""
    return str(_get(s, "inclusiveBeginKey", "inclusive_begin_key") or "")


def count_readwrite(shards: List[Any]) -> int:
    """Number of shards currently in readwrite status."""
    return len([s for s in shards if shard_status(s) == "readwrite"])


def find_adjacent_readwrite_pair(shards: List[Any]) -> Optional[int]:
    """Return the first shard id of an adjacent readwrite/readwrite pair
    (sorted by begin key), or None when no such pair exists."""
    ordered = sorted(shards, key=begin_key)
    for left, right in zip(ordered, ordered[1:]):
        if shard_status(left) == "readwrite" and shard_status(right) == "readwrite":
            left_id = shard_id(left)
            if left_id is not None:
                return left_id
    return None


def merge_once(client: Sls20201230Client, project: str, logstore: str, first_shard_id: int) -> None:
    """Issue MergeShard for the given shard and its right neighbour.

    The shard id goes into the URL path as a string — more reliable across
    SDK versions than a bare int.
    """
    rt = util_models.RuntimeOptions()
    hdrs: Dict[str, str] = {}
    client.merge_shard_with_options(project, logstore, str(first_shard_id), hdrs, rt)


def is_not_exist_or_readonly(err: Exception) -> bool:
    """True for the 'shard not exist' / 'readonly' errors seen mid state switch."""
    text = (getattr(err, "message", None) or str(err)).lower()
    return "shard not exist" in text or "readonly" in text


def wait_merge_effect(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    rw_before: int,
    watched_shard_id: int,
    timeout_sec: float = 180.0,
    poll_sec: float = 1.0,
    quiet: bool = False,
) -> int:
    """Poll until the merge shows effect, then return the readwrite count.

    Effect means either the readwrite count dropped, or the watched shard is
    no longer readwrite (readonly / gone), i.e. the merge is progressing.
    """
    started = time.time()
    deadline = started + timeout_sec
    while True:
        snapshot = list_shards(client, project, logstore)
        rw_now = count_readwrite(snapshot)
        # Most direct signal: the readwrite count already went down.
        if rw_now < rw_before:
            return rw_now
        # Otherwise: has the merged shard left readwrite status?
        status_by_id = {sid: shard_status(s) for s in snapshot if (sid := shard_id(s)) is not None}
        if status_by_id.get(watched_shard_id, "missing") != "readwrite":
            # Merge is progressing; give the new shard a moment to become
            # readwrite, then report the fresh count.
            time.sleep(0.8)
            return count_readwrite(list_shards(client, project, logstore))
        if time.time() >= deadline:
            if not quiet:
                print(f"[WARN] {logstore}: WAIT 超时({timeout_sec}s),rw 仍为 {rw_now}", file=sys.stderr)
            return rw_now
        if not quiet:
            elapsed = int(time.time() - started)
            print(f"[WAIT] {logstore}: rw={rw_now} watched_shard={watched_shard_id} still readwrite... ({elapsed}s)")
        time.sleep(poll_sec)


def merge_logstore_to_one(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    target_rw: int = 1,
    max_rounds: int = 500,
    wait_timeout: float = 180.0,
    quiet: bool = False,
) -> Tuple[int, int]:
    """Merge adjacent readwrite pairs until <= ``target_rw`` remain.

    Returns (initial readwrite count, final readwrite count). Stops early if
    no adjacent pair exists or a merge shows no effect within the timeout.
    """
    init_rw = count_readwrite(list_shards(client, project, logstore))
    rw_now = init_rw
    if rw_now <= target_rw:
        return init_rw, rw_now
    for round_no in range(1, max_rounds + 1):
        current = list_shards(client, project, logstore)
        rw_now = count_readwrite(current)
        if rw_now <= target_rw:
            break
        first_id = find_adjacent_readwrite_pair(current)
        if first_id is None:
            if not quiet:
                print(f"[WARN] {logstore}: 找不到相邻 readwrite 对(rw={rw_now}),停止", file=sys.stderr)
            break
        if not quiet:
            print(f"[INFO] {logstore}: MERGE round={round_no} choose_first_shard={first_id} (rw={rw_now})")
        try:
            merge_once(client, project, logstore, first_id)
        except Exception as e:
            # readonly / not-exist usually means a state switch is underway;
            # do NOT fire another merge — wait it out instead.
            if is_not_exist_or_readonly(e):
                if not quiet:
                    print(f"[WARN] {logstore}: merge 返回 '{getattr(e,'message',str(e))}',进入等待", file=sys.stderr)
            else:
                raise
        rw_after = wait_merge_effect(
            client=client,
            project=project,
            logstore=logstore,
            rw_before=rw_now,
            watched_shard_id=first_id,
            timeout_sec=wait_timeout,
            poll_sec=1.0,
            quiet=quiet,
        )
        if not quiet:
            print(f"[INFO] {logstore}: rw {rw_now} -> {rw_after}")
        # No progress => merge pending or blocked; stop rather than loop hot.
        if rw_after >= rw_now:
            if not quiet:
                print(f"[WARN] {logstore}: rw 未下降(rw={rw_after}),先停止该 logstore", file=sys.stderr)
            break
        rw_now = rw_after
    return init_rw, rw_now


def main(argv: List[str]) -> None:
    """CLI entry: merge every qualifying logstore and print a summary."""
    parser = argparse.ArgumentParser(description="Merge all logstores in a project to make READWRITE shards become 1.")
    parser.add_argument("project", help="SLS project name")
    parser.add_argument("--endpoint", default="ap-southeast-1.log.aliyuncs.com", help="SLS endpoint")
    parser.add_argument("--min-rw", type=int, default=2, help="only process logstores with readwrite >= min-rw (default 2)")
    parser.add_argument("--target-rw", type=int, default=1, help="target readwrite shard count (default 1)")
    parser.add_argument("--wait-timeout", type=float, default=180.0, help="wait seconds after each merge (default 180)")
    parser.add_argument("--json", action="store_true", help="output summary json")
    parser.add_argument("--quiet", action="store_true", help="less logs")
    args = parser.parse_args(argv)

    client = create_client(args.endpoint)
    summary: List[Dict[str, Any]] = []
    for ls in list_all_logstores(client, args.project):
        try:
            rw = count_readwrite(list_shards(client, args.project, ls))
            if rw < args.min_rw:
                continue
            if not args.quiet:
                print(f"\n=== START {ls} (readwrite={rw}) ===")
            init_rw, final_rw = merge_logstore_to_one(
                client=client,
                project=args.project,
                logstore=ls,
                target_rw=args.target_rw,
                wait_timeout=args.wait_timeout,
                quiet=args.quiet,
            )
            summary.append({"logstore": ls, "initial_readwrite": init_rw, "final_readwrite": final_rw})
        except Exception as e:
            msg = getattr(e, "message", None) or str(e)
            recommend = ""
            data = getattr(e, "data", None)
            if isinstance(data, dict):
                recommend = str(data.get("Recommend") or "")
            print(f"[ERROR] {ls}: {msg}", file=sys.stderr)
            if recommend:
                print(f"[ERROR] Recommend: {recommend}", file=sys.stderr)

    if args.json:
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        for it in summary:
            print(f"{it['logstore']}\t{it['initial_readwrite']}\t{it['final_readwrite']}")


if __name__ == "__main__":
    main(sys.argv[1:])
python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com
建议你这个项目里 shard 很多(比如 rw=31),把等待超时再拉大一点:
python merge_shard.py k8s-log-cf5fd9866be3d44f7bc86aca1950ca478 --endpoint ap-southeast-1.log.aliyuncs.com --wait-timeout 300
因为你前一个脚本在 merge 后立刻又 merge 同一个 shardId,而该 shardId 此时已经变 readonly 或被替换掉了,所以必然报这个错。这个版本通过 WAIT 强制等待状态推进,不会再“连打”。
如果你跑这版后,某个 logstore 一直 WAIT 超时,把那一个 logstore 的 ListShards 输出贴一段(只要包含 shardId/status/beginKey),我可以帮你判断是不是有 非相邻 readwrite、或者长期处于 split/merge 中间态 导致无法继续合并。
(chohotech) ananas@su:~/code/合并shard$ python merge_shard.py k8s-log-c83e0b203ed244f10a617abfb42993404 --endpoint cn-hangzhou.log.aliyuncs.com --wait-timeout=5
=== START alb-c83e0b203ed244f10a617abfb42993404 (readwrite=2) ===
[INFO] alb-c83e0b203ed244f10a617abfb42993404: MERGE round=1 choose_first_shard=0 (rw=2)
[WAIT] alb-c83e0b203ed244f10a617abfb42993404: rw=2 watched_shard=0 still readwrite... (0s)
[WAIT] alb-c83e0b203ed244f10a617abfb42993404: rw=2 watched_shard=0 still readwrite... (1s)
[WAIT] alb-c83e0b203ed244f10a617abfb42993404: rw=2 watched_shard=0 still readwrite... (2s)
[WAIT] alb-c83e0b203ed244f10a617abfb42993404: rw=2 watched_shard=0 still readwrite... (3s)
[WAIT] alb-c83e0b203ed244f10a617abfb42993404: rw=2 watched_shard=0 still readwrite... (4s)
[WARN] alb-c83e0b203ed244f10a617abfb42993404: WAIT 超时(5.0s),rw 仍为 2
[INFO] alb-c83e0b203ed244f10a617abfb42993404: rw 2 -> 2
[WARN] alb-c83e0b203ed244f10a617abfb42993404: rw 未下降(rw=2),先停止该 logstore
我想直接合并shard,不需要让logstore先停止
你这个现象其实说明:MergeShard 是异步的,接口返回不代表立刻把 shard 状态改掉;在你设置 --wait-timeout=5 的情况下,5 秒内 shard 还没从 readwrite 变成 readonly/消失很正常,所以脚本就超时退出了。
关键点:合并 shard 不需要停 Logstore / 不需要停止写入。但你必须给它足够的时间让 merge 完成(常见几十秒到几分钟,shard 多/写入忙会更久)。你现在等 5 秒当然会失败。
你要“直接合并、不等稳定”的效果,就是:
否则会连续触发 merge 并得到 shard not exist or readonly(因为 shard 状态切换中,下一次 merge 选到的 shard 已经 readonly 或被替换)。下面我给你一个“更符合你需求”的版本:
# -*- coding: utf-8 -*-
"""Merge SLS logstore shards down to one readwrite shard without stopping ingestion.

After each MergeShard call the script waits only until the merge has *started*
to take effect (readwrite count drops, or the merged shard leaves readwrite
status), then moves on to the next round.
"""
import sys
import json
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models


def create_client(endpoint: str) -> Sls20201230Client:
    """Build an SLS client from the default credential chain (no hard-coded AK)."""
    credential = CredentialClient()
    config = open_api_models.Config(credential=credential)
    config.endpoint = endpoint
    return Sls20201230Client(config)


def _get(obj: Any, *names: str) -> Any:
    """Read a field from either a dict or an SDK model object; dict lookups
    fall back to case-insensitive key matching (SDK key casing varies)."""
    if isinstance(obj, dict):
        for n in names:
            if n in obj:
                return obj.get(n)
        lower_map = {str(k).lower(): k for k in obj.keys()}
        for n in names:
            k = lower_map.get(str(n).lower())
            if k is not None:
                return obj.get(k)
        return None
    for n in names:
        if hasattr(obj, n):
            return getattr(obj, n)
    return None


def list_all_logstores(client: Sls20201230Client, project: str, page_size: int = 500) -> List[str]:
    """Page through ListLogStores and return every logstore name."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    offset = 0
    all_ls: List[str] = []
    while True:
        req = models.ListLogStoresRequest(offset=offset, size=page_size)
        resp = client.list_log_stores_with_options(project, req, headers, runtime)
        body = resp.body
        batch = (body.logstores or []) if body else []
        all_ls.extend(batch)
        total = body.total if (body and body.total is not None) else len(all_ls)
        if not batch or len(all_ls) >= total:
            break
        offset += len(batch)
    return all_ls


def list_shards(client: Sls20201230Client, project: str, logstore: str) -> List[Any]:
    """Return the ListShards response body (a list of shard dicts/models)."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    resp = client.list_shards_with_options(project, logstore, headers, runtime)
    return resp.body or []


def shard_id(s: Any) -> Optional[int]:
    """Shard id as an int, or None when absent or unparsable."""
    sid = _get(s, "shardID", "shardId", "shard_id")
    if sid is None:
        return None
    try:
        return int(sid)
    except Exception:
        return None


def shard_status(s: Any) -> str:
    """Lower-cased shard status; "" when missing or not a string."""
    st = _get(s, "status")
    return (st or "").lower() if isinstance(st, str) else ""


def begin_key(s: Any) -> str:
    """Shard's inclusive begin key as a string ("" when missing)."""
    bk = _get(s, "inclusiveBeginKey", "inclusive_begin_key")
    return str(bk or "")


def count_readwrite(shards: List[Any]) -> int:
    """Number of shards currently in readwrite status."""
    return sum(1 for s in shards if shard_status(s) == "readwrite")


def find_adjacent_readwrite_pair(shards: List[Any]) -> Optional[int]:
    """Return the first shard id of an adjacent readwrite/readwrite pair
    (sorted by begin key), or None when no such pair exists."""
    ss = sorted(shards, key=begin_key)
    for i in range(len(ss) - 1):
        if shard_status(ss[i]) == "readwrite" and shard_status(ss[i + 1]) == "readwrite":
            sid = shard_id(ss[i])
            if sid is not None:
                return sid
    return None


def merge_once(client: Sls20201230Client, project: str, logstore: str, first_shard_id: int) -> None:
    """Issue MergeShard for the given shard and its right neighbour.
    The shard id is a URL path parameter, so it is passed as ``str``."""
    runtime = util_models.RuntimeOptions()
    headers: Dict[str, str] = {}
    client.merge_shard_with_options(project, logstore, str(first_shard_id), headers, runtime)


def is_not_exist_or_readonly(err: Exception) -> bool:
    """True for 'shard not exist' / 'readonly' errors seen mid state switch."""
    msg = getattr(err, "message", None) or str(err)
    m = msg.lower()
    return ("shard not exist" in m) or ("readonly" in m)


def wait_merge_started_or_effect(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    rw_before: int,
    watched_shard: int,
    timeout_sec: float,
    poll_sec: float = 1.0,
    quiet: bool = False,
) -> int:
    """Wait until the merge has at least *started* to take effect.

    Returns the latest readwrite count as soon as either:
      1) the readwrite count dropped; or
      2) ``watched_shard`` is no longer readwrite (readonly or gone),
    i.e. the merge is progressing — full completion is not required.
    """
    start = time.time()
    while True:
        shards = list_shards(client, project, logstore)
        rw_now = count_readwrite(shards)
        if rw_now < rw_before:
            return rw_now
        st_map: Dict[int, str] = {}
        for s in shards:
            sid = shard_id(s)
            if sid is not None:
                st_map[sid] = shard_status(s)
        watched_status = st_map.get(watched_shard, "missing")
        if watched_status != "readwrite":
            # Merge already progressing; the next round re-selects a shard.
            return rw_now
        if time.time() - start >= timeout_sec:
            if not quiet:
                print(f"[WARN] {logstore}: wait {timeout_sec}s still no progress (rw={rw_now}, watched={watched_shard} still readwrite)", file=sys.stderr)
            return rw_now
        if not quiet:
            elapsed = int(time.time() - start)
            print(f"[WAIT] {logstore}: rw={rw_now} watched={watched_shard} still readwrite... ({elapsed}s)")
        time.sleep(poll_sec)


def merge_logstore_to_one(
    client: Sls20201230Client,
    project: str,
    logstore: str,
    target_rw: int,
    max_rounds: int,
    wait_timeout: float,
    quiet: bool,
) -> Tuple[int, int]:
    """Merge adjacent readwrite pairs until <= ``target_rw`` remain.

    Returns (initial readwrite count, final readwrite count). Does not require
    the count to drop every round — progress on the watched shard is enough.
    """
    shards = list_shards(client, project, logstore)
    init_rw = count_readwrite(shards)
    rw_now = init_rw
    if rw_now <= target_rw:
        return init_rw, rw_now
    rounds = 0
    while rounds < max_rounds:
        shards = list_shards(client, project, logstore)
        rw_now = count_readwrite(shards)
        if rw_now <= target_rw:
            break
        first_id = find_adjacent_readwrite_pair(shards)
        if first_id is None:
            if not quiet:
                print(f"[WARN] {logstore}: no adjacent readwrite pair (rw={rw_now}), stop", file=sys.stderr)
            break
        if not quiet:
            print(f"[INFO] {logstore}: MERGE round={rounds+1} first_shard={first_id} (rw={rw_now})")
        try:
            merge_once(client, project, logstore, first_id)
        except Exception as e:
            # With ingestion still running this error usually means a state
            # switch / concurrent change; wait and re-select a shard.
            if not is_not_exist_or_readonly(e):
                raise
            if not quiet:
                print(f"[WARN] {logstore}: merge got '{getattr(e,'message',str(e))}', will wait and retry", file=sys.stderr)
        rw_after = wait_merge_started_or_effect(
            client=client,
            project=project,
            logstore=logstore,
            rw_before=rw_now,
            watched_shard=first_id,
            timeout_sec=wait_timeout,
            poll_sec=1.0,
            quiet=quiet,
        )
        if not quiet:
            print(f"[INFO] {logstore}: rw now {rw_after} (before was {rw_now})")
        # Note: no requirement that rw dropped — a moved watched shard is
        # enough to proceed to the next round.
        rounds += 1
    shards = list_shards(client, project, logstore)
    return init_rw, count_readwrite(shards)


def main(argv: List[str]) -> None:
    """CLI entry: merge every qualifying logstore and print a summary."""
    p = argparse.ArgumentParser(description="Merge logstores to make READWRITE shards become 1 (no need to stop ingestion).")
    p.add_argument("project")
    p.add_argument("--endpoint", default="cn-hangzhou.log.aliyuncs.com")
    p.add_argument("--min-rw", type=int, default=2)
    p.add_argument("--target-rw", type=int, default=1)
    p.add_argument("--wait-timeout", type=float, default=180.0, help="per-merge wait, default 180s (5s is too small)")
    p.add_argument("--max-rounds", type=int, default=500)
    p.add_argument("--quiet", action="store_true")
    p.add_argument("--json", action="store_true")
    args = p.parse_args(argv)

    client = create_client(args.endpoint)
    logstores = list_all_logstores(client, args.project)
    summary: List[Dict[str, Any]] = []
    for ls in logstores:
        # FIX: isolate per-logstore failures — an API error on one logstore
        # must not abort the whole run.
        try:
            shards = list_shards(client, args.project, ls)
            rw = count_readwrite(shards)
            if rw < args.min_rw:
                continue
            if not args.quiet:
                print(f"\n=== START {ls} (readwrite={rw}) ===")
            init_rw, final_rw = merge_logstore_to_one(
                client=client,
                project=args.project,
                logstore=ls,
                target_rw=args.target_rw,
                max_rounds=args.max_rounds,
                wait_timeout=args.wait_timeout,
                quiet=args.quiet,
            )
            summary.append({"logstore": ls, "initial_readwrite": init_rw, "final_readwrite": final_rw})
        except Exception as e:
            msg = getattr(e, "message", None) or str(e)
            recommend = ""
            data = getattr(e, "data", None)
            if isinstance(data, dict):
                recommend = str(data.get("Recommend") or "")
            print(f"[ERROR] {ls}: {msg}", file=sys.stderr)
            if recommend:
                print(f"[ERROR] Recommend: {recommend}", file=sys.stderr)

    if args.json:
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        for it in summary:
            print(f"{it['logstore']}\t{it['initial_readwrite']}\t{it['final_readwrite']}")


if __name__ == "__main__":
    main(sys.argv[1:])
你刚才 --wait-timeout=5 真的太小了,建议至少 60~300 秒:
python merge_shard.py k8s-log-c83e0b203ed244f10a617abfb42993404 \ --endpoint cn-hangzhou.log.aliyuncs.com \ --wait-timeout 180
如果 shard 很多、写入很忙(比如 audit 那种 rw=31),用 300 更稳:
python merge_shard.py k8s-log-xxx --endpoint ap-southeast-1.log.aliyuncs.com --wait-timeout 300
如果你希望我再帮你加一个“只对某几个 logstore 合并”(白名单/正则匹配),或者加“并发但限速”的版本,也可以直接说你想怎么筛选。