帮我写一个查询的工具
創建於:2025年12月21日
使用 GPT-5.2 Thinking 回答 Chat01
創建於:2025年12月21日
使用 GPT-5.2 Thinking 回答 Chat01
帮我写一个查询的工具
下面给你一个“通用查询工具”(命令行 CLI)的完整可用模板:支持 SQLite SQL 查询,以及 HTTP API 的 GET 查询(需要安装 requests)。你把下面代码保存为 qtool.py 就能直接用。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""qtool.py - a small, practical "query tool" command-line interface.

Features:
  1) sql: run a SQL statement against a SQLite database
  2) api: send an HTTP GET request and output the response

Output:
  - print to the terminal (table / JSON)
  - export to a JSON or CSV file

Usage examples:
  # SQLite queries
  python qtool.py sql --db ./data.db --query "select * from users limit 5"
  python qtool.py sql --db ./data.db --query-file ./query.sql --out users.csv
  python qtool.py sql --db ./data.db --query "select * from users where id=:id" --bind id=3 --out result.json

  # API queries
  python qtool.py api --url "https://httpbin.org/get" --param q=hello --param limit=10
  python qtool.py api --url "https://api.example.com/items" --header "Authorization=Bearer XXX" --out out.json
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import sys
import textwrap
import sqlite3
from typing import Any, Dict, List, Tuple, Optional


# ----------------------------
# Generic helpers
# ----------------------------
def eprint(*args: Any) -> None:
    """Print *args* to standard error."""
    print(*args, file=sys.stderr)


def read_text_file(path: str) -> str:
    """Return the whole content of the UTF-8 text file at *path*."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def parse_kv_list(items: Optional[List[str]]) -> Dict[str, str]:
    """Parse ``["a=1", "b=2"]`` into ``{"a": "1", "b": "2"}``.

    Each item is split on its first ``=`` only, so values may themselves
    contain ``=``. Keys and values are stripped of surrounding whitespace.
    ``None`` or an empty list yields an empty dict.

    Raises:
        ValueError: if an item has no ``=`` or its key is empty.
    """
    result: Dict[str, str] = {}
    for item in items or []:
        key, sep, value = item.partition("=")
        if not sep:
            raise ValueError(f"参数必须是 key=value 格式:{item}")
        key = key.strip()
        if not key:
            raise ValueError(f"key 不能为空:{item}")
        result[key] = value.strip()
    return result


def ensure_parent_dir(path: str) -> None:
    """Create the directory that will contain *path* if it is missing."""
    # exist_ok=True already covers the "directory is there" case, so no
    # separate existence check is needed.
    os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)


def guess_out_format(path: str) -> str:
    """Return ``"csv"`` for a ``.csv`` path (case-insensitive), else ``"json"``."""
    ext = os.path.splitext(path)[1].lower()
    return "csv" if ext == ".csv" else "json"


def print_table(rows: List[Dict[str, Any]], max_width: int = 32) -> None:
    """Print *rows* as a plain-text table (no third-party dependency).

    Columns are taken from the keys of the first row. Cells longer than
    *max_width* are truncated and end with an ellipsis; ``None`` is shown
    as an empty cell. Prints ``(no rows)`` when *rows* is empty.
    """
    if not rows:
        print("(no rows)")
        return

    # A width below 1 would make the truncation slice below misbehave.
    max_width = max(max_width, 1)
    cols = list(rows[0].keys())

    def cell_text(val: Any) -> str:
        return "" if val is None else str(val)

    # Measure the text that is actually rendered, so a None cell counts as
    # empty instead of as the 4 characters of "None".
    widths = {
        c: min(
            max(len(str(c)), *(len(cell_text(r.get(c, ""))) for r in rows)),
            max_width,
        )
        for c in cols
    }

    def fmt_cell(val: Any, w: int) -> str:
        s = cell_text(val)
        if len(s) > w:
            s = s[: w - 1] + "…"
        return s.ljust(w)

    print(" | ".join(fmt_cell(c, widths[c]) for c in cols))
    print("-+-".join("-" * widths[c] for c in cols))
    for r in rows:
        print(" | ".join(fmt_cell(r.get(c, ""), widths[c]) for c in cols))


# ----------------------------
# SQLite queries
# ----------------------------
def run_sqlite_query(db_path: str, query: str, binds: Dict[str, str]) -> List[Dict[str, Any]]:
    """Execute one SQL statement against a SQLite file and return its rows.

    Args:
        db_path: path to an existing SQLite database file.
        query: a single SQL statement; ``:name`` placeholders are filled
            from *binds*.
        binds: named parameter values.

    Returns:
        One dict per result row keyed by column name; an empty list when
        the statement produces no result set.

    Raises:
        FileNotFoundError: if *db_path* does not exist (sqlite3 would
            otherwise silently create a new empty database).
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"SQLite 数据库不存在:{db_path}")

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        cur.execute(query, binds)
        # Statements without a result set have no description.
        fetched = cur.fetchall() if cur.description else []
        # Commit whenever a transaction is open, not only when there is no
        # result set: a write that also returns rows (INSERT ... RETURNING)
        # would otherwise be rolled back when the connection closes.
        if conn.in_transaction:
            conn.commit()
        return [dict(row) for row in fetched]
    finally:
        conn.close()


