Adicione suporte a muitos cookies p_b para que sej...
Creado el: 20 de agosto de 2025
Respondido usando GPT-5 Thinking por Chat01
Creado el: 20 de agosto de 2025
Respondido usando GPT-5 Thinking por Chat01
Adicione suporte a vários cookies p_b para que sejam usadas múltiplas contas; assim, caso algum cookie falhe, o script usa outro. No console, adicione um chat: em vez de um único prompt hardcoded, o usuário poderá conversar via CLI. E, caso os créditos de um cookie acabem durante a conversa, mude para outro dinamicamente:
import asyncio
import json
from typing import Optional, Dict, Any, List, Tuple
import requests
import websockets
from playwright.async_api import async_playwright
# Session cookies; fill these in before running.
P_B_COOKIE = ""  # required ("p-b" session cookie)
P_LAT_COOKIE = ""  # optional
CF_CLEARANCE = ""  # optional (Cloudflare)
CF_BM = ""  # optional
P_SB = ""  # optional

BOT_NAME = "GPT-5"  # slug/handle visible in the URL
PROMPT = "Hello World"  # message to send

POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"

# Default headers applied to every request made through the requests session.
BASE_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
    # "*/*" is the wildcard media range; a bare "/" is not a valid Accept value.
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}
def cookie_header_from_jar(jar: "requests.cookies.RequestsCookieJar") -> str:
    """Serialize every cookie in *jar* into a single ``Cookie`` header value.

    Cookies are written as ``name=value`` pairs joined by ``"; "`` in the
    jar's iteration order. No domain or path filtering is applied: every
    cookie in the jar is included.
    """
    return "; ".join(f"{c.name}={c.value}" for c in jar)
def parse_cookie_header_and_set(session: "requests.Session", cookie_header: str) -> None:
    """Copy the cookies of a raw ``Cookie`` header into ``session.cookies``.

    The header is split on ``";"``; each ``name=value`` pair is stored for
    domain ``poe.com`` and path ``/``. Only the first ``"="`` separates name
    from value, so values may themselves contain ``"="``. Segments without
    ``"="`` or with an empty name are skipped. An empty header is a no-op.
    """
    if not cookie_header:
        return
    for segment in cookie_header.split(";"):
        name, sep, value = segment.partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        session.cookies.set(name, value.strip(), domain="poe.com", path="/")
def extract_text_pieces(event: Any) -> List[str]:
    """Collect every non-empty text fragment found anywhere inside *event*.

    For a dict, the string values stored under "text_new", "response",
    "delta", "text" and "partial_response" are taken (in that order), then
    every nested dict/list value is searched recursively. For a list, each
    element is searched in order. Any other type yields nothing.

    Note that the same text is returned more than once if it is stored under
    several of the recognised keys.
    """
    out: List[str] = []
    if isinstance(event, dict):
        for k in ("text_new", "response", "delta", "text", "partial_response"):
            v = event.get(k)
            if isinstance(v, str) and v:
                out.append(v)
        for v in event.values():
            if isinstance(v, (dict, list)):
                out.extend(extract_text_pieces(v))
    elif isinstance(event, list):
        for x in event:
            out.extend(extract_text_pieces(x))
    return out
def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when *node* reports a terminal message state.

    The state is read from "state", falling back to "message_state", and is
    compared case-insensitively against "complete", "completed", "stopped"
    and "aborted".
    """
    state = node.get("state") or node.get("message_state") or ""
    # str() keeps a non-string state (e.g. a number) from crashing .lower().
    return str(state).lower() in {"complete", "completed", "stopped", "aborted"}
def is_bot_message(msg: Dict[str, Any]) -> bool:
    """Heuristically decide whether *msg* was written by the bot, not the user.

    Checks, in order: the "author" object, the presence of a "bot" object,
    the message type, a "suggestedReplies" key, and finally streaming fields
    combined with a non-terminal state. Returns False when nothing matches.
    """
    # 1) "author" object
    a = msg.get("author")
    if isinstance(a, dict):
        handle = str(a.get("handle") or "").lower()
        if handle and handle == str(BOT_NAME).lower():
            return True
        if a.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        t = str(a.get("type") or a.get("role") or "").lower()
        if t in ("bot", "assistant", "ai"):
            return True
        if a.get("isBot") is True:
            return True

    # 2) "bot" object: any dict here marks a bot message.
    # NOTE(review): the original compared the handle with BOT_NAME and then
    # returned True unconditionally, so the comparison had no effect; confirm
    # that accepting every "bot" dict is intended.
    if isinstance(msg.get("bot"), dict):
        return True

    # 3) explicit message type
    mt = str(msg.get("messageType") or msg.get("type") or "").lower()
    if mt in ("bot", "assistant", "ai"):
        return True

    # 4) a "suggestedReplies" key is treated as a bot marker
    if "suggestedReplies" in msg:
        return True

    # 5) streaming fields with a missing or non-terminal state
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        st = str(msg.get("state") or "").lower()
        if st in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True
    return False
async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], prompt_for_capture: str) -> Tuple[Dict[str, Optional[str]], Dict[str, Any]]:
    """Open the logged-in bot page and capture what is needed to replay a chat request.

    A headless Chromium page is opened on ``{POE_URL_ROOT}/{bot_name}`` with
    the given cookies, *prompt_for_capture* is typed and submitted, and the
    resulting ``/api/gql_POST`` traffic is intercepted. The browser's own
    ``sendMessageMutation`` is answered with a stub response and is not
    forwarded; ``subscriptionsMutation`` is recorded and forwarded. The URL
    of the updates stream (WebSocket or SSE) and the frames the page sent
    over its WebSocket are recorded as well.

    Args:
        bot_name: Bot slug appended to ``POE_URL_ROOT``.
        cookie_dict: Cookie values keyed by name. Only "p-b", "p-lat",
            "cf_clearance", "__cf_bm" and "p-sb" are used; empty values are
            skipped.
        prompt_for_capture: Text typed into the chat box to trigger the send.

    Returns:
        ``(info, handles)``. ``info`` holds the captured payloads, headers,
        stream URLs and client frames (values are strings, dicts or lists).
        ``handles`` holds the live Playwright objects under "p", "browser",
        "context" and "page". Playwright is NOT closed on success; the caller
        must pass ``handles`` to ``cleanup_playwright``.

    Raises:
        RuntimeError: If no ``sendMessageMutation`` body was captured.
            Playwright is shut down before any exception leaves this function.
    """
    p = await async_playwright().start()
    browser = None
    context = None
    try:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()

        # Session cookies
        add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")
            if cookie_dict.get(name)
        ]
        if add:
            await context.add_cookies(add)

        # JS hooks: remember the last WebSocket/EventSource URL and every frame
        # the page sends. The global names must match the ones read back through
        # page.evaluate() further down.
        await context.add_init_script("""
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        return new target(...args);
      }
    });
  }
})();
""")

        page = await context.new_page()
        captured: Dict[str, Any] = {
            # sendMessageMutation
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            # subscriptionsMutation
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            # meta headers
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            # stream
            "ws_url": None,
            "es_url": None,
            # frames
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            """Record *u* as the stream URL when it looks like an updates channel."""
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        # The "websocket" event is emitted by the page, not by the context.
        page.on("websocket", lambda ws: maybe_set_updates_url(ws.url or ""))

        def on_req(req):
            try:
                # maybe_set_updates_url applies the "updates"/"channel=" filter itself.
                maybe_set_updates_url(req.url or "")
            except Exception:
                pass

        context.on("request", on_req)
        page.on("request", on_req)

        async def route_handler(route, request):
            """Record GraphQL requests; stub the send mutation, forward the rest."""
            stub_send = False
            try:
                if request.url.endswith("/api/gql_POST"):
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")
                    for hdr_name, slot in (
                        ("poe-formkey", "formkey_hdr"),
                        ("poe-revision", "revision_hdr"),
                        ("poe-tchannel", "tchannel_hdr"),
                        ("poe-tag-id", "tag_id_hdr"),
                    ):
                        if hdr_name in hdrs:
                            captured[slot] = hdrs[hdr_name]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        # Decide first, so the browser's own send is never
                        # forwarded even if recording below fails.
                        stub_send = True
                        prefix = "send"
                    elif qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        prefix = "subs"
                    else:
                        prefix = None

                    if prefix:
                        captured[f"{prefix}_req_headers"] = {**(captured[f"{prefix}_req_headers"] or {}), **hdrs}
                        captured[f"{prefix}_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured[f"{prefix}_cookie_header"] = hdrs["cookie"]
                        try:
                            captured[f"{prefix}_payload_dict"] = json.loads(body)
                        except ValueError:
                            pass  # the raw body is still kept for replay
            except Exception:
                pass  # capturing is best-effort; the request is still resolved below

            # Resolve the route exactly once.
            if stub_send:
                await route.fulfill(status=200, content_type="application/json", body='{"data":{"messageCreate":null}}')
            else:
                await route.continue_()

        await context.route("**/api/gql_POST", route_handler)
        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type the prompt. Best-effort: if the input cannot be found, nothing is
        # captured and the RuntimeError below reports the failure.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                ta = locator.first
                await ta.click()
                await ta.fill(prompt_for_capture)
            else:
                ed = page.locator('div[contenteditable="true"]').first
                await ed.click()
                await ed.fill(prompt_for_capture)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Wait for the capture: up to 120 polls of 200 ms each.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Read back what the JS hooks recorded, plus the frames sent by the client.
        try:
            last = await page.evaluate("""({ ws: window.__POE_LAST_WS_URL__||null, es: window.__POE_LAST_ES_URL__||null, frames: (window.__POE_WS_SENT__||[]).slice(-10) })""")
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Frames starting with "[" are dropped; this removes the
                # "[binary:N]" placeholders written by the hook.
                captured["ws_client_frames"] = [f for f in frames if isinstance(f, str) and not f.startswith("[")]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }
        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, {"p": p, "browser": browser, "context": context, "page": page}
    except BaseException:
        # Never leak the browser or driver when bootstrap fails part-way.
        await cleanup_playwright({"p": p, "browser": browser, "context": context})
        raise
async def cleanup_playwright(handles: Dict[str, Any]):
    """Shut down the Playwright objects in *handles*, ignoring teardown errors.

    Closes ``handles["context"]``, then ``handles["browser"]``, then stops
    ``handles["p"]``. Missing or falsy entries are skipped, and a failure in
    one step does not prevent the following steps from running.
    """
    for key, method in (("context", "close"), ("browser", "close"), ("p", "stop")):
        try:
            target = (handles or {}).get(key)
            if target:
                await getattr(target, method)()
        except Exception:
            # Teardown is best-effort: keep releasing the remaining handles.
            pass
async def listen_via_websocket_url(ws_url: str,
                                   session: requests.Session,
                                   target_chat_id: Optional[str],
                                   meta_headers: Dict[str, str],
                                   initial_client_frames: List[str],
                                   timeout_seconds: int = 120) -> str:
    """Stream the bot's reply over a WebSocket and return the accumulated text.

    Connects to *ws_url* with the session's cookies, replays
    *initial_client_frames*, unwraps ``messages[]`` envelopes and prints and
    collects text from bot messages only.

    Args:
        ws_url: Updates-channel WebSocket URL.
        session: Session whose cookie jar is sent in the ``Cookie`` header.
        target_chat_id: When set, messages carrying a different chat id are
            ignored; messages without a chat id are still accepted.
        meta_headers: Extra handshake headers; empty values are skipped.
        initial_client_frames: Frames sent right after connecting.
        timeout_seconds: Idle timeout; it restarts after every subscription
            update received.

    Returns:
        The text received so far. This is returned when a message reports a
        terminal state, when the idle timeout expires, or when the server
        closes the connection.
    """
    def _iter_inner_frames(packet: Any):
        """Yield each JSON frame in *packet*, unwrapping a "messages" envelope."""
        try:
            if isinstance(packet, (bytes, bytearray)):
                packet = packet.decode("utf-8", "ignore")
            if isinstance(packet, str):
                packet = json.loads(packet)
        except ValueError:
            return
        if not isinstance(packet, dict):
            return
        msgs = packet.get("messages")
        if not (isinstance(msgs, list) and msgs):
            yield packet
            return
        for m in msgs:
            try:
                if isinstance(m, (bytes, bytearray)):
                    m = m.decode("utf-8", "ignore")
                if isinstance(m, str):
                    m = json.loads(m)
            except ValueError:
                continue
            if isinstance(m, dict):
                yield m

    def _iter_subscription_updates(frame: Dict[str, Any]):
        """Yield the data dict(s) carried by a "subscriptionUpdate" frame."""
        mtype = frame.get("mtype") or frame.get("message_type") or ""
        if mtype != "subscriptionUpdate":
            return
        inner = frame.get("payload")
        if not isinstance(inner, dict):
            return
        if isinstance(inner.get("payloads"), list):
            for pl in inner["payloads"]:
                data = pl.get("data") if isinstance(pl, dict) else None
                # Malformed entries still count as an update, with empty data.
                yield data if isinstance(data, dict) else {}
            return
        if isinstance(inner.get("data"), dict):
            yield inner["data"]
            return
        inner2 = inner.get("payload")
        if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
            yield inner2["data"]

    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the message nodes of *d*: known keys first, then a fallback scan."""
        nodes = []
        for key in ("messageTextUpdated", "messageAdded", "messageUpdated", "messageCreated", "messageStateUpdated"):
            node = d.get(key)
            if isinstance(node, dict):
                nodes.append(node)
        if not nodes:
            for v in d.values():
                if isinstance(v, dict) and ("text_new" in v or "message" in v):
                    nodes.append(v)
        return nodes

    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    for k, v in (meta_headers or {}).items():
        if v:
            ws_headers[k] = v
    extra = list(ws_headers.items())

    parts: List[str] = []
    print("[WS] Conectando…")
    # NOTE(review): `extra_headers` is the keyword of the legacy websockets
    # client; newer releases may expect `additional_headers` — confirm against
    # the installed version.
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
                print(f"[WS] >> {f[:80] + ('…' if len(f) > 80 else '')}")
            except Exception:
                # Handshake replay is best-effort; a dead socket surfaces in recv().
                pass

        print("[WS] Aguardando eventos…")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break
            except websockets.ConnectionClosed:
                # Keep whatever text already arrived instead of losing it.
                print("\n[WS] Conexão encerrada pelo servidor.")
                break

            got_update = False
            for inner in _iter_inner_frames(message):
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node in _extract_nodes(data_obj):
                        msg = node.get("message") if isinstance(node.get("message"), dict) else node
                        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                            continue
                        # Only bot messages contribute to the answer.
                        if not is_bot_message(msg):
                            continue
                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            parts.append(chunk)
                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(parts)
            if got_update:
                # Activity on the channel restarts the idle timeout.
                deadline = loop.time() + timeout_seconds
    return "".join(parts)
