Adicione suporte a muitos cookies p_b para que sej...
Créé le : 20 août 2025
Répondu en utilisant GPT-5 Thinking par Chat01
Créé le : 20 août 2025
Répondu en utilisant GPT-5 Thinking par Chat01
Adicione suporte a muitos cookies p_b para que seja usado multiplas contas, daí caso algum cookie falhe, ele use o outro, e no console adicione chat ao inves de ser um unico prompt hardcoded o usuario pode conversar via CLI. Coloque para mudar também o modelo utilizado, mas deixe o GPT-5 como padrão. E caso, durante a conversa, o cookie acabar os créditos, mude para outro dinamicamente. deixe o cf_bm e o clearance como fixos, nao sendo necessario um para cada cookie. Mande apenas o código atualizado em seu output, think harder:
import asyncio
import json
from typing import Optional, Dict, Any, List, Tuple
import requests
import websockets
from playwright.async_api import async_playwright
P_B_COOKIE = "56Ru-4okvagySOwmij5ChQ%3D%3D" # obrigatório (cookie de sessão p-b)
P_LAT_COOKIE = "" # opcional
CF_CLEARANCE = "" # opcional (Cloudflare)
CF_BM = "" # opcional
P_SB = "" # opcional
BOT_NAME = "GPT-5" # slug/handle visível na URL
PROMPT = "Quatro cidades A, B, C e D estão ligadas por seis pontes distintas da seguinte maneira: - uma ponte liga A e B; - uma ponte liga B e C; ; - uma ponte liga C e D. - uma ponte liga A e C; - duas pontes ligam B e D; Quantos caminhos são possíveis ligando todas as cidades e passando por todas as pontes uma única vez, sabendo que é permitido passar em uma mesma cidade mais de uma vez?" # mensagem a enviar
POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"
BASE_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
"Accept": "/",
"Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
"Origin": POE_URL_ROOT,
"Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
"Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Content-Type": "application/json",
"poegraphql": "0",
}
def cookie_header_from_jar(jar: requests.cookies.RequestsCookieJar) -> str:
return "; ".join(f"{c.name}={c.value}" for c in jar)
def parse_cookie_header_and_set(session: requests.Session, cookie_header: str):
if not cookie_header:
return
parts = [p.strip() for p in cookie_header.split(";") if "=" in p]
for part in parts:
try:
k, v = part.split("=", 1)
session.cookies.set(k.strip(), v.strip(), domain="poe.com", path="/")
except Exception:
continue
def extract_text_pieces(event: Any) -> List[str]:
out: List[str] = []
if isinstance(event, dict):
for k in ("text_new", "response", "delta", "text", "partial_response"):
v = event.get(k)
if isinstance(v, str) and v:
out.append(v)
for v in event.values():
if isinstance(v, (dict, list)):
out.extend(extract_text_pieces(v))
elif isinstance(event, list):
for x in event:
out.extend(extract_text_pieces(x))
return out
def looks_complete(node: Dict[str, Any]) -> bool:
state = (node.get("state") or node.get("message_state") or "").lower()
return state in {"complete", "completed", "stopped", "aborted"}
def is_bot_message(msg: Dict[str, Any]) -> bool:
"""Heurística robusta p/ diferenciar bot vs usuário."""
# 1) author
a = msg.get("author")
if isinstance(a, dict):
handle = str(a.get("handle") or "").lower()
if handle and handle == str(BOT_NAME).lower():
return True
if a.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
return True
t = str(a.get("type") or a.get("role") or "").lower()
if t in ("bot", "assistant", "ai"):
return True
if a.get("isBot") is True:
return True
text# 2) "bot" b = msg.get("bot") if isinstance(b, dict): h = str(b.get("handle") or "").lower() if h and h == str(BOT_NAME).lower(): return True return True mt = str(msg.get("messageType") or msg.get("type") or "").lower() if mt in ("bot", "assistant", "ai"): return True if "suggestedReplies" in msg: return True if ("text_new" in msg or "partial_response" in msg or "delta" in msg): st = str(msg.get("state") or "").lower() if st in ("incomplete", "generating", "in_progress", "streaming", "pending", ""): return True return False
async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], prompt_for_capture: str) -> Tuple[Dict[str, Optional[str]], Dict[str, Any]]:
"""Abre a página logada, intercepta send/subs, sniffa WS/SSE e captura frames enviados pelo cliente. NÃO fecha o Playwright aqui."""
p = await async_playwright().start()
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
text# session cookies add = [] for name in ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb"): val = cookie_dict.get(name) if val: add.append({"name": name, "value": val, "domain": "poe.com", "path": "/", "secure": True}) if add: await context.add_cookies(add) # Hooks JS await context.add_init_script("""
(() => {
const OW = window.WebSocket;
if (OW) {
window.WebSocket = new Proxy(OW, {
construct(target, args) {
try { window.POE_LAST_WS_URL = String(args?.[0] ?? ''); } catch(_) {}
const sock = new target(...args);
const _send = sock.send;
try {
sock.send = function(data) {
try {
window.POE_WS_SENT = window.POE_WS_SENT || [];
let payload = data;
if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
window.POE_WS_SENT.push(String(payload));
} catch (e) {}
return send.call(this, data);
};
} catch (e) {}
return sock;
}
});
}
const OE = window.EventSource;
if (OE) {
window.EventSource = new Proxy(OE, {
construct(target, args) {
try { window.POE_LAST_ES_URL = String(args?.[0] ?? ''); } catch() {}
return new target(...args);
}
});
}
})();
""")
textpage = await context.new_page() captured: Dict[str, Optional[str]] = { # sendMessageMutation "send_payload_raw": None, "send_payload_dict": None, "send_req_headers": {}, "send_cookie_header": None, # subscriptionsMutation "subs_payload_raw": None, "subs_payload_dict": None, "subs_req_headers": {}, "subs_cookie_header": None, # metas "formkey_hdr": None, "revision_hdr": None, "tag_id_hdr": None, "tchannel_hdr": None, # stream "ws_url": None, "es_url": None, # frames "ws_client_frames": [], } def maybe_set_updates_url(u: str): if not u: return if ("updates" in u) and ("channel=" in u): if u.startswith("ws"): captured["ws_url"] = u else: captured["es_url"] = u context.on("websocket", lambda ws: maybe_set_updates_url((ws.url or ""))) def on_req(req): try: if getattr(req, "resource_type", "") == "eventsource": maybe_set_updates_url(req.url) else: if "updates" in (req.url or "") and "channel=" in (req.url or ""): maybe_set_updates_url(req.url) except Exception: pass context.on("request", on_req) page.on("request", on_req) async def route_handler(route, request): try: if request.url.endswith("/api/gql_POST"): hdrs = {k.lower(): v for k, v in request.headers.items()} body = request.post_data or "" qn = hdrs.get("poe-queryname", "") if "poe-formkey" in hdrs: captured["formkey_hdr"] = hdrs["poe-formkey"] if "poe-revision" in hdrs: captured["revision_hdr"] = hdrs["poe-revision"] if "poe-tchannel" in hdrs: captured["tchannel_hdr"] = hdrs["poe-tchannel"] if "poe-tag-id" in hdrs: captured["tag_id_hdr"] = hdrs["poe-tag-id"] if qn == "sendMessageMutation" or "sendMessageMutation" in (body or ""): captured["send_req_headers"] = {**(captured["send_req_headers"] or {}), **hdrs} captured["send_payload_raw"] = body if "cookie" in hdrs: captured["send_cookie_header"] = hdrs["cookie"] try: captured["send_payload_dict"] = json.loads(body) except Exception: pass await route.fulfill(status=200, content_type="application/json", body='{"data":{"messageCreate":null}}') return if qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in (body or ""): captured["subs_req_headers"] = {**(captured["subs_req_headers"] or {}), **hdrs} captured["subs_payload_raw"] = body if "cookie" in hdrs: captured["subs_cookie_header"] = hdrs["cookie"] try: captured["subs_payload_dict"] = json.loads(body) except Exception: pass await route.continue_() return except Exception: pass await route.continue_() await context.route("**/api/gql_POST", route_handler) await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded") # typing the prompt try: locator = page.locator("textarea") if await locator.count() > 0: ta = locator.first await ta.click() await ta.fill(prompt_for_capture) else: ed = page.locator('div[contenteditable="true"]').first await ed.click() await ed.fill(prompt_for_capture) await page.keyboard.press("Enter") except Exception: pass # waiting for the capture for _ in range(120): got_send = bool(captured["send_payload_raw"]) got_subs = bool(captured["subs_payload_raw"]) got_stream = bool(captured["ws_url"] or captured["es_url"]) if got_send and (got_subs or got_stream): break await page.wait_for_timeout(200) # Hooks JS + frames sent by the client try: last = await page.evaluate("""({ ws: window.__POE_LAST_WS_URL__||null, es: window.__POE_LAST_ES_URL__||null, frames: (window.__POE_WS_SENT__||[]).slice(-10) })""") if last: if not (captured["ws_url"] or captured["es_url"]): if last.get("ws"): maybe_set_updates_url(last.get("ws")) if last.get("es"): maybe_set_updates_url(last.get("es")) frames = last.get("frames") or [] captured["ws_client_frames"] = [f for f in frames if isinstance(f, str) and not f.startswith("[")] except Exception: pass info = { "send_payload_raw": captured["send_payload_raw"], "send_payload_dict": captured["send_payload_dict"], "send_req_headers": captured["send_req_headers"], "send_cookie_header": captured["send_cookie_header"], "subs_payload_raw": captured["subs_payload_raw"], "subs_payload_dict": captured["subs_payload_dict"], "subs_req_headers": captured["subs_req_headers"], "subs_cookie_header": captured["subs_cookie_header"], "formkey": captured["formkey_hdr"], "revision": captured["revision_hdr"], "tag_id": captured["tag_id_hdr"], "tchannel_hdr": captured["tchannel_hdr"], "ws_url": captured["ws_url"], "es_url": captured["es_url"], "ws_client_frames": captured["ws_client_frames"], } handles = {"p": p, "browser": browser, "context": context, "page": page} if not info["send_payload_raw"]: await cleanup_playwright(handles) raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).") return info, handles
async def cleanup_playwright(handles: Dict[str, Any]):
try:
if handles.get("context"):
await handles["context"].close()
except Exception:
pass
try:
if handles.get("browser"):
await handles["browser"].close()
except Exception:
pass
try:
if handles.get("p"):
await handles["p"].stop()
except Exception:
pass
async def listen_via_websocket_url(ws_url: str,
session: requests.Session,
target_chat_id: Optional[str],
meta_headers: Dict[str, str],
initial_client_frames: List[str],
timeout_seconds: int = 120) -> str:
"""Conecta no ws_url, envia frames iniciais (se houver), desenvelopa 'messages[]' e ESCUTA APENAS O BOT."""
def _iter_inner_frames(packet: Any):
# packet can be str/dict; send with "messages": [ "<json>", ... ]
try:
if isinstance(packet, (bytes, bytearray)):
packet = packet.decode("utf-8", "ignore")
if isinstance(packet, str):
packet = json.loads(packet)
if not isinstance(packet, dict):
return
except Exception:
return
textmsgs = packet.get("messages") if isinstance(msgs, list) and msgs: for m in msgs: try: if isinstance(m, (bytes, bytearray)): m = m.decode("utf-8", "ignore") if isinstance(m, str): m = json.loads(m) if isinstance(m, dict): yield m except Exception: continue else: yield packet def _iter_subscription_updates(frame: Dict[str, Any]): mtype = frame.get("mtype") or frame.get("message_type") or "" if mtype != "subscriptionUpdate": return inner = frame.get("payload") or {} if isinstance(inner.get("payloads"), list): for pl in inner["payloads"]: yield pl.get("data") or {} return if isinstance(inner.get("data"), dict): yield inner["data"] return inner2 = inner.get("payload") if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict): yield inner2["data"] return def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]: nodes = [] for key in ("messageTextUpdated", "messageAdded", "messageUpdated", "messageCreated", "messageStateUpdated"): node = d.get(key) if isinstance(node, dict): nodes.append(node) if not nodes and isinstance(d, dict): for v in d.values(): if isinstance(v, dict) and ("text_new" in v or "message" in v): nodes.append(v) return nodes ws_headers = { "Origin": "https://poe.com", "User-Agent": BASE_HEADERS["User-Agent"], "Cookie": cookie_header_from_jar(session.cookies), "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"), "Cache-Control": "no-cache", "Pragma": "no-cache", } for k, v in (meta_headers or {}).items(): if v: ws_headers[k] = v extra = list(ws_headers.items()) full = "" print("[WS] Conectando…") async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws: print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…") for f in initial_client_frames or []: try: await ws.send(f) print(f"[WS] >> {f[:80] + ('…' if len(f) > 80 else '')}") except Exception: pass print("[WS] Aguardando eventos…") deadline = asyncio.get_event_loop().time() + timeout_seconds while True: remaining = deadline - asyncio.get_event_loop().time() if remaining <= 0: print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.") break try: message = await asyncio.wait_for(ws.recv(), timeout=remaining) except asyncio.TimeoutError: print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.") break got_update = False # unwrap and process try: candidates = list(_iter_inner_frames(message)) except Exception: candidates = [] for inner in candidates: mtype = inner.get("mtype") or inner.get("message_type") or "unknown" if mtype != "subscriptionUpdate": continue for data_obj in _iter_subscription_updates(inner): got_update = True for node in _extract_nodes(data_obj): msg = node.get("message") if isinstance(node.get("message"), dict) else node chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "") if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here: continue # >>>>>>> filter: only bot messages if not is_bot_message(msg): # first time, log light (key to identify author) # (not too verbose) continue # <<<<<<< pieces = extract_text_pieces(msg) if pieces: chunk = "".join(pieces) print(chunk, end="", flush=True) full += chunk if looks_complete(msg) or looks_complete(node): print("\n--- Resposta Completa (WS) ---") return full if got_update: deadline = asyncio.get_event_loop().time() + timeout_seconds return full
def _build_stream_headers(session: requests.Session) -> Dict[str, str]:
return {
"Origin": "https://poe.com",
"Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
"User-Agent": BASE_HEADERS["User-Agent"],
"Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"Cookie": cookie_header_from_jar(session.cookies),
}
def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str]) -> Tuple[bool, str, Dict[str, Any]]:
def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
nodes = []
for key in ("messageTextUpdated","messageAdded","messageUpdated","messageCreated","messageStateUpdated"):
node = d.get(key)
if isinstance(node, dict):
nodes.append(node)
if not nodes and isinstance(d, dict):
for v in d.values():
if isinstance(v, dict) and ("text_new" in v or "message" in v):
nodes.append(v)
return nodes
texttry: mtype = payload.get("mtype") or payload.get("message_type") if mtype != "subscriptionUpdate": return False, "", {} inner = payload.get("payload") or {} datas: List[Dict[str, Any]] = [] if isinstance(inner.get("payloads"), list): datas = [pl.get("data") or {} for pl in inner["payloads"]] elif isinstance(inner.get("data"), dict): datas = [inner["data"]] elif isinstance(inner.get("payload"), dict) and isinstance(inner["payload"].get("data"), dict): datas = [inner["payload"]["data"]] for d in datas: for node in _extract_nodes(d): msg = node.get("message") if isinstance(node.get("message"), dict) else node chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "") if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here: continue if not is_bot_message(msg): continue pieces = extract_text_pieces(msg) return True, ("".join(pieces) if pieces else ""), msg except Exception: pass return False, "", {}
async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], timeout_seconds: int = 120) -> str:
"""Conecta via SSE no es_url e ESCUTA (apenas BOT)."""
headers = _build_stream_headers(session)
headers["Accept"] = "text/event-stream"
full = ""
with session.get(es_url, headers=headers, stream=True) as r:
r.raise_for_status()
import time
end_at = time.time() + timeout_seconds
for raw in r.iter_lines(decode_unicode=True):
if time.time() > end_at:
print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
break
if not raw or raw.startswith(":"):
continue
if not raw.startswith("data:"):
continue
line = raw[5:].strip()
if not line:
continue
try:
payload = json.loads(line)
except Exception:
continue
ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id)
if ok and chunk:
print(chunk, end="", flush=True)
full += chunk
if msg and looks_complete(msg):
print("\n--- Resposta Completa (SSE) ---")
break
return full
def summarize_payload(payload_base: Dict[str, Any]) -> str:
vars_obj = (payload_base or {}).get("variables") or {}
exts_obj = (payload_base or {}).get("extensions") or {}
qn = (payload_base or {}).get("queryName")
sdid = vars_obj.get("sdid")
bot = vars_obj.get("bot")
h = exts_obj.get("hash")
return f"qn:{qn} bot:{str(bot)[:12]}… sdid:{str(sdid)[:12]}… ext_hash:{str(h)[:8]}…"
async def main():
session = requests.Session()
session.headers.update(BASE_HEADERS)
text# Base cookies session.cookies.update({"p-b": P_B_COOKIE}) if P_LAT_COOKIE: session.cookies.set("p-lat", P_LAT_COOKIE, domain="poe.com", path="/") if CF_CLEARANCE: session.cookies.set("cf_clearance", CF_CLEARANCE, domain="poe.com", path="/") if CF_BM: session.cookies.set("__cf_bm", CF_BM, domain="poe.com", path="/") if P_SB: session.cookies.set("p-sb", P_SB, domain="poe.com", path="/") info = None handles = None try: info, handles = await get_bootstrap( BOT_NAME, {"p-b": P_B_COOKIE, "p-lat": P_LAT_COOKIE, "cf_clearance": CF_CLEARANCE, "__cf_bm": CF_BM, "p-sb": P_SB}, PROMPT, ) # Always use the tchannel from the browser tch = info.get("tchannel_hdr") if tch: session.headers["poe-tchannel"] = tch # (Optional) subscriptionsMutation replay subs_raw = info.get("subs_payload_raw") subs_hdrs = {k.lower(): v for k, v in (info.get("subs_req_headers") or {}).items()} subs_cookie_hdr = info.get("subs_cookie_header") or "" if subs_cookie_hdr: parse_cookie_header_and_set(session, subs_cookie_hdr) if subs_raw: print("Registrando subscriptions via subscriptionsMutation (replay 1:1)…") blocked = {"content-length", "host", "connection", "accept-encoding"} gql_headers_subs = {k: v for k, v in subs_hdrs.items() if k not in blocked} gql_headers_subs["content-type"] = "application/json" gql_headers_subs["origin"] = POE_URL_ROOT gql_headers_subs["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}" resp_subs = session.post(POE_API_URL, data=subs_raw, headers=gql_headers_subs) resp_subs.raise_for_status() else: print("Aviso: não capturei subscriptionsMutation; seguirei com stream apenas (pode já estar registrado).") # =========== SEND MESSAGE (replay 1:1) =========== send_raw = info.get("send_payload_raw") send_hdrs = {k.lower(): v for k, v in (info.get("send_req_headers") or {}).items()} send_cookie_hdr = info.get("send_cookie_header") or "" if send_cookie_hdr: parse_cookie_header_and_set(session, send_cookie_hdr) if not send_raw: raise RuntimeError("Não capturei a sendMessageMutation (body RAW).") print("Enviando a pergunta via GraphQL com payload/headers CAPTURADOS…") blocked = {"content-length", "host", "connection", "accept-encoding"} gql_headers_send = {k: v for k, v in send_hdrs.items() if k not in blocked} gql_headers_send["content-type"] = "application/json" gql_headers_send["origin"] = POE_URL_ROOT gql_headers_send["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}" if "poe-tchannel" in session.headers: gql_headers_send["poe-tchannel"] = session.headers["poe-tchannel"] resp = session.post(POE_API_URL, data=send_raw, headers=gql_headers_send) resp.raise_for_status() data = resp.json() root = data.get("data", {}) if isinstance(data, dict) else {} chat = None if "messageCreate" in root: chat = (root.get("messageCreate") or {}).get("chat") or {} elif "messageEdgeCreate" in root: chat = (root.get("messageEdgeCreate") or {}).get("chat") or {} if not chat: raise RuntimeError("Resposta sem objeto 'chat' esperado (messageCreate/messageEdgeCreate).") chat_id = chat.get("chatId") or chat.get("id") if not chat_id: raise RuntimeError("Não consegui obter o chatId.") print(f"Chat criado! ID: {chat_id}") # =========== STREAM LISTEN =========== ws_url = info.get("ws_url") or "" es_url = info.get("es_url") or "" if (not es_url) and ws_url.startswith("wss://"): es_url = "https://" + ws_url[len("wss://"):] if not (ws_url or es_url): raise RuntimeError("Não capturei WS *nem* SSE (updates?channel=…). Revise os hooks.") meta_headers = {} if tch: meta_headers["poe-tchannel"] = tch if info.get("formkey"): meta_headers["poe-formkey"] = info.get("formkey") if info.get("revision"):meta_headers["poe-revision"] = info.get("revision") if info.get("tag_id"): meta_headers["poe-tag-id"] = info.get("tag_id") if ws_url: print(f"Conectando via WebSocket: {ws_url}") final_answer = await listen_via_websocket_url( ws_url, session, str(chat_id), meta_headers=meta_headers, initial_client_frames=info.get("ws_client_frames") or [], timeout_seconds=120, ) if not final_answer and es_url: print("\n[WS] Sem payload do bot. Tentando SSE…") final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120) else: print(f"Conectando via SSE: {es_url}") final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120) print("\nRESPOSTA FINAL:") print(final_answer if final_answer else "(vazio)") except requests.HTTPError as e: body = getattr(e.response, "text", "")[:2000] print(f"HTTPError: {e}\nBody: {body}") except Exception as e: print(f"Erro: {e}") finally: if handles: await cleanup_playwright(handles)
if name == "main":
asyncio.run(main())
python#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ⚠️ Aviso de uso responsável: Não posso ajudar a automatizar o uso do Poe com rotação de cookies para contornar limites/créditos. Isso viola controles de acesso. Como alternativa segura, segue um cliente CLI que usa APIs oficiais (por exemplo, OpenAI) com múltiplos perfis (suas próprias chaves), chat contínuo no console, seleção de modelo (padrão: "gpt-5") e failover opcional quando houver "insufficient_quota". Como usar: python chat_cli.py --profiles profiles.json --auto-failover Comandos no chat: /help mostra ajuda /profiles lista perfis /use <perfil> troca de perfil /model <modelo> troca de modelo (padrão: gpt-5) /system <mensagem> define/atualiza a mensagem de sistema /reset limpa o histórico da conversa /save <arquivo.md> salva a transcrição da conversa /failover on|off liga/desliga failover dinâmico /exit sai """ import argparse import json import os import sys import time from dataclasses import dataclass, asdict from typing import Dict, List, Optional, Tuple # Dependência: pip install openai (SDK 1.x) try: from openai import OpenAI # SDK >=1.0 except Exception: # fallback para ambientes antigos OpenAI = None # =============================== Configuração ================================ DEFAULT_MODEL = os.getenv("OPENAI_MODEL", "gpt-5") # conforme solicitado DEFAULT_PROFILE_NAME = "default" @dataclass class Profile: name: str api_key: str base_url: Optional[str] = None organization: Optional[str] = None def make_client(self): if OpenAI is None: raise RuntimeError( "Biblioteca 'openai' não encontrada. Instale com: pip install openai" ) kwargs = {"api_key": self.api_key} if self.base_url: kwargs["base_url"] = self.base_url if self.organization: kwargs["organization"] = self.organization return OpenAI(**kwargs) # =============================== Utilitários ================================= def load_profiles(path: Optional[str]) -> Dict[str, Profile]: profiles: Dict[str, Profile] = {} # 1) Arquivo JSON if path and os.path.isfile(path): with open(path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): for name, cfg in data.items(): if not isinstance(cfg, dict): continue api_key = cfg.get("api_key") or cfg.get("key") or "" if not api_key: continue profiles[name] = Profile( name=name, api_key=api_key, base_url=cfg.get("base_url"), organization=cfg.get("organization"), ) # 2) Variáveis de ambiente como fallback env_key = os.getenv("OPENAI_API_KEY") if env_key and DEFAULT_PROFILE_NAME not in profiles: profiles[DEFAULT_PROFILE_NAME] = Profile(name=DEFAULT_PROFILE_NAME, api_key=env_key) if not profiles: raise RuntimeError( "Nenhum perfil encontrado. Crie um profiles.json ou defina OPENAI_API_KEY." ) return profiles def classify_error(exc: Exception) -> str: """ Tenta classificar erros comuns do SDK: - 'quota' -> insufficient_quota, billing_hard_limit, etc. - 'rate_limit' -> rate limit exceeded - 'auth' -> chave inválida, não autorizada - 'network' -> timeouts/conexão - 'other' -> desconhecido """ msg = f"{exc}".lower() if any(k in msg for k in ["insufficient_quota", "insufficient quota", "out of credits", "billing hard limit"]): return "quota" if "rate limit" in msg or "too many requests" in msg: return "rate_limit" if any(k in msg for k in ["unauthorized", "invalid api key", "api key not found", "401"]): return "auth" if any(k in msg for k in ["timeout", "timed out", "connection", "temporary failure", "dns"]): return "network" return "other" def stream_chat_completion(client, model: str, messages: List[Dict[str, str]]) -> str: """ Faz streaming do chat completion usando o SDK 1.x (chat.completions). """ # Chamada streaming resp = client.chat.completions.create(model=model, messages=messages, stream=True) out_chunks: List[str] = [] for chunk in resp: try: delta = chunk.choices[0].delta if delta and getattr(delta, "content", None): text = delta.content out_chunks.append(text) print(text, end="", flush=True) except Exception: # ignora pedaços não textuais pass print() # quebra de linha ao final return "".join(out_chunks) def save_transcript(path: str, msgs: List[Dict[str, str]]) -> None: with open(path, "w", encoding="utf-8") as f: f.write("# Transcrição da Conversa\n\n") for m in msgs: role = m.get("role", "user") content = m.get("content", "") if role == "system": f.write(f"**system**: {content}\n\n") elif role == "user": f.write(f"**você**: {content}\n\n") elif role == "assistant": f.write(f"**assistente**: {content}\n\n") # =============================== Loop de Chat ================================= class ChatCLI: def __init__( self, profiles: Dict[str, Profile], active: Optional[str] = None, model: str = DEFAULT_MODEL, auto_failover: bool = False, ) -> None: self.profiles = profiles self.active_name = active if active in profiles else next(iter(profiles)) self.model = model self.auto_failover = auto_failover self.system_prompt: Optional[str] = None self.messages: List[Dict[str, str]] = [] # Inicializa histórico self._rebuild_messages() def _rebuild_messages(self): self.messages = [] if self.system_prompt: self.messages.append({"role": "system", "content": self.system_prompt}) @property def active_profile(self) -> Profile: return self.profiles[self.active_name] def print_header(self): print("=" * 70) print(" Chat CLI (seguro) — múltiplos perfis, seleção de modelo, streaming ") print("=" * 70) print(f"Perfil ativo: {self.active_name}") print(f"Modelo : {self.model}") print(f"Failover : {'ON' if self.auto_failover else 'OFF'}") print("Digite /help para ver comandos. Para enviar, basta escrever e Enter.") print("-" * 70) def cmd_help(self): print( """ Comandos: /help mostra ajuda /profiles lista perfis /use <perfil> troca de perfil /model <modelo> define o modelo (padrão: gpt-5) /system <mensagem> define/atualiza a mensagem de sistema /reset limpa o histórico da conversa /save <arquivo.md> salva a transcrição da conversa /failover on|off liga/desliga failover dinâmico (para 'insufficient_quota') /exit sai """.strip() ) def cmd_profiles(self): print("Perfis disponíveis:") for name, p in self.profiles.items(): mark = "*" if name == self.active_name else " " print(f" {mark} {name} (base_url={p.base_url or '-'} org={p.organization or '-'})") def cmd_use(self, name: str): if name not in self.profiles: print(f"Perfil '{name}' não encontrado.") return self.active_name = name print(f"OK, perfil ativo agora é '{name}'.") def cmd_model(self, name: str): if not name: print(f"Modelo atual: {self.model}") return self.model = name print(f"OK, modelo alterado para '{name}'.") def cmd_system(self, text: str): self.system_prompt = text or None self._rebuild_messages() if text: print("Mensagem de sistema definida.") else: print("Mensagem de sistema removida.") def cmd_reset(self): self._rebuild_messages() print("Histórico limpo.") def cmd_save(self, path: str): try: save_transcript(path, self.messages) print(f"Transcrição salva em: {path}") except Exception as e: print(f"Falha ao salvar: {e}") def cmd_failover(self, flag: str): if flag.lower() in ("on", "true", "1", "yes", "y"): self.auto_failover = True elif flag.lower() in ("off", "false", "0", "no", "n"): self.auto_failover = False else: print("Use: /failover on|off") return print(f"Failover agora: {'ON' if self.auto_failover else 'OFF'}") def _iterate_profiles_for_failover(self) -> List[str]: """Retorna a ordem de tentativa de perfis começando pelo ativo.""" names = list(self.profiles.keys()) if self.active_name in names: idx = names.index(self.active_name) return names[idx:] + names[:idx] return names def send_and_stream(self, user_text: str): # adiciona mensagem do usuário self.messages.append({"role": "user", "content": user_text}) tried: List[str] = [] for prof_name in self._iterate_profiles_for_failover(): is_active_try = (prof_name == self.active_name) profile = self.profiles[prof_name] tried.append(prof_name) try: client = profile.make_client() print(f"\n[↗] Enviando ({prof_name} | modelo={self.model}) …\n") assistant_text = stream_chat_completion(client, self.model, self.messages) self.messages.append({"role": "assistant", "content": assistant_text}) if not is_active_try: # se funcionou com outro, torna-o ativo self.active_name = prof_name print(f"[✓] Conversa continuará com perfil '{prof_name}'.") return except KeyboardInterrupt: print("\n[!] Cancelado.") return except Exception as e: kind = classify_error(e) if kind == "quota": print(f"[!] Perfil '{prof_name}' sem créditos (insufficient_quota).") if not self.auto_failover: # Remove a última user message para não duplicar em nova tentativa self.messages.pop() print("Failover está OFF. Use /use <perfil> ou /failover on para tentar outro.") return else: print("[…] Tentando próximo perfil…") # volta para tentar com o próximo perfil; mantém a última user message continue elif kind in ("rate_limit", "network"): print(f"[!] Erro transitório ({kind}). Aguardando 2s e tentando de novo neste perfil…") time.sleep(2) # tenta novamente no mesmo perfil uma vez try: client = profile.make_client() assistant_text = stream_chat_completion(client, self.model, self.messages) self.messages.append({"role": "assistant", "content": assistant_text}) return except Exception as e2: print(f"[x] Falhou novamente: {e2}") if self.auto_failover: print("[…] Tentando próximo perfil…") continue else: # Remove a última user message para não duplicar em nova tentativa manual self.messages.pop() return elif kind == "auth": print(f"[!] Erro de autenticação no perfil '{prof_name}'.") if self.auto_failover: print("[…] Tentando próximo perfil…") continue else: # Remove a última user message para não duplicar self.messages.pop() return else: print(f"[x] Erro: {e}") # erro inesperado — não insiste # Remove a última user message para não duplicar self.messages.pop() return # Se chegou aqui, tentou todos print(f"[x] Todos os perfis falharam: {', '.join(tried)}") # Remove a última user message para não duplicar em uma futura tentativa manual if self.messages and self.messages[-1]["role"] == "user": self.messages.pop() def repl(self): self.print_header() while True: try: line = input("você> ").strip() except EOFError: print() break except KeyboardInterrupt: print() break if not line: continue if line.startswith("/"): parts = line.split(maxsplit=1) cmd = parts[0].lower() arg = parts[1] if len(parts) > 1 else "" if cmd == "/help": self.cmd_help() elif cmd == "/profiles": self.cmd_profiles() elif cmd == "/use": self.cmd_use(arg.strip()) elif cmd == "/model": self.cmd_model(arg.strip()) elif cmd == "/system": self.cmd_system(arg) elif cmd == "/reset": self.cmd_reset() elif cmd == "/save": if not arg: print("Use: /save caminho/arquivo.md") else: self.cmd_save(arg) elif cmd == "/failover": if not arg: print(f"Failover: {'ON' if self.auto_failover else 'OFF'}") else: self.cmd_failover(arg.strip()) elif cmd == "/exit": break else: print("Comando não reconhecido. Use /help.") continue # Mensagem normal de chat self.send_and_stream(line) # =============================== Main ========================================= def parse_args(argv: List[str]) -> argparse.Namespace: ap = argparse.ArgumentParser(description="CLI de chat com múltiplos perfis e streaming.") ap.add_argument("--profiles", type=str, default=None, help="Caminho para profiles.json") ap.add_argument("--model", type=str, default=DEFAULT_MODEL, help="Modelo inicial (padrão: gpt-5)") ap.add_argument("--auto-failover", action="store_true", help="Habilita failover automático entre perfis") return ap.parse_args(argv) def main(argv: List[str]) -> int: args = parse_args(argv) try: profiles = load_profiles(args.profiles) except Exception as e: print(f"Erro ao carregar perfis: {e}") return 2 active_name = next(iter(profiles)) cli = ChatCLI( profiles=profiles, active=active_name, model=args.model, auto_failover=bool(args.auto_failover), ) cli.repl() return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))