# ----------------------------
# API queries (GET)
# ----------------------------
def run_api_get(url: str, params: Dict[str, str], headers: Dict[str, str], timeout: int) -> Any:
    """Send an HTTP GET request and return the decoded response.

    Returns the parsed JSON body when the ``Content-Type`` mentions JSON and
    the body parses; otherwise a dict with ``text``, ``content_type`` and
    ``status_code``.

    Raises:
        RuntimeError: if the ``requests`` package is not installed.
        requests.HTTPError: for 4xx/5xx responses (via ``raise_for_status``).
    """
    try:
        # Imported lazily so the sql subcommand works without requests.
        import requests  # type: ignore
    except ImportError as ex:
        raise RuntimeError("缺少 requests:请先安装 `pip install requests`") from ex

    resp = requests.get(url, params=params, headers=headers, timeout=timeout)
    resp.raise_for_status()

    ctype = resp.headers.get("Content-Type", "")
    if "json" in ctype.lower():
        try:
            return resp.json()
        except ValueError:
            # The server announced JSON but sent something else; fall
            # through to the plain-text payload instead of crashing.
            pass
    return {"text": resp.text, "content_type": ctype, "status_code": resp.status_code}


# ----------------------------
# Output
# ----------------------------
def write_json(path: str, data: Any) -> None:
    """Write *data* to *path* as indented UTF-8 JSON, creating parent dirs."""
    ensure_parent_dir(path)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def write_csv(path: str, rows: List[Dict[str, Any]]) -> None:
    """Write *rows* to *path* as CSV with a header row, creating parent dirs.

    Column names come from the first row; ``None`` values become empty
    cells. With no rows the column names are unknown, so an empty file is
    written.
    """
    ensure_parent_dir(path)
    with open(path, "w", encoding="utf-8", newline="") as f:
        if not rows:
            return
        fieldnames = list(rows[0].keys())
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            writer.writerow({k: ("" if r.get(k) is None else r.get(k)) for k in fieldnames})