def _build_stream_headers(session: requests.Session) -> Dict[str, str]:
    """Build the request headers used to open the updates stream.

    The ``Cookie`` header is serialized from *session*'s cookie jar; the
    remaining values come from the module constants.
    """
    return {
        "Origin": POE_URL_ROOT,
        "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Cookie": cookie_header_from_jar(session.cookies),
    }
def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str]) -> Tuple[bool, str, Dict[str, Any]]:
def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
nodes = []
for key in ("messageTextUpdated","messageAdded","messageUpdated","messageCreated","messageStateUpdated"):
node = d.get(key)
if isinstance(node, dict):
nodes.append(node)
if not nodes and isinstance(d, dict):
for v in d.values():
if isinstance(v, dict) and ("text_new" in v or "message" in v):
nodes.append(v)
return nodes
texttry: mtype = payload.get("mtype") or payload.get("message_type") if mtype != "subscriptionUpdate": return False, "", {} inner = payload.get("payload") or {} datas: List[Dict[str, Any]] = [] if isinstance(inner.get("payloads"), list): datas = [pl.get("data") or {} for pl in inner["payloads"]] elif isinstance(inner.get("data"), dict): datas = [inner["data"]] elif isinstance(inner.get("payload"), dict) and isinstance(inner["payload"].get("data"), dict): datas = [inner["payload"]["data"]] for d in datas: for node in _extract_nodes(d): msg = node.get("message") if isinstance(node.get("message"), dict) else node chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "") if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here: continue if not is_bot_message(msg): continue pieces = extract_text_pieces(msg) return True, ("".join(pieces) if pieces else ""), msg except Exception: pass return False, "", {}
async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], timeout_seconds: int = 120) -> str:
    """Stream the bot's reply over Server-Sent Events and return the text.

    Args:
        es_url: Updates-channel URL to GET with ``Accept: text/event-stream``.
        session: Session used for the request and for the ``Cookie`` header.
        target_chat_id: Forwarded to ``_matches_chat_and_chunk``.
        timeout_seconds: Overall time budget, also used as the socket read
            timeout so that a silent stream cannot block forever.

    Returns:
        The text received until a message reports a terminal state, the
        budget is exhausted, or the stream drops.

    Raises:
        requests.HTTPError: If the server answers with an error status.

    NOTE: the request is made with the blocking ``requests`` API, so the
    event loop is blocked while this coroutine runs.
    """
    import time

    headers = _build_stream_headers(session)
    headers["Accept"] = "text/event-stream"
    parts: List[str] = []
    end_at = time.monotonic() + timeout_seconds
    # (connect timeout, read timeout): without one a silent server hangs forever.
    with session.get(es_url, headers=headers, stream=True, timeout=(10, timeout_seconds)) as r:
        r.raise_for_status()
        # Event streams are UTF-8; requests would otherwise fall back to
        # ISO-8859-1 for text/* responses without a charset and garble text.
        r.encoding = "utf-8"
        try:
            for raw in r.iter_lines(decode_unicode=True):
                if time.monotonic() > end_at:
                    print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
                    break
                # Skip keep-alives, comments (":" prefix) and non-data fields.
                if not raw or not raw.startswith("data:"):
                    continue
                line = raw[5:].strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except ValueError:
                    continue
                ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id)
                if ok and chunk:
                    print(chunk, end="", flush=True)
                    parts.append(chunk)
                if msg and looks_complete(msg):
                    print("\n--- Resposta Completa (SSE) ---")
                    break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # A read timeout or dropped stream surfaces here mid-iteration;
            # keep the text that already arrived.
            print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
    return "".join(parts)
def summarize_payload(payload_base: Dict[str, Any]) -> str:
    """Return a short one-line summary of a GraphQL payload for logging.

    Shows the query name plus truncated values of ``variables.bot`` and
    ``variables.sdid`` (12 characters) and ``extensions.hash`` (8 characters).
    Missing fields are rendered as ``None``; a falsy payload is treated as empty.
    """
    base = payload_base or {}
    vars_obj = base.get("variables") or {}
    exts_obj = base.get("extensions") or {}
    qn = base.get("queryName")
    sdid = vars_obj.get("sdid")
    bot = vars_obj.get("bot")
    h = exts_obj.get("hash")
    return f"qn:{qn} bot:{str(bot)[:12]}… sdid:{str(sdid)[:12]}… ext_hash:{str(h)[:8]}…"
async def main():
    """Send ``PROMPT`` to ``BOT_NAME`` once and print the streamed answer.

    Steps: capture a real request through the browser (``get_bootstrap``),
    replay the captured ``subscriptionsMutation`` and ``sendMessageMutation``
    over HTTP with the same headers, then listen on the updates stream
    (WebSocket first, SSE as fallback). Errors are printed, not raised; the
    browser and the HTTP session are always released.
    """
    if not P_B_COOKIE:
        print("Erro: P_B_COOKIE é obrigatório (cookie de sessão p-b).")
        return

    http_timeout = 30  # seconds; keeps a stalled GraphQL call from hanging forever
    # Headers that are recomputed for the replayed request and must not be copied.
    blocked = frozenset({"content-length", "host", "connection", "accept-encoding"})

    def _replay_headers(captured_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Lower-case the captured headers, drop blocked ones and pin origin/referer."""
        lowered = {k.lower(): v for k, v in (captured_headers or {}).items()}
        hdrs = {k: v for k, v in lowered.items() if k not in blocked}
        hdrs["content-type"] = "application/json"
        hdrs["origin"] = POE_URL_ROOT
        hdrs["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
        return hdrs

    session = requests.Session()
    session.headers.update(BASE_HEADERS)

    # Base cookies
    session.cookies.update({"p-b": P_B_COOKIE})
    if P_LAT_COOKIE:
        session.cookies.set("p-lat", P_LAT_COOKIE, domain="poe.com", path="/")
    if CF_CLEARANCE:
        session.cookies.set("cf_clearance", CF_CLEARANCE, domain="poe.com", path="/")
    if CF_BM:
        session.cookies.set("__cf_bm", CF_BM, domain="poe.com", path="/")
    if P_SB:
        session.cookies.set("p-sb", P_SB, domain="poe.com", path="/")

    info = None
    handles = None
    try:
        info, handles = await get_bootstrap(
            BOT_NAME,
            {"p-b": P_B_COOKIE, "p-lat": P_LAT_COOKIE, "cf_clearance": CF_CLEARANCE, "__cf_bm": CF_BM, "p-sb": P_SB},
            PROMPT,
        )

        # Always use the tchannel seen by the browser.
        tch = info.get("tchannel_hdr")
        if tch:
            session.headers["poe-tchannel"] = tch

        # (Optional) subscriptionsMutation replay
        subs_raw = info.get("subs_payload_raw")
        subs_cookie_hdr = info.get("subs_cookie_header") or ""
        if subs_cookie_hdr:
            parse_cookie_header_and_set(session, subs_cookie_hdr)
        if subs_raw:
            print("Registrando subscriptions via subscriptionsMutation (replay 1:1)…")
            gql_headers_subs = _replay_headers(info.get("subs_req_headers"))
            # Send explicit UTF-8 bytes so non-ASCII payloads do not depend on
            # how the HTTP stack encodes a str body.
            resp_subs = session.post(POE_API_URL, data=subs_raw.encode("utf-8"), headers=gql_headers_subs, timeout=http_timeout)
            resp_subs.raise_for_status()
        else:
            print("Aviso: não capturei subscriptionsMutation; seguirei com stream apenas (pode já estar registrado).")

        # =========== SEND MESSAGE (replay 1:1) ===========
        send_raw = info.get("send_payload_raw")
        send_cookie_hdr = info.get("send_cookie_header") or ""
        if send_cookie_hdr:
            parse_cookie_header_and_set(session, send_cookie_hdr)
        if not send_raw:
            raise RuntimeError("Não capturei a sendMessageMutation (body RAW).")

        print("Enviando a pergunta via GraphQL com payload/headers CAPTURADOS…")
        gql_headers_send = _replay_headers(info.get("send_req_headers"))
        if "poe-tchannel" in session.headers:
            gql_headers_send["poe-tchannel"] = session.headers["poe-tchannel"]
        resp = session.post(POE_API_URL, data=send_raw.encode("utf-8"), headers=gql_headers_send, timeout=http_timeout)
        resp.raise_for_status()

        data = resp.json()
        root = data.get("data", {}) if isinstance(data, dict) else {}
        chat = None
        if "messageCreate" in root:
            chat = (root.get("messageCreate") or {}).get("chat") or {}
        elif "messageEdgeCreate" in root:
            chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
        if not chat:
            raise RuntimeError("Resposta sem objeto 'chat' esperado (messageCreate/messageEdgeCreate).")
        chat_id = chat.get("chatId") or chat.get("id")
        if not chat_id:
            raise RuntimeError("Não consegui obter o chatId.")
        print(f"Chat criado! ID: {chat_id}")

        # =========== STREAM LISTEN ===========
        ws_url = info.get("ws_url") or ""
        es_url = info.get("es_url") or ""
        if (not es_url) and ws_url.startswith("wss://"):
            # Derive an HTTPS URL for the SSE fallback from the WebSocket URL.
            es_url = "https://" + ws_url[len("wss://"):]
        if not (ws_url or es_url):
            raise RuntimeError("Não capturei WS *nem* SSE (updates?channel=…). Revise os hooks.")

        meta_headers = {}
        if tch:
            meta_headers["poe-tchannel"] = tch
        if info.get("formkey"):
            meta_headers["poe-formkey"] = info.get("formkey")
        if info.get("revision"):
            meta_headers["poe-revision"] = info.get("revision")
        if info.get("tag_id"):
            meta_headers["poe-tag-id"] = info.get("tag_id")

        if ws_url:
            print(f"Conectando via WebSocket: {ws_url}")
            final_answer = await listen_via_websocket_url(
                ws_url,
                session,
                str(chat_id),
                meta_headers=meta_headers,
                initial_client_frames=info.get("ws_client_frames") or [],
                timeout_seconds=120,
            )
            if not final_answer and es_url:
                print("\n[WS] Sem payload do bot. Tentando SSE…")
                final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)
        else:
            print(f"Conectando via SSE: {es_url}")
            final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)

        print("\nRESPOSTA FINAL:")
        print(final_answer if final_answer else "(vazio)")
    except requests.HTTPError as e:
        body = getattr(e.response, "text", "")[:2000]
        print(f"HTTPError: {e}\nBody: {body}")
    except Exception as e:
        # Top-level boundary of the script: report the error and fall through to cleanup.
        print(f"Erro: {e}")
    finally:
        if handles:
            await cleanup_playwright(handles)
        session.close()
# Run the script only when executed directly, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Adicione suporte a vários cookies p_b para que sejam usadas múltiplas contas; assim, caso algum cookie falhe, o script usa outro. No console, adicione um chat: em vez de um único prompt hardcoded, o usuário poderá conversar via CLI. E, caso os créditos de um cookie acabem durante a conversa, mude para outro dinamicamente:
import asyncio
import json
from typing import Optional, Dict, Any, List, Tuple
import requests
import websockets
from playwright.async_api import async_playwright
# Session cookies; fill these in before running.
P_B_COOKIE = ""  # required ("p-b" session cookie)
P_LAT_COOKIE = ""  # optional
CF_CLEARANCE = ""  # optional (Cloudflare)
CF_BM = ""  # optional
P_SB = ""  # optional

BOT_NAME = "GPT-5"  # slug/handle visible in the URL
PROMPT = "Hello World"  # message to send

POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"

# Default headers applied to every request made through the requests session.
BASE_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
    # "*/*" is the wildcard media range; a bare "/" is not a valid Accept value.
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}
def cookie_header_from_jar(jar: "requests.cookies.RequestsCookieJar") -> str:
    """Serialize every cookie in *jar* into a single ``Cookie`` header value.

    Cookies are written as ``name=value`` pairs joined by ``"; "`` in the
    jar's iteration order. No domain or path filtering is applied: every
    cookie in the jar is included.
    """
    return "; ".join(f"{c.name}={c.value}" for c in jar)
def parse_cookie_header_and_set(session: "requests.Session", cookie_header: str) -> None:
    """Copy the cookies of a raw ``Cookie`` header into ``session.cookies``.

    The header is split on ``";"``; each ``name=value`` pair is stored for
    domain ``poe.com`` and path ``/``. Only the first ``"="`` separates name
    from value, so values may themselves contain ``"="``. Segments without
    ``"="`` or with an empty name are skipped. An empty header is a no-op.
    """
    if not cookie_header:
        return
    for segment in cookie_header.split(";"):
        name, sep, value = segment.partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        session.cookies.set(name, value.strip(), domain="poe.com", path="/")
def extract_text_pieces(event: Any) -> List[str]:
    """Collect every non-empty text fragment found anywhere inside *event*.

    For a dict, the string values stored under "text_new", "response",
    "delta", "text" and "partial_response" are taken (in that order), then
    every nested dict/list value is searched recursively. For a list, each
    element is searched in order. Any other type yields nothing.

    Note that the same text is returned more than once if it is stored under
    several of the recognised keys.
    """
    out: List[str] = []
    if isinstance(event, dict):
        for k in ("text_new", "response", "delta", "text", "partial_response"):
            v = event.get(k)
            if isinstance(v, str) and v:
                out.append(v)
        for v in event.values():
            if isinstance(v, (dict, list)):
                out.extend(extract_text_pieces(v))
    elif isinstance(event, list):
        for x in event:
            out.extend(extract_text_pieces(x))
    return out
def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when *node* reports a terminal message state.

    The state is read from "state", falling back to "message_state", and is
    compared case-insensitively against "complete", "completed", "stopped"
    and "aborted".
    """
    state = node.get("state") or node.get("message_state") or ""
    # str() keeps a non-string state (e.g. a number) from crashing .lower().
    return str(state).lower() in {"complete", "completed", "stopped", "aborted"}
def is_bot_message(msg: Dict[str, Any]) -> bool:
    """Heuristically decide whether *msg* was written by the bot, not the user.

    Checks, in order: the "author" object, the presence of a "bot" object,
    the message type, a "suggestedReplies" key, and finally streaming fields
    combined with a non-terminal state. Returns False when nothing matches.
    """
    # 1) "author" object
    a = msg.get("author")
    if isinstance(a, dict):
        handle = str(a.get("handle") or "").lower()
        if handle and handle == str(BOT_NAME).lower():
            return True
        if a.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        t = str(a.get("type") or a.get("role") or "").lower()
        if t in ("bot", "assistant", "ai"):
            return True
        if a.get("isBot") is True:
            return True

    # 2) "bot" object: any dict here marks a bot message.
    # NOTE(review): the original compared the handle with BOT_NAME and then
    # returned True unconditionally, so the comparison had no effect; confirm
    # that accepting every "bot" dict is intended.
    if isinstance(msg.get("bot"), dict):
        return True

    # 3) explicit message type
    mt = str(msg.get("messageType") or msg.get("type") or "").lower()
    if mt in ("bot", "assistant", "ai"):
        return True

    # 4) a "suggestedReplies" key is treated as a bot marker
    if "suggestedReplies" in msg:
        return True

    # 5) streaming fields with a missing or non-terminal state
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        st = str(msg.get("state") or "").lower()
        if st in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True
    return False
async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], prompt_for_capture: str) -> Tuple[Dict[str, Optional[str]], Dict[str, Any]]:
    """Open the logged-in bot page and capture what is needed to replay a chat request.

    A headless Chromium page is opened on ``{POE_URL_ROOT}/{bot_name}`` with
    the given cookies, *prompt_for_capture* is typed and submitted, and the
    resulting ``/api/gql_POST`` traffic is intercepted. The browser's own
    ``sendMessageMutation`` is answered with a stub response and is not
    forwarded; ``subscriptionsMutation`` is recorded and forwarded. The URL
    of the updates stream (WebSocket or SSE) and the frames the page sent
    over its WebSocket are recorded as well.

    Args:
        bot_name: Bot slug appended to ``POE_URL_ROOT``.
        cookie_dict: Cookie values keyed by name. Only "p-b", "p-lat",
            "cf_clearance", "__cf_bm" and "p-sb" are used; empty values are
            skipped.
        prompt_for_capture: Text typed into the chat box to trigger the send.

    Returns:
        ``(info, handles)``. ``info`` holds the captured payloads, headers,
        stream URLs and client frames (values are strings, dicts or lists).
        ``handles`` holds the live Playwright objects under "p", "browser",
        "context" and "page". Playwright is NOT closed on success; the caller
        must pass ``handles`` to ``cleanup_playwright``.

    Raises:
        RuntimeError: If no ``sendMessageMutation`` body was captured.
            Playwright is shut down before any exception leaves this function.
    """
    p = await async_playwright().start()
    browser = None
    context = None
    try:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()

        # Session cookies
        add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")
            if cookie_dict.get(name)
        ]
        if add:
            await context.add_cookies(add)

        # JS hooks: remember the last WebSocket/EventSource URL and every frame
        # the page sends. The global names must match the ones read back through
        # page.evaluate() further down.
        await context.add_init_script("""
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        return new target(...args);
      }
    });
  }
})();
""")

        page = await context.new_page()
        captured: Dict[str, Any] = {
            # sendMessageMutation
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            # subscriptionsMutation
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            # meta headers
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            # stream
            "ws_url": None,
            "es_url": None,
            # frames
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            """Record *u* as the stream URL when it looks like an updates channel."""
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        # The "websocket" event is emitted by the page, not by the context.
        page.on("websocket", lambda ws: maybe_set_updates_url(ws.url or ""))

        def on_req(req):
            try:
                # maybe_set_updates_url applies the "updates"/"channel=" filter itself.
                maybe_set_updates_url(req.url or "")
            except Exception:
                pass

        context.on("request", on_req)
        page.on("request", on_req)

        async def route_handler(route, request):
            """Record GraphQL requests; stub the send mutation, forward the rest."""
            stub_send = False
            try:
                if request.url.endswith("/api/gql_POST"):
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")
                    for hdr_name, slot in (
                        ("poe-formkey", "formkey_hdr"),
                        ("poe-revision", "revision_hdr"),
                        ("poe-tchannel", "tchannel_hdr"),
                        ("poe-tag-id", "tag_id_hdr"),
                    ):
                        if hdr_name in hdrs:
                            captured[slot] = hdrs[hdr_name]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        # Decide first, so the browser's own send is never
                        # forwarded even if recording below fails.
                        stub_send = True
                        prefix = "send"
                    elif qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        prefix = "subs"
                    else:
                        prefix = None

                    if prefix:
                        captured[f"{prefix}_req_headers"] = {**(captured[f"{prefix}_req_headers"] or {}), **hdrs}
                        captured[f"{prefix}_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured[f"{prefix}_cookie_header"] = hdrs["cookie"]
                        try:
                            captured[f"{prefix}_payload_dict"] = json.loads(body)
                        except ValueError:
                            pass  # the raw body is still kept for replay
            except Exception:
                pass  # capturing is best-effort; the request is still resolved below

            # Resolve the route exactly once.
            if stub_send:
                await route.fulfill(status=200, content_type="application/json", body='{"data":{"messageCreate":null}}')
            else:
                await route.continue_()

        await context.route("**/api/gql_POST", route_handler)
        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type the prompt. Best-effort: if the input cannot be found, nothing is
        # captured and the RuntimeError below reports the failure.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                ta = locator.first
                await ta.click()
                await ta.fill(prompt_for_capture)
            else:
                ed = page.locator('div[contenteditable="true"]').first
                await ed.click()
                await ed.fill(prompt_for_capture)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Wait for the capture: up to 120 polls of 200 ms each.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Read back what the JS hooks recorded, plus the frames sent by the client.
        try:
            last = await page.evaluate("""({ ws: window.__POE_LAST_WS_URL__||null, es: window.__POE_LAST_ES_URL__||null, frames: (window.__POE_WS_SENT__||[]).slice(-10) })""")
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Frames starting with "[" are dropped; this removes the
                # "[binary:N]" placeholders written by the hook.
                captured["ws_client_frames"] = [f for f in frames if isinstance(f, str) and not f.startswith("[")]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }
        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, {"p": p, "browser": browser, "context": context, "page": page}
    except BaseException:
        # Never leak the browser or driver when bootstrap fails part-way.
        await cleanup_playwright({"p": p, "browser": browser, "context": context})
        raise
async def cleanup_playwright(handles: Dict[str, Any]):
    """Shut down the Playwright objects in *handles*, ignoring teardown errors.

    Closes ``handles["context"]``, then ``handles["browser"]``, then stops
    ``handles["p"]``. Missing or falsy entries are skipped, and a failure in
    one step does not prevent the following steps from running.
    """
    for key, method in (("context", "close"), ("browser", "close"), ("p", "stop")):
        try:
            target = (handles or {}).get(key)
            if target:
                await getattr(target, method)()
        except Exception:
            # Teardown is best-effort: keep releasing the remaining handles.
            pass
async def listen_via_websocket_url(ws_url: str,
                                   session: requests.Session,
                                   target_chat_id: Optional[str],
                                   meta_headers: Dict[str, str],
                                   initial_client_frames: List[str],
                                   timeout_seconds: int = 120) -> str:
    """Stream the bot's reply from a WebSocket updates channel.

    Connects to ``ws_url``, sends ``initial_client_frames`` (if any), unwraps
    ``messages[]`` envelopes and prints/accumulates text taken from bot
    messages only (as decided by ``is_bot_message``).

    Args:
        ws_url: WebSocket URL to connect to.
        session: Requests session whose cookie jar is serialized into the
            ``Cookie`` header of the WebSocket handshake.
        target_chat_id: When set, messages carrying a different chat id are
            ignored. Messages without a chat id are not filtered out.
        meta_headers: Extra handshake headers; entries with falsy values are
            skipped, others override the defaults built here.
        initial_client_frames: Text frames to send right after connecting.
            A frame that fails to send is skipped.
        timeout_seconds: Idle timeout. The deadline is pushed forward every
            time a ``subscriptionUpdate`` frame is received.

    Returns:
        The text accumulated so far. It is returned as soon as
        ``looks_complete`` reports completion, or when the idle timeout
        expires (in which case it may be partial or empty).

    Raises:
        Whatever ``websockets.connect`` / ``ws.recv`` raise; connection
        errors are not caught here.
    """
    timeout_notice = "\n[WS] Timeout sem eventos. Verifique tchannel/channel."

    def _iter_inner_frames(packet: Any):
        # A packet may be bytes/str/dict. When it carries a non-empty
        # "messages" list, each entry (possibly a JSON string) is one frame;
        # otherwise the packet itself is the frame.
        try:
            if isinstance(packet, (bytes, bytearray)):
                packet = packet.decode("utf-8", "ignore")
            if isinstance(packet, str):
                packet = json.loads(packet)
        except Exception:
            return
        if not isinstance(packet, dict):
            return

        msgs = packet.get("messages")
        if not (isinstance(msgs, list) and msgs):
            yield packet
            return
        for m in msgs:
            try:
                if isinstance(m, (bytes, bytearray)):
                    m = m.decode("utf-8", "ignore")
                if isinstance(m, str):
                    m = json.loads(m)
            except Exception:
                # Skip an undecodable entry but keep processing the rest.
                continue
            if isinstance(m, dict):
                yield m

    def _iter_subscription_updates(frame: Dict[str, Any]):
        # Yield the "data" object(s) of a subscriptionUpdate frame, looking at
        # payload.payloads[].data, payload.data and payload.payload.data.
        mtype = frame.get("mtype") or frame.get("message_type") or ""
        if mtype != "subscriptionUpdate":
            return
        inner = frame.get("payload") or {}
        if not isinstance(inner, dict):
            return
        payloads = inner.get("payloads")
        if isinstance(payloads, list):
            for pl in payloads:
                # A malformed (non-dict) entry must not abort the listener.
                yield (pl.get("data") or {}) if isinstance(pl, dict) else {}
            return
        if isinstance(inner.get("data"), dict):
            yield inner["data"]
            return
        inner2 = inner.get("payload")
        if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
            yield inner2["data"]

    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Prefer the well-known subscription keys; otherwise fall back to any
        # dict value that carries "text_new" or "message".
        if not isinstance(d, dict):
            return []
        known_keys = ("messageTextUpdated", "messageAdded", "messageUpdated",
                      "messageCreated", "messageStateUpdated")
        nodes = [d[key] for key in known_keys if isinstance(d.get(key), dict)]
        if not nodes:
            nodes = [v for v in d.values()
                     if isinstance(v, dict) and ("text_new" in v or "message" in v)]
        return nodes

    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    for k, v in (meta_headers or {}).items():
        if v:
            ws_headers[k] = v
    extra = list(ws_headers.items())

    parts: List[str] = []
    # Inside a coroutine the running loop is the correct clock source.
    loop = asyncio.get_running_loop()
    print("[WS] Conectando…")
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
                print(f"[WS] >> {f[:80] + ('…' if len(f) > 80 else '')}")
            except Exception:
                # Best-effort replay: a frame that cannot be sent is skipped.
                pass

        print("[WS] Aguardando eventos…")
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print(timeout_notice)
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print(timeout_notice)
                break

            got_update = False
            for inner in _iter_inner_frames(message):
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node in _extract_nodes(data_obj):
                        msg = node.get("message") if isinstance(node.get("message"), dict) else node
                        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                            continue
                        # Only bot-authored messages contribute to the reply.
                        if not is_bot_message(msg):
                            continue
                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            parts.append(chunk)
                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(parts)

            if got_update:
                # Any subscription update counts as activity: restart the
                # idle timer.
                deadline = loop.time() + timeout_seconds
    return "".join(parts)
def _build_stream_headers(session: requests.Session) -> Dict[str, str]:
    """Return the base HTTP headers used when opening a streaming connection.

    The ``Cookie`` header is serialized from ``session.cookies``; the
    ``User-Agent`` and ``Accept-Language`` values are taken from
    ``BASE_HEADERS`` (with a fallback for ``Accept-Language``).
    """
    stream_headers: Dict[str, str] = {}
    stream_headers["Origin"] = "https://poe.com"
    stream_headers["Referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
    stream_headers["User-Agent"] = BASE_HEADERS["User-Agent"]
    stream_headers["Accept-Language"] = BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9")
    # Both cache directives ask for an uncached response.
    for cache_key in ("Cache-Control", "Pragma"):
        stream_headers[cache_key] = "no-cache"
    stream_headers["Cookie"] = cookie_header_from_jar(session.cookies)
    return stream_headers
def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str]) -> Tuple[bool, str, Dict[str, Any]]:
    """Find the first bot message for the target chat in one stream event.

    Args:
        payload: Decoded event. Only events whose ``mtype`` / ``message_type``
            is ``"subscriptionUpdate"`` are considered.
        target_chat_id: When set, messages carrying a different chat id are
            skipped. Messages without a chat id are not filtered out.

    Returns:
        ``(True, text, msg)`` for the first message accepted by
        ``is_bot_message``, where ``text`` is the concatenation of
        ``extract_text_pieces(msg)`` (possibly empty); ``(False, "", {})``
        when nothing matches or the event is malformed.
    """
    known_keys = ("messageTextUpdated", "messageAdded", "messageUpdated",
                  "messageCreated", "messageStateUpdated")

    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Prefer the well-known subscription keys; otherwise fall back to any
        # dict value that carries "text_new" or "message".
        if not isinstance(d, dict):
            return []
        nodes = [d[key] for key in known_keys if isinstance(d.get(key), dict)]
        if not nodes:
            nodes = [v for v in d.values()
                     if isinstance(v, dict) and ("text_new" in v or "message" in v)]
        return nodes

    try:
        mtype = payload.get("mtype") or payload.get("message_type")
        if mtype != "subscriptionUpdate":
            return False, "", {}

        inner = payload.get("payload") or {}
        datas: List[Dict[str, Any]] = []
        if isinstance(inner.get("payloads"), list):
            # Skip malformed entries instead of discarding the whole event.
            datas = [pl.get("data") or {} for pl in inner["payloads"] if isinstance(pl, dict)]
        elif isinstance(inner.get("data"), dict):
            datas = [inner["data"]]
        elif isinstance(inner.get("payload"), dict) and isinstance(inner["payload"].get("data"), dict):
            datas = [inner["payload"]["data"]]

        for d in datas:
            for node in _extract_nodes(d):
                msg = node.get("message") if isinstance(node.get("message"), dict) else node
                chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                    continue
                if not is_bot_message(msg):
                    continue
                pieces = extract_text_pieces(msg)
                return True, ("".join(pieces) if pieces else ""), msg
    except Exception:
        # Stream events are untrusted input: anything unexpectedly shaped is
        # reported as "no match" rather than breaking the listener.
        pass
    return False, "", {}