# ----------------------------
# CLI
# ----------------------------
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with the ``sql`` and ``api`` subcommands."""
    p = argparse.ArgumentParser(
        prog="qtool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="一个通用查询工具:SQLite SQL 查询 + HTTP API 查询",
        epilog=textwrap.dedent(
            """\
            示例:
              python qtool.py sql --db ./data.db --query "select * from users limit 5"
              python qtool.py sql --db ./data.db --query "select * from users where id=:id" --bind id=3 --out out.json
              python qtool.py api --url "https://httpbin.org/get" --param q=hello --param limit=10
            """
        ),
    )

    sub = p.add_subparsers(dest="cmd", required=True)

    # "sql" subcommand
    ps = sub.add_parser("sql", help="执行 SQLite SQL 查询")
    ps.add_argument("--db", required=True, help="SQLite 数据库文件路径,如 ./data.db")
    g = ps.add_mutually_exclusive_group(required=True)
    g.add_argument("--query", help="SQL 字符串")
    g.add_argument("--query-file", help="从文件读取 SQL(UTF-8)")
    ps.add_argument("--bind", action="append", default=[], help="绑定参数 key=value,可多次提供。例如 --bind id=3")
    ps.add_argument("--out", help="输出到文件(.json 或 .csv),不填则打印到终端")
    ps.add_argument("--max-print", type=int, default=200, help="终端最多打印多少行(默认 200)")

    # "api" subcommand
    pa = sub.add_parser("api", help="发起 HTTP GET 请求查询")
    pa.add_argument("--url", required=True, help="请求 URL")
    pa.add_argument("--param", action="append", default=[], help="Query 参数 key=value,可多次提供")
    pa.add_argument("--header", action="append", default=[], help="Header key=value,可多次提供")
    pa.add_argument("--timeout", type=int, default=20, help="请求超时秒数(默认 20)")
    pa.add_argument("--out", help="输出到文件(.json),不填则打印到终端")

    return p


def _run_sql_command(args: argparse.Namespace) -> int:
    """Handle the ``sql`` subcommand: run the query, then export or print."""
    binds = parse_kv_list(args.bind)
    # Compare against None rather than truthiness: an explicit empty
    # --query must not fall through to reading a file path that is None.
    query = args.query if args.query is not None else read_text_file(args.query_file)
    rows = run_sqlite_query(args.db, query, binds)

    if args.out:
        if guess_out_format(args.out) == "csv":
            write_csv(args.out, rows)
        else:
            write_json(args.out, rows)
        print(f"已写入:{args.out}({len(rows)} rows)")
        return 0

    # Clamp the limit: a negative value would slice from the end and
    # silently drop the last rows.
    to_print = rows[: max(args.max_print, 0)]
    print_table(to_print)
    if len(rows) > len(to_print):
        print(f"... 省略 {len(rows) - len(to_print)} 行(用 --max-print 调大)")
    return 0


def _run_api_command(args: argparse.Namespace) -> int:
    """Handle the ``api`` subcommand: send the GET, then export or print."""
    params = parse_kv_list(args.param)
    headers = parse_kv_list(args.header)
    data = run_api_get(args.url, params=params, headers=headers, timeout=args.timeout)

    if args.out:
        write_json(args.out, data)
        print(f"已写入:{args.out}")
    else:
        print(json.dumps(data, ensure_ascii=False, indent=2))
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """Run the CLI and return a process exit code.

    Args:
        argv: argument list without the program name; ``None`` makes
            argparse read ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when the command raised an error (its message is
        printed to stderr), 2 for an unknown command.
    """
    args = build_parser().parse_args(argv)
    handlers = {"sql": _run_sql_command, "api": _run_api_command}

    handler = handlers.get(args.cmd)
    if handler is None:
        eprint("未知命令")
        return 2

    try:
        return handler(args)
    except Exception as ex:
        # Top-level boundary of the CLI: report the error and turn it
        # into a non-zero exit code instead of a traceback.
        eprint(f"错误:{ex}")
        return 1


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
```bash
python qtool.py sql --db ./data.db --query "select * from users limit 5"
python qtool.py sql --db ./data.db --query "select * from users where id=:id" --bind id=3
python qtool.py sql --db ./data.db --query-file ./query.sql --out result.csv
```
先装依赖:
```bash
pip install requests
```
再查询:
```bash
python qtool.py api --url "https://httpbin.org/get" --param q=hello --param limit=10
python qtool.py api --url "https://api.example.com/items" --header "Authorization=Bearer XXX" --out out.json
```
如果你说的“查询工具”是 查询某个具体系统(比如 MySQL/PostgreSQL、ElasticSearch、日志、内部接口、或某个业务表),我也能把这个模板改成更贴合的版本:把数据源/接口形式告诉我一句(例如“查 MySQL、需要分页、要按时间范围过滤”),我就按你的场景直接改好。