async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], timeout_seconds: int = 120) -> str:
    """Stream the bot's reply from a Server-Sent Events endpoint.

    Opens ``es_url`` with ``session``, reads ``data:`` lines, decodes each one
    as JSON and prints/accumulates the text of bot messages for
    ``target_chat_id`` (as decided by ``_matches_chat_and_chunk``).

    Args:
        es_url: SSE endpoint URL.
        session: Requests session used for the GET (cookies and headers).
        target_chat_id: Chat id filter forwarded to ``_matches_chat_and_chunk``.
        timeout_seconds: Idle timeout. It is used both as the socket
            connect/read timeout and as the maximum time allowed without a
            matching bot update, the same "no events" policy as
            ``listen_via_websocket_url``.

    Returns:
        The accumulated text; partial or empty if the timeout expires first.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.exceptions.RequestException: if the connection cannot be
            established.

    NOTE(review): this coroutine performs blocking ``requests`` I/O, so the
    event loop is stalled while it reads. Moving it to a worker thread would
    need a coordinated change in the caller.
    """
    import time  # function-scope: `time` is not among the file's imports

    headers = _build_stream_headers(session)
    headers["Accept"] = "text/event-stream"
    timeout_notice = "\n[SSE] Timeout sem eventos. Verifique tchannel/channel."
    parts: List[str] = []

    # Without a read timeout a silent stream would block iter_lines forever
    # and the deadline check below would never run.
    with session.get(es_url, headers=headers, stream=True, timeout=timeout_seconds) as r:
        r.raise_for_status()
        # Event streams are UTF-8; without this requests may fall back to
        # ISO-8859-1 for text/* responses and garble non-ASCII text.
        r.encoding = "utf-8"
        # Monotonic clock: immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout_seconds
        try:
            for raw in r.iter_lines(decode_unicode=True):
                if time.monotonic() > deadline:
                    print(timeout_notice)
                    break
                # Skip blank separators, ":" keep-alive comments and any
                # field other than "data:".
                if not raw or not raw.startswith("data:"):
                    continue
                line = raw[5:].strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except (ValueError, RecursionError):
                    continue
                ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id)
                if ok:
                    # A matching update is activity: restart the idle timer.
                    deadline = time.monotonic() + timeout_seconds
                if ok and chunk:
                    print(chunk, end="", flush=True)
                    parts.append(chunk)
                if msg and looks_complete(msg):
                    print("\n--- Resposta Completa (SSE) ---")
                    break
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
            # requests surfaces a mid-stream read timeout as ConnectionError.
            print(timeout_notice)
    return "".join(parts)
def summarize_payload(payload_base: Dict[str, Any]) -> str:
    """Return a short one-line summary of a GraphQL payload.

    The summary shows ``queryName``, the first 12 characters of
    ``variables.bot`` and ``variables.sdid``, and the first 8 characters of
    ``extensions.hash``. Missing values are rendered as ``None``.

    Args:
        payload_base: Decoded payload. ``None``, a non-dict value, or
            non-dict ``variables`` / ``extensions`` entries are treated as
            empty instead of raising.

    Returns:
        The formatted summary string.
    """
    def _as_dict(value: Any) -> Dict[str, Any]:
        # Anything that is not a dict contributes no fields.
        return value if isinstance(value, dict) else {}

    base = _as_dict(payload_base)
    vars_obj = _as_dict(base.get("variables"))
    exts_obj = _as_dict(base.get("extensions"))

    qn = base.get("queryName")
    sdid = vars_obj.get("sdid")
    bot = vars_obj.get("bot")
    h = exts_obj.get("hash")
    return f"qn:{qn} bot:{str(bot)[:12]}… sdid:{str(sdid)[:12]}… ext_hash:{str(h)[:8]}…"
async def main():
    """Bootstrap via the browser, replay the captured send request and print the answer.

    Steps: seed a ``requests`` session with the configured cookies, capture
    headers/payloads with ``get_bootstrap``, optionally replay the captured
    ``subscriptionsMutation``, replay the captured ``sendMessageMutation``,
    then listen for the reply over WebSocket (falling back to SSE).
    Errors are printed rather than raised; the Playwright handles are always
    cleaned up.
    """
    session = requests.Session()
    session.headers.update(BASE_HEADERS)

    # Base cookies
    session.cookies.update({"p-b": P_B_COOKIE})
    optional_cookies = {
        "p-lat": P_LAT_COOKIE,
        "cf_clearance": CF_CLEARANCE,
        "__cf_bm": CF_BM,
        "p-sb": P_SB,
    }
    for cookie_name, cookie_value in optional_cookies.items():
        if cookie_value:
            session.cookies.set(cookie_name, cookie_value, domain="poe.com", path="/")

    # Headers that must not be replayed verbatim from the captured request.
    blocked = {"content-length", "host", "connection", "accept-encoding"}

    def replay_headers(captured_headers):
        """Copy captured headers minus ``blocked`` and pin content-type/origin/referer."""
        headers = {k: v for k, v in captured_headers.items() if k not in blocked}
        headers["content-type"] = "application/json"
        headers["origin"] = POE_URL_ROOT
        headers["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
        return headers

    def as_utf8(body):
        """Encode a captured ``str`` body explicitly.

        Avoids depending on how the HTTP stack encodes ``str`` request bodies
        when the text is not pure ASCII.
        """
        return body.encode("utf-8") if isinstance(body, str) else body

    info = None
    handles = None
    try:
        info, handles = await get_bootstrap(
            BOT_NAME,
            {"p-b": P_B_COOKIE, "p-lat": P_LAT_COOKIE, "cf_clearance": CF_CLEARANCE, "__cf_bm": CF_BM, "p-sb": P_SB},
            PROMPT,
        )

        # Always use the tchannel from the browser
        tch = info.get("tchannel_hdr")
        if tch:
            session.headers["poe-tchannel"] = tch

        # (Optional) subscriptionsMutation replay
        subs_raw = info.get("subs_payload_raw")
        subs_hdrs = {k.lower(): v for k, v in (info.get("subs_req_headers") or {}).items()}
        subs_cookie_hdr = info.get("subs_cookie_header") or ""
        if subs_cookie_hdr:
            parse_cookie_header_and_set(session, subs_cookie_hdr)
        if subs_raw:
            print("Registrando subscriptions via subscriptionsMutation (replay 1:1)…")
            resp_subs = session.post(POE_API_URL, data=as_utf8(subs_raw), headers=replay_headers(subs_hdrs))
            resp_subs.raise_for_status()
        else:
            print("Aviso: não capturei subscriptionsMutation; seguirei com stream apenas (pode já estar registrado).")

        # =========== SEND MESSAGE (replay 1:1) ===========
        send_raw = info.get("send_payload_raw")
        send_hdrs = {k.lower(): v for k, v in (info.get("send_req_headers") or {}).items()}
        send_cookie_hdr = info.get("send_cookie_header") or ""
        if send_cookie_hdr:
            parse_cookie_header_and_set(session, send_cookie_hdr)
        if not send_raw:
            raise RuntimeError("Não capturei a sendMessageMutation (body RAW).")

        print("Enviando a pergunta via GraphQL com payload/headers CAPTURADOS…")
        gql_headers_send = replay_headers(send_hdrs)
        if "poe-tchannel" in session.headers:
            gql_headers_send["poe-tchannel"] = session.headers["poe-tchannel"]
        resp = session.post(POE_API_URL, data=as_utf8(send_raw), headers=gql_headers_send)
        resp.raise_for_status()
        data = resp.json()

        root = data.get("data", {}) if isinstance(data, dict) else {}
        chat = None
        if "messageCreate" in root:
            chat = (root.get("messageCreate") or {}).get("chat") or {}
        elif "messageEdgeCreate" in root:
            chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
        if not chat:
            raise RuntimeError("Resposta sem objeto 'chat' esperado (messageCreate/messageEdgeCreate).")
        chat_id = chat.get("chatId") or chat.get("id")
        if not chat_id:
            raise RuntimeError("Não consegui obter o chatId.")
        print(f"Chat criado! ID: {chat_id}")

        # =========== STREAM LISTEN ===========
        ws_url = info.get("ws_url") or ""
        es_url = info.get("es_url") or ""
        if (not es_url) and ws_url.startswith("wss://"):
            # No SSE URL captured: derive one from the WebSocket URL by
            # swapping the scheme.
            es_url = "https://" + ws_url[len("wss://"):]
        if not (ws_url or es_url):
            raise RuntimeError("Não capturei WS *nem* SSE (updates?channel=…). Revise os hooks.")

        meta_headers = {}
        if tch:
            meta_headers["poe-tchannel"] = tch
        if info.get("formkey"):
            meta_headers["poe-formkey"] = info.get("formkey")
        if info.get("revision"):
            meta_headers["poe-revision"] = info.get("revision")
        if info.get("tag_id"):
            meta_headers["poe-tag-id"] = info.get("tag_id")

        if ws_url:
            print(f"Conectando via WebSocket: {ws_url}")
            final_answer = await listen_via_websocket_url(
                ws_url,
                session,
                str(chat_id),
                meta_headers=meta_headers,
                initial_client_frames=info.get("ws_client_frames") or [],
                timeout_seconds=120,
            )
            if not final_answer and es_url:
                print("\n[WS] Sem payload do bot. Tentando SSE…")
                final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)
        else:
            print(f"Conectando via SSE: {es_url}")
            final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)

        print("\nRESPOSTA FINAL:")
        print(final_answer if final_answer else "(vazio)")
    except requests.HTTPError as e:
        body = getattr(e.response, "text", "")[:2000]
        print(f"HTTPError: {e}\nBody: {body}")
    except Exception as e:
        print(f"Erro: {e}")
    finally:
        if handles:
            await cleanup_playwright(handles)
# Script entry point. The dunder names are required: a bare `name` is
# undefined and raises NameError as soon as the module is loaded.
if __name__ == "__main__":
    asyncio.run(main())
Aviso legal rápido: automatizar acesso com cookies e “scraping” pode violar os Termos do Poe/Cloudflare. Use só em contas suas e por sua conta e risco.
Abaixo vai um refactor que adiciona:
- `ACCOUNTS` com quantas contas quiser (mínimo o `p-b`).
- `python poe_cli.py`: `:bot gpt-4o` troca o bot “ao vivo”; `:quit`
import asyncio
import json
import sys
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Tuple

import requests
import websockets
from playwright.async_api import async_playwright

# ==============================================================================
# CONFIG
# ==============================================================================
POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"
DEFAULT_BOT = "GPT-5"  # can be switched at runtime with the :bot <handle> command
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0"
)

# --- MULTI-ACCOUNT: add your accounts here. "p-b" is mandatory.
ACCOUNTS: List[Dict[str, str]] = [
    {"p-b": "PB_COOKIE_1", "p-lat": "", "cf_clearance": "", "__cf_bm": "", "p-sb": ""},
    {"p-b": "PB_COOKIE_2", "p-lat": "", "cf_clearance": "", "__cf_bm": "", "p-sb": ""},
    # ... as many as you like
]

BASE_HEADERS = {
    "User-Agent": USER_AGENT,
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}

# Names of the cookies copied from an account entry into the browser/session.
_COOKIE_NAMES = ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")

# Captured headers that are dropped when a request is replayed.
_BLOCKED_REPLAY_HEADERS = frozenset({"content-length", "host", "connection", "accept-encoding"})

# Substrings treated as a hint that an account ran out of credits/quota.
_DEPLETION_KEYWORDS = ("credit", "quota", "trial", "limit", "payment")

# Subscription payload keys inspected for message nodes.
_NODE_KEYS = ("messageTextUpdated", "messageAdded", "messageUpdated", "messageCreated", "messageStateUpdated")

# Injected into every page: remembers the last WebSocket/EventSource URL that
# was opened and records the frames the page sends over its WebSockets.
_STREAM_HOOK_JS = """
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch(_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch(_) {}
        return new target(...args);
      }
    });
  }
})();
"""


# ==============================================================================
# UTILS
# ==============================================================================
def cookie_header_from_jar(jar: requests.cookies.RequestsCookieJar) -> str:
    """Serialize every cookie in ``jar`` as a ``Cookie`` header value."""
    return "; ".join(f"{c.name}={c.value}" for c in jar)


def parse_cookie_header_and_set(session: requests.Session, cookie_header: str):
    """Copy the ``name=value`` pairs of a ``Cookie`` header into ``session``.

    Pairs without ``=`` are ignored; pairs that cannot be stored are skipped.
    """
    if not cookie_header:
        return
    parts = [p.strip() for p in cookie_header.split(";") if "=" in p]
    for part in parts:
        try:
            k, v = part.split("=", 1)
            session.cookies.set(k.strip(), v.strip(), domain="poe.com", path="/")
        except Exception:
            # Best effort: one bad pair must not discard the others.
            continue


def extract_text_pieces(event: Any) -> List[str]:
    """Recursively collect non-empty text fragments from a nested event.

    For every dict, the string values under ``text_new``, ``response``,
    ``delta``, ``text`` and ``partial_response`` are taken (in that order),
    then nested dicts/lists are searched.
    """
    out: List[str] = []
    if isinstance(event, dict):
        for k in ("text_new", "response", "delta", "text", "partial_response"):
            v = event.get(k)
            if isinstance(v, str) and v:
                out.append(v)
        for v in event.values():
            if isinstance(v, (dict, list)):
                out.extend(extract_text_pieces(v))
    elif isinstance(event, list):
        for x in event:
            out.extend(extract_text_pieces(x))
    return out


def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when the node's ``state``/``message_state`` is a final one."""
    state = (node.get("state") or node.get("message_state") or "").lower()
    return state in {"complete", "completed", "stopped", "aborted"}


def is_bot_message(msg: Dict[str, Any], bot_handle: str) -> bool:
    """Heuristically decide whether ``msg`` was authored by the bot."""
    a = msg.get("author")
    if isinstance(a, dict):
        handle = str(a.get("handle") or "").lower()
        if handle and handle == str(bot_handle).lower():
            return True
        if a.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        t = str(a.get("type") or a.get("role") or "").lower()
        if t in ("bot", "assistant", "ai"):
            return True
        if a.get("isBot") is True:
            return True
    # Any message carrying a "bot" object counts as a bot message, whatever
    # handle that object has.
    if isinstance(msg.get("bot"), dict):
        return True
    mt = str(msg.get("messageType") or msg.get("type") or "").lower()
    if mt in ("bot", "assistant", "ai"):
        return True
    if "suggestedReplies" in msg:
        return True
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        st = str(msg.get("state") or "").lower()
        if st in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True
    return False


def _mentions_depletion(text: str) -> bool:
    """Return True if lower-cased ``text`` contains a depletion keyword."""
    return any(k in text for k in _DEPLETION_KEYWORDS)


def _replay_headers(captured: Dict[str, str], bot_handle: str) -> Dict[str, str]:
    """Build replay headers from captured ones, pinning content-type/origin/referer."""
    headers = {k: v for k, v in (captured or {}).items() if k not in _BLOCKED_REPLAY_HEADERS}
    headers["content-type"] = "application/json"
    headers["origin"] = POE_URL_ROOT
    headers["referer"] = f"{POE_URL_ROOT}/{bot_handle}"
    return headers


# ==============================================================================
# COOKIES POOL
# ==============================================================================
@dataclass
class AccountCtx:
    """Per-account state: cookies, HTTP session and the data captured at bootstrap."""

    cookies: Dict[str, str]
    session: Optional[requests.Session] = None
    # Header values captured from the browser's GraphQL requests.
    formkey: Optional[str] = None
    revision: Optional[str] = None
    tag_id: Optional[str] = None
    tchannel_hdr: Optional[str] = None
    # Streaming endpoints and the frames the page sent over its WebSocket.
    ws_url: Optional[str] = None
    es_url: Optional[str] = None
    ws_client_frames: List[str] = field(default_factory=list)
    # Captured sendMessageMutation body/headers, reused for every message.
    send_payload_template: Optional[Dict[str, Any]] = None
    send_headers_template: Dict[str, str] = field(default_factory=dict)
    # Captured subscriptionsMutation body/headers.
    subs_raw: Optional[str] = None
    subs_headers_template: Dict[str, str] = field(default_factory=dict)
    # Cleared by CookiePool.mark_bad_and_rotate.
    healthy: bool = True


class CookiePool:
    """Round-robin pool of accounts with a simple healthy/unhealthy flag."""

    def __init__(self, accounts: List[Dict[str, str]]):
        """Keep only the accounts that define a ``p-b`` cookie.

        Raises:
            RuntimeError: If no account has a ``p-b`` cookie.
        """
        self.ctxs = [AccountCtx(cookies=a) for a in accounts if a.get("p-b")]
        if not self.ctxs:
            raise RuntimeError("Nenhuma conta válida (p-b obrigatório).")
        self.idx = 0

    def current(self) -> AccountCtx:
        """Return the currently selected account."""
        return self.ctxs[self.idx]

    def mark_bad_and_rotate(self, reason: str = "") -> AccountCtx:
        """Flag the current account as unhealthy and select the next healthy one.

        If every account is unhealthy, all flags are reset and the next
        account in order is selected.
        """
        self.current().healthy = False
        print(f"\n[POOL] Conta atual marcada como indisponível ({reason}). Alternando…")
        for _ in range(len(self.ctxs)):
            self.idx = (self.idx + 1) % len(self.ctxs)
            if self.ctxs[self.idx].healthy:
                print(f"[POOL] Nova conta selecionada (idx={self.idx}).")
                return self.ctxs[self.idx]
        # Every account is flagged: reset and try them all again.
        for c in self.ctxs:
            c.healthy = True
        print("[POOL] Todas marcadas ruins. Reset de saúde e usando próxima.")
        self.idx = (self.idx + 1) % len(self.ctxs)
        return self.ctxs[self.idx]


# ==============================================================================
# BOOTSTRAP VIA PLAYWRIGHT
# ==============================================================================
async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], primer_prompt: str = "hi") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Open the bot page in headless Chromium and capture what a replay needs.

    The page is loaded with the account cookies, ``primer_prompt`` is typed
    and submitted, and the resulting ``/api/gql_POST`` requests are
    intercepted. The intercepted ``sendMessageMutation`` is answered locally
    with a stub response instead of being forwarded.

    Returns:
        ``(info, handles)``: ``info`` holds the captured payloads, headers and
        stream URLs; ``handles`` holds the Playwright objects, which the
        caller must release with ``cleanup_playwright``.

    Raises:
        RuntimeError: If no ``sendMessageMutation`` body was captured.
    """
    p = await async_playwright().start()
    handles: Dict[str, Any] = {"p": p}
    try:
        browser = await p.chromium.launch(headless=True)
        handles["browser"] = browser
        context = await browser.new_context()
        handles["context"] = context

        add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in _COOKIE_NAMES
            if cookie_dict.get(name)
        ]
        if add:
            await context.add_cookies(add)
        await context.add_init_script(_STREAM_HOOK_JS)
        page = await context.new_page()
        handles["page"] = page

        captured: Dict[str, Any] = {
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            "ws_url": None,
            "es_url": None,
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            """Record ``u`` as the WS or SSE URL if it looks like an updates channel."""
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        context.on("websocket", lambda ws: maybe_set_updates_url((ws.url or "")))
        # NOTE(review): Playwright documents the "websocket" event on Page;
        # listen there as well so the URL is not left to the JS hook alone.
        page.on("websocket", lambda ws: maybe_set_updates_url((ws.url or "")))

        def on_req(req):
            try:
                # maybe_set_updates_url applies the updates/channel filter.
                maybe_set_updates_url(req.url or "")
            except Exception:
                pass

        context.on("request", on_req)
        page.on("request", on_req)

        async def route_handler(route, request):
            try:
                if request.url.endswith("/api/gql_POST"):
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")
                    for header_name, slot in (
                        ("poe-formkey", "formkey_hdr"),
                        ("poe-revision", "revision_hdr"),
                        ("poe-tchannel", "tchannel_hdr"),
                        ("poe-tag-id", "tag_id_hdr"),
                    ):
                        if header_name in hdrs:
                            captured[slot] = hdrs[header_name]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        captured["send_req_headers"] = {**(captured["send_req_headers"] or {}), **hdrs}
                        captured["send_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["send_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["send_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        # Answer locally: the primer message is only needed to
                        # capture the request, so it is not forwarded.
                        await route.fulfill(
                            status=200,
                            content_type="application/json",
                            body='{"data":{"messageCreate":null}}',
                        )
                        return

                    if qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        captured["subs_req_headers"] = {**(captured["subs_req_headers"] or {}), **hdrs}
                        captured["subs_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["subs_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["subs_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        await route.continue_()
                        return
            except Exception:
                pass
            await route.continue_()

        await context.route("**/api/gql_POST", route_handler)
        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type a small primer just to capture the payloads/headers.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                box = locator.first
            else:
                box = page.locator('div[contenteditable="true"]').first
            await box.click()
            await box.fill(primer_prompt)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Poll (120 x 200 ms) until the send request plus either the
        # subscriptions request or a stream URL has been seen.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Fall back to what the injected JS hook recorded.
        try:
            last = await page.evaluate(
                """({
                    ws: window.__POE_LAST_WS_URL__||null,
                    es: window.__POE_LAST_ES_URL__||null,
                    frames: (window.__POE_WS_SENT__||[]).slice(-10)
                })"""
            )
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Binary frames were recorded as "[binary:N]" placeholders.
                captured["ws_client_frames"] = [f for f in frames if isinstance(f, str) and not f.startswith("[")]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }
        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, handles
    except BaseException:
        # Release the browser on any failure (including cancellation) so a
        # failed bootstrap does not leave a Chromium process behind.
        await cleanup_playwright(handles)
        raise


async def cleanup_playwright(handles: Dict[str, Any]):
    """Close context, browser and driver; each step is best effort."""
    for key, method in (("context", "close"), ("browser", "close"), ("p", "stop")):
        try:
            if handles.get(key):
                await getattr(handles[key], method)()
        except Exception:
            pass


# ==============================================================================
# STREAM LISTENERS
# ==============================================================================
def _iter_inner_frames(packet: Any):
    """Yield the dict frames carried by a raw packet.

    The packet is decoded/parsed as JSON; when it has a non-empty
    ``messages`` list each (possibly JSON-encoded) entry is yielded,
    otherwise the packet itself is. Unparseable input yields nothing.
    """
    try:
        if isinstance(packet, (bytes, bytearray)):
            packet = packet.decode("utf-8", "ignore")
        if isinstance(packet, str):
            packet = json.loads(packet)
        if not isinstance(packet, dict):
            return
    except Exception:
        return
    msgs = packet.get("messages")
    if isinstance(msgs, list) and msgs:
        for m in msgs:
            try:
                if isinstance(m, (bytes, bytearray)):
                    m = m.decode("utf-8", "ignore")
                if isinstance(m, str):
                    m = json.loads(m)
                if isinstance(m, dict):
                    yield m
            except Exception:
                continue
    else:
        yield packet


def _iter_subscription_updates(frame: Dict[str, Any]):
    """Yield the ``data`` objects of a ``subscriptionUpdate`` frame (three known layouts)."""
    mtype = frame.get("mtype") or frame.get("message_type") or ""
    if mtype != "subscriptionUpdate":
        return
    inner = frame.get("payload") or {}
    if isinstance(inner.get("payloads"), list):
        for pl in inner["payloads"]:
            yield pl.get("data") or {}
        return
    if isinstance(inner.get("data"), dict):
        yield inner["data"]
        return
    inner2 = inner.get("payload")
    if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
        yield inner2["data"]


def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the message nodes of a subscription ``data`` object.

    Known keys are tried first; otherwise any dict value holding
    ``text_new`` or ``message`` is taken.
    """
    nodes = [d[key] for key in _NODE_KEYS if isinstance(d.get(key), dict)]
    if not nodes and isinstance(d, dict):
        nodes = [v for v in d.values() if isinstance(v, dict) and ("text_new" in v or "message" in v)]
    return nodes


async def listen_via_websocket_url(
    ws_url: str,
    session: requests.Session,
    target_chat_id: Optional[str],
    meta_headers: Dict[str, str],
    initial_client_frames: List[str],
    bot_handle: str,
    timeout_seconds: int = 120,
) -> Tuple[str, bool]:
    """Stream the bot's answer over WebSocket, echoing chunks to stdout.

    Returns:
        ``(text, depleted)``: the accumulated text and whether a
        non-subscription frame mentioned "credit" or "trial".
    """
    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": USER_AGENT,
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    for k, v in (meta_headers or {}).items():
        if v:
            ws_headers[k] = v
    extra = list(ws_headers.items())

    collected: List[str] = []
    depleted = False
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()

    print("[WS] Conectando…")
    # NOTE(review): newer `websockets` releases reportedly take
    # `additional_headers` instead of `extra_headers` — confirm against the
    # installed version.
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
            except Exception:
                pass
        print("[WS] Aguardando eventos…")
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print("\n[WS] Timeout sem eventos.")
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print("\n[WS] Timeout sem eventos.")
                break

            got_update = False
            try:
                candidates = list(_iter_inner_frames(message))
            except Exception:
                candidates = []
            for inner in candidates:
                mtype = inner.get("mtype") or inner.get("message_type") or "unknown"
                if mtype != "subscriptionUpdate":
                    # Heuristic: generic error/notice frames.
                    text = json.dumps(inner, ensure_ascii=False).lower()
                    if "credit" in text or "trial" in text:
                        depleted = True
                    continue
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node in _extract_nodes(data_obj):
                        msg = node.get("message") if isinstance(node.get("message"), dict) else node
                        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                            continue
                        if not is_bot_message(msg, bot_handle):
                            continue
                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            collected.append(chunk)
                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(collected), depleted
            if got_update:
                # Any subscription update extends the idle deadline.
                deadline = loop.time() + timeout_seconds
    return "".join(collected), depleted


def _build_stream_headers(session: requests.Session, bot_handle: str) -> Dict[str, str]:
    """Headers for the SSE request, including the session cookies."""
    return {
        "Origin": "https://poe.com",
        "Referer": f"{POE_URL_ROOT}/{bot_handle}",
        "User-Agent": USER_AGENT,
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Cookie": cookie_header_from_jar(session.cookies),
    }


def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str], bot_handle: str) -> Tuple[bool, str, Dict[str, Any]]:
    """Find the first bot message for the target chat in ``payload``.

    Returns:
        ``(True, text, message)`` for the first match, else ``(False, "", {})``.
        Malformed payloads are treated as no match.
    """
    try:
        for data_obj in _iter_subscription_updates(payload):
            for node in _extract_nodes(data_obj):
                msg = node.get("message") if isinstance(node.get("message"), dict) else node
                chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                    continue
                if not is_bot_message(msg, bot_handle):
                    continue
                return True, "".join(extract_text_pieces(msg)), msg
    except Exception:
        pass
    return False, "", {}


async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], bot_handle: str, timeout_seconds: int = 120) -> Tuple[str, bool]:
    """Stream the bot's answer over SSE, echoing chunks to stdout.

    Returns:
        ``(text, depleted)``: the accumulated text and whether a non-matching
        event mentioned a depletion keyword.

    Raises:
        requests.HTTPError: If the stream request returns an error status.
    """
    headers = _build_stream_headers(session, bot_handle)
    headers["Accept"] = "text/event-stream"
    collected: List[str] = []
    depleted = False
    with session.get(es_url, headers=headers, stream=True) as r:
        # An error status propagates; send_and_stream classifies it from the
        # exception text.
        r.raise_for_status()
        # Event streams are UTF-8; without this, requests falls back to its
        # default charset for text/* responses and accented text is garbled.
        r.encoding = "utf-8"
        deadline = time.monotonic() + timeout_seconds
        # NOTE: the deadline is only evaluated when a line arrives.
        for raw in r.iter_lines(decode_unicode=True):
            if time.monotonic() > deadline:
                print("\n[SSE] Timeout sem eventos.")
                break
            if not raw or raw.startswith(":") or not raw.startswith("data:"):
                continue
            line = raw[5:].strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except Exception:
                continue
            ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id, bot_handle)
            # Heuristic: "out of credits" style events.
            if not ok and _mentions_depletion(line.lower()):
                depleted = True
            if ok and chunk:
                print(chunk, end="", flush=True)
                collected.append(chunk)
            if msg and looks_complete(msg):
                print("\n--- Resposta Completa (SSE) ---")
                break
    return "".join(collected), depleted


# ==============================================================================
# PAYLOAD MUTATION
# ==============================================================================
def patch_send_payload_text(payload: Dict[str, Any], new_text: str, bot_handle: str) -> str:
    """Return the captured send payload as JSON with text and bot replaced.

    Every existing ``variables.query/message/text/input`` is set to
    ``new_text`` and every existing ``variables.bot/handle/botHandle`` to
    ``bot_handle``. If no text field exists, ``variables.query`` is created so
    the captured primer text is never sent in place of the user's message.
    ``payload`` itself is not modified.
    """
    cloned = json.loads(json.dumps(payload))
    variables = cloned.get("variables")
    if not isinstance(variables, dict):
        variables = {}

    text_applied = False
    for key in ("query", "message", "text", "input"):
        if key in variables:
            variables[key] = new_text
            text_applied = True

    bot_applied = False
    for key in ("bot", "handle", "botHandle"):
        if key in variables:
            variables[key] = bot_handle
            bot_applied = True

    if not text_applied:
        # Fallback: no known text field, so create "query".
        variables["query"] = new_text
        if not bot_applied:
            variables["bot"] = bot_handle
        cloned["variables"] = variables
    return json.dumps(cloned, ensure_ascii=False)


# ==============================================================================
# REQUEST SENDER (with failover)
# ==============================================================================
def apply_base_cookies(session: requests.Session, cks: Dict[str, str]):
    """Install the account cookies on ``session`` (``p-b`` always, the rest if set)."""
    session.cookies.update({"p-b": cks.get("p-b", "")})
    for name in _COOKIE_NAMES[1:]:
        if cks.get(name):
            session.cookies.set(name, cks[name], domain="poe.com", path="/")


def http_maybe_depleted(resp: requests.Response, body_text: str) -> bool:
    """Return True if the status (401/402/403/429) or body text hints at depletion/blocking."""
    if resp.status_code in (401, 403, 429, 402):
        return True
    low = body_text.lower() if body_text else ""
    return _mentions_depletion(low) or "subscribe" in low


async def bootstrap_if_needed(ctx: AccountCtx, bot_handle: str):
    """Make sure ``ctx`` has a session, a send template and a stream URL.

    The browser bootstrap only runs when the send template or both stream
    URLs are missing (e.g. first use, or after ``:bot`` cleared them).
    """
    if ctx.session is None:
        s = requests.Session()
        s.headers.update(BASE_HEADERS | {"Referer": f"{POE_URL_ROOT}/{bot_handle}"})
        apply_base_cookies(s, ctx.cookies)
        ctx.session = s
    # Either stream URL is enough; requiring the WS URL would re-run the
    # bootstrap on every message for accounts that only exposed SSE.
    if ctx.send_payload_template and (ctx.ws_url or ctx.es_url):
        return

    info, handles = await get_bootstrap(bot_handle, ctx.cookies, primer_prompt=".")
    try:
        # Subscriptions replay (optional).
        ctx.subs_raw = info.get("subs_payload_raw")
        ctx.subs_headers_template = {k.lower(): v for k, v in (info.get("subs_req_headers") or {}).items()}
        if info.get("subs_cookie_header"):
            parse_cookie_header_and_set(ctx.session, info["subs_cookie_header"])
        if ctx.subs_raw:
            resp_subs = ctx.session.post(
                POE_API_URL,
                data=ctx.subs_raw.encode("utf-8"),
                headers=_replay_headers(ctx.subs_headers_template, bot_handle),
            )
            resp_subs.raise_for_status()

        # Base payload for sending.
        ctx.send_payload_template = info.get("send_payload_dict")
        ctx.send_headers_template = {k.lower(): v for k, v in (info.get("send_req_headers") or {}).items()}
        if info.get("send_cookie_header"):
            parse_cookie_header_and_set(ctx.session, info["send_cookie_header"])
        ctx.formkey = info.get("formkey")
        ctx.revision = info.get("revision")
        ctx.tag_id = info.get("tag_id")
        ctx.tchannel_hdr = info.get("tchannel_hdr")

        # info always has the "ws_url" key, possibly with value None, so
        # normalise before calling str methods on it.
        ws_url = info.get("ws_url") or ""
        ctx.ws_url = ws_url or None
        es_url = info.get("es_url")
        if not es_url and ws_url.startswith("wss://"):
            es_url = "https://" + ws_url[len("wss://"):]
        ctx.es_url = es_url or None
        ctx.ws_client_frames = info.get("ws_client_frames") or []
    finally:
        await cleanup_playwright(handles)


async def send_and_stream(cookie_pool: CookiePool, text: str, bot_handle: str) -> None:
    """Send ``text`` with the current account and stream the answer to stdout.

    On any error the current account is flagged and the next one is tried.
    If the stream signals depletion and produced no answer, the message is
    resent with the next account. At most two passes over the pool are made.

    Raises:
        RuntimeError: If every attempt failed.
    """
    attempts = 0
    last_error: Optional[Exception] = None
    max_attempts = len(cookie_pool.ctxs) * 2  # two passes over the pool
    while attempts < max_attempts:
        attempts += 1
        ctx = cookie_pool.current()
        try:
            await bootstrap_if_needed(ctx, bot_handle)

            gql_headers_send = _replay_headers(ctx.send_headers_template, bot_handle)
            if ctx.tchannel_hdr:
                ctx.session.headers["poe-tchannel"] = ctx.tchannel_hdr
                gql_headers_send["poe-tchannel"] = ctx.tchannel_hdr

            body = patch_send_payload_text(ctx.send_payload_template or {}, text, bot_handle)
            # Encode explicitly so non-ASCII text does not depend on how the
            # HTTP stack encodes str request bodies.
            resp = ctx.session.post(POE_API_URL, data=body.encode("utf-8"), headers=gql_headers_send)
            resp.raise_for_status()
            data = resp.json()

            root = data.get("data", {}) if isinstance(data, dict) else {}
            chat = None
            if "messageCreate" in root:
                chat = (root.get("messageCreate") or {}).get("chat") or {}
            elif "messageEdgeCreate" in root:
                chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
            if not chat:
                raise RuntimeError("Resposta sem objeto 'chat' esperado.")
            chat_id = chat.get("chatId") or chat.get("id")
            if not chat_id:
                raise RuntimeError("Não consegui obter o chatId.")

            # Stream.
            meta_headers = {}
            if ctx.tchannel_hdr:
                meta_headers["poe-tchannel"] = ctx.tchannel_hdr
            if ctx.formkey:
                meta_headers["poe-formkey"] = ctx.formkey
            if ctx.revision:
                meta_headers["poe-revision"] = ctx.revision
            if ctx.tag_id:
                meta_headers["poe-tag-id"] = ctx.tag_id

            print(f"[stream] usando conta idx={cookie_pool.idx} | bot={bot_handle}")
            depleted = False
            if ctx.ws_url:
                answer, depleted = await listen_via_websocket_url(
                    ctx.ws_url, ctx.session, str(chat_id), meta_headers,
                    ctx.ws_client_frames, bot_handle, timeout_seconds=180,
                )
                if not answer and ctx.es_url:
                    print("\n[WS] Sem payload do bot. Tentando SSE…")
                    answer, sse_depleted = await listen_via_sse_url(
                        ctx.es_url, ctx.session, str(chat_id), bot_handle, timeout_seconds=180
                    )
                    depleted = depleted or sse_depleted
            else:
                if not ctx.es_url:
                    raise RuntimeError("Sem WS nem SSE capturados.")
                answer, depleted = await listen_via_sse_url(
                    ctx.es_url, ctx.session, str(chat_id), bot_handle, timeout_seconds=180
                )

            if depleted:
                cookie_pool.mark_bad_and_rotate("créditos esgotados/bloqueio")
                if not answer:
                    # Nothing was delivered: resend with the next account
                    # instead of reporting success.
                    last_error = RuntimeError("créditos esgotados/bloqueio")
                    continue
            return
        except Exception as e:
            last_error = e
            low = str(e).lower()
            # Every failure rotates to the next account; only the reported
            # reason differs.
            if _mentions_depletion(low):
                reason = "indício de falta de créditos"
            elif any(code in low for code in ("429", "403", "401", "402")):
                reason = "http auth/rate"
            else:
                print(f"[warn] erro na conta idx={cookie_pool.idx}: {e}")
                reason = "erro genérico"
            cookie_pool.mark_bad_and_rotate(reason)
    raise RuntimeError(f"Falha após múltiplas tentativas/failovers. Último erro: {last_error}")


# ==============================================================================
# CLI REPL
# ==============================================================================
HELP = """\
Comandos:
  :bot <handle>  -> troca o bot (ex: :bot gpt-4o)
  :quit          -> sair
  :help          -> ajuda
"""


async def ainput(prompt: str = "") -> str:
    """Run the blocking ``input`` in a worker thread and return its result."""
    return await asyncio.to_thread(input, prompt)


async def repl():
    """Interactive loop: ``:``-prefixed lines are commands, anything else is sent to the bot."""
    pool = CookiePool(ACCOUNTS)
    bot_handle = DEFAULT_BOT
    print("Poe CLI com múltiplas contas p-b.")
    print(HELP)
    while True:
        try:
            user = (await ainput("\nVocê> ")).strip()
        except (EOFError, KeyboardInterrupt):
            print("\nbye!")
            return
        if not user:
            continue

        if user.startswith(":"):
            parts = user[1:].split()
            if not parts:
                print(HELP)
                continue
            cmd = parts[0].lower()
            if cmd == "quit":
                print("bye!")
                return
            if cmd == "help":
                print(HELP)
                continue
            if cmd == "bot":
                if len(parts) < 2:
                    print("Uso: :bot <handle>")
                    continue
                bot_handle = parts[1]
                print(f"[ok] bot -> {bot_handle}")
                # Partial reset: each account re-bootstraps against the new
                # bot the next time it is used.
                for ctx in pool.ctxs:
                    ctx.ws_url = None
                    ctx.es_url = None
                    ctx.send_payload_template = None
                continue
            print("Comando desconhecido.")
            continue

        try:
            await send_and_stream(pool, user, bot_handle)
        except Exception as e:
            print(f"\n[erro] {e}")


# ==============================================================================
# MAIN
# ==============================================================================
if __name__ == "__main__":
    try:
        asyncio.run(repl())
    except KeyboardInterrupt:
        print("\nbye!")
- `CookiePool` marca a conta atual como “ruim” quando recebe sinais fortes de quota/403/429/402, ou quando a própria resposta/stream contém termos tipo “credit/quota/trial/limit”. Em seguida, alterna para a próxima conta. Se todas ficarem ruins, ele “reseta” o estado de saúde e tenta todas de novo.
- `AccountCtx` faz um bootstrap preguiçoso com o `get_bootstrap()` apenas quando necessário (ou quando você troca o bot com `:bot`); isso captura `sendPayload`, `subs`, `tchannel`, `formkey`, `ws/es URL` e frames de handshake.
- O envio reutiliza a `sendMessageMutation` capturada, com o texto trocado por `patch_send_payload_text()`. Ele tenta as chaves comuns (`variables.query`, `variables.message` etc.) e força o handle do bot no payload.
- `:bot <handle>` limpa caches de WS/SSE/payload, de modo que cada conta refaça o bootstrap no novo handle quando precisar.
- O fim da resposta é detectado por `looks_complete()`; se o WS não render, cai para SSE.

Se quiser, eu também posso adicionar:
- suporte a `proxy`/`playwright.connect_over_cdp`;
- um comando `:sys <texto>` para mandar system prompts, se o payload do Poe estiver expondo o campo.