Adicione suporte a muitos cookies p_b para que sej...

Créé le : 20 août 2025

Répondu en utilisant GPT-5 Thinking par Chat01

Question

Adicione suporte a múltiplos cookies p_b para que sejam usadas múltiplas contas; assim, caso algum cookie falhe, ele usa o outro. No console, adicione um chat: em vez de um único prompt hardcoded, o usuário pode conversar via CLI. E caso, durante a conversa, os créditos do cookie acabem, mude para outro dinamicamente:
import asyncio
import json
from typing import Optional, Dict, Any, List, Tuple

import requests
import websockets
from playwright.async_api import async_playwright

# ==============================================================================
# 0) CONFIGURATIONS AND COOKIES
# ==============================================================================

# Session cookies; only p-b is mandatory, the others are optional extras.
P_B_COOKIE = ""  # required (p-b session cookie)
P_LAT_COOKIE = ""  # optional
CF_CLEARANCE = ""  # optional (Cloudflare)
CF_BM = ""  # optional
P_SB = ""  # optional

BOT_NAME = "GPT-5"  # slug/handle visible in the URL
PROMPT = "Hello World"  # message to send

POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"

# Default headers installed on the requests session.
BASE_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
    # "*/*" is the wildcard media range; a bare "/" is not a valid Accept value.
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}

# ============================== UTILS =========================================

def cookie_header_from_jar(jar: requests.cookies.RequestsCookieJar) -> str:
    """Serialize every cookie in *jar* into a single ``Cookie`` header value.

    Each cookie becomes ``name=value``; pairs are joined with ``"; "``.
    An empty jar yields an empty string.
    """
    return "; ".join(f"{cookie.name}={cookie.value}" for cookie in jar)

def parse_cookie_header_and_set(session: requests.Session, cookie_header: str):
    """Copy the ``name=value`` pairs of a raw ``Cookie`` header into *session*.

    Fragments without ``=`` and pairs with an empty name are skipped. Every
    cookie is stored for domain ``poe.com`` and path ``/``. Nothing happens
    when *cookie_header* is empty.
    """
    if not cookie_header:
        return
    for fragment in cookie_header.split(";"):
        name, sep, value = fragment.partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        try:
            session.cookies.set(name, value.strip(), domain="poe.com", path="/")
        except Exception:
            # Deliberate best-effort: one bad cookie must not abort the rest.
            continue

# ============================== PARSERS ========================================

def extract_text_pieces(event: Any) -> List[str]:
    """Collect every non-empty text fragment found anywhere inside *event*.

    For a dict, the string values stored under the known text keys are taken
    first (in the fixed key order below), then every nested dict/list value is
    searched recursively. For a list, each element is searched in order.
    Any other type yields an empty list.
    """
    text_keys = ("text_new", "response", "delta", "text", "partial_response")
    pieces: List[str] = []
    if isinstance(event, dict):
        for key in text_keys:
            value = event.get(key)
            if isinstance(value, str) and value:
                pieces.append(value)
        for value in event.values():
            if isinstance(value, (dict, list)):
                pieces.extend(extract_text_pieces(value))
    elif isinstance(event, list):
        for item in event:
            pieces.extend(extract_text_pieces(item))
    return pieces

def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when *node* carries a terminal message state.

    The state is read from ``state`` (falling back to ``message_state``) and
    compared case-insensitively. It is coerced with ``str()`` first so a
    non-string state cannot raise on ``.lower()``.
    """
    state = str(node.get("state") or node.get("message_state") or "").lower()
    return state in {"complete", "completed", "stopped", "aborted"}

def is_bot_message(msg: Dict[str, Any]) -> bool:
    """Heuristically decide whether *msg* was written by the bot, not the user.

    Checks, in order: the ``author`` object, the presence of a ``bot`` object,
    the message type, a ``suggestedReplies`` key, and finally streaming-text
    keys combined with a non-terminal (or missing) state.
    """
    # 1) author
    author = msg.get("author")
    if isinstance(author, dict):
        handle = str(author.get("handle") or "").lower()
        if handle and handle == str(BOT_NAME).lower():
            return True
        if author.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        kind = str(author.get("type") or author.get("role") or "").lower()
        if kind in ("bot", "assistant", "ai"):
            return True
        if author.get("isBot") is True:
            return True

    # 2) "bot": the pasted code returned True for any dict here, whether or
    # not its handle matched BOT_NAME, so the handle comparison is dropped.
    if isinstance(msg.get("bot"), dict):
        return True

    # 3) explicit message type
    message_type = str(msg.get("messageType") or msg.get("type") or "").lower()
    if message_type in ("bot", "assistant", "ai"):
        return True

    # 4) presence of this key alone is treated as a bot marker
    if "suggestedReplies" in msg:
        return True

    # 5) streaming-text keys while the message is still being generated
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        state = str(msg.get("state") or "").lower()
        if state in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True

    return False

# ============================== BOOTSTRAP ======================================

# Injected before any page script runs. Wraps WebSocket / EventSource so the
# last stream URL and the frames sent by the client can be read back later
# through the window.__POE_*__ globals.
_POE_HOOKS_JS = """
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        return new target(...args);
      }
    });
  }
})();
"""


async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], prompt_for_capture: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Open the bot page in a headless browser and capture what a replay needs.

    The page is loaded with the given cookies, *prompt_for_capture* is typed
    and submitted, and the resulting ``sendMessageMutation`` request is
    intercepted (answered locally, never forwarded). The
    ``subscriptionsMutation`` request, the ``poe-*`` meta headers, the
    WebSocket/SSE updates URL and the text frames sent by the client are
    recorded as well.

    Args:
        bot_name: Bot slug appended to the site root URL.
        cookie_dict: Cookie values keyed by cookie name; empty values are skipped.
        prompt_for_capture: Text typed into the input box to trigger the send.

    Returns:
        ``(info, handles)``: ``info`` holds the captured payloads, headers and
        URLs; ``handles`` holds the live Playwright objects (``p``,
        ``browser``, ``context``, ``page``). Playwright is NOT closed on
        success, so the caller must pass ``handles`` to ``cleanup_playwright``.

    Raises:
        RuntimeError: when the ``sendMessageMutation`` body was not captured.
        Any Playwright error is re-raised after the browser has been closed.
    """
    p = await async_playwright().start()
    handles: Dict[str, Any] = {"p": p}
    try:
        browser = await p.chromium.launch(headless=True)
        handles["browser"] = browser
        context = await browser.new_context()
        handles["context"] = context

        # session cookies
        cookies_to_add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")
            if cookie_dict.get(name)
        ]
        if cookies_to_add:
            await context.add_cookies(cookies_to_add)

        # JS hooks
        await context.add_init_script(_POE_HOOKS_JS)

        page = await context.new_page()
        handles["page"] = page

        captured: Dict[str, Any] = {
            # sendMessageMutation
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            # subscriptionsMutation
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            # meta headers
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            # stream
            "ws_url": None,
            "es_url": None,
            # frames
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            """Record *u* as the WS or SSE stream URL if it looks like an updates channel."""
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        # The "websocket" event is emitted by the page, not by the context.
        page.on("websocket", lambda ws: maybe_set_updates_url(ws.url or ""))
        # Context-level request events cover every page created from it.
        context.on("request", lambda req: maybe_set_updates_url(req.url or ""))

        async def route_handler(route, request):
            """Record GraphQL requests; answer the send mutation locally."""
            try:
                if request.url.endswith("/api/gql_POST"):
                    # NOTE(review): Playwright's `request.headers` may omit the
                    # cookie header, so the "cookie" captures below may stay
                    # empty. Confirm whether `all_headers()` is needed.
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")

                    if "poe-formkey" in hdrs:
                        captured["formkey_hdr"] = hdrs["poe-formkey"]
                    if "poe-revision" in hdrs:
                        captured["revision_hdr"] = hdrs["poe-revision"]
                    if "poe-tchannel" in hdrs:
                        captured["tchannel_hdr"] = hdrs["poe-tchannel"]
                    if "poe-tag-id" in hdrs:
                        captured["tag_id_hdr"] = hdrs["poe-tag-id"]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        captured["send_req_headers"] = {**(captured["send_req_headers"] or {}), **hdrs}
                        captured["send_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["send_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["send_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        # Answer locally so the browser never really sends it.
                        await route.fulfill(
                            status=200,
                            content_type="application/json",
                            body='{"data":{"messageCreate":null}}',
                        )
                        return

                    if qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        captured["subs_req_headers"] = {**(captured["subs_req_headers"] or {}), **hdrs}
                        captured["subs_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["subs_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["subs_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        await route.continue_()
                        return
            except Exception:
                pass
            await route.continue_()

        await context.route("**/api/gql_POST", route_handler)

        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type the prompt. Best-effort: a failure here surfaces below as a
        # missing send payload.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                ta = locator.first
                await ta.click()
                await ta.fill(prompt_for_capture)
            else:
                ed = page.locator('div[contenteditable="true"]').first
                await ed.click()
                await ed.fill(prompt_for_capture)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Wait for the capture: 120 polls of 200 ms each.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Read back what the JS hooks recorded (URLs and client-sent frames).
        try:
            last = await page.evaluate(
                """({
                    ws: window.__POE_LAST_WS_URL__||null,
                    es: window.__POE_LAST_ES_URL__||null,
                    frames: (window.__POE_WS_SENT__||[]).slice(-10)
                })"""
            )
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Binary frames were recorded as "[binary:N]" placeholders; drop them.
                captured["ws_client_frames"] = [
                    f for f in frames if isinstance(f, str) and not f.startswith("[")
                ]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }

        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, handles
    except BaseException:
        # Nothing is handed back to the caller on failure, so release the
        # browser here instead of leaking it.
        await cleanup_playwright(handles)
        raise

async def cleanup_playwright(handles: Dict[str, Any]):
    """Close the Playwright context, browser and driver held in *handles*.

    Resources are released in that order. Each step is best-effort: a failure
    while closing one resource is ignored so the remaining ones are still
    released. Missing entries (or an empty/None *handles*) are skipped.
    """
    teardown_steps = (("context", "close"), ("browser", "close"), ("p", "stop"))
    for key, method_name in teardown_steps:
        resource = (handles or {}).get(key)
        if not resource:
            continue
        try:
            await getattr(resource, method_name)()
        except Exception:
            # Deliberately swallowed: teardown must never mask the real error.
            pass

# ============================== STREAM LISTENERS ================================

async def listen_via_websocket_url(ws_url: str,
                                   session: requests.Session,
                                   target_chat_id: Optional[str],
                                   meta_headers: Dict[str, str],
                                   initial_client_frames: List[str],
                                   timeout_seconds: int = 120) -> str:
    """Connect to *ws_url*, replay the client frames and stream the bot's answer.

    Incoming packets are unwrapped from their ``messages[]`` envelope, reduced
    to ``subscriptionUpdate`` payloads, filtered by chat id and by
    ``is_bot_message``; text chunks are printed as they arrive.

    Args:
        ws_url: WebSocket updates URL.
        session: Session whose cookie jar is sent in the ``Cookie`` header.
        target_chat_id: When set, messages carrying a different chat id are ignored.
        meta_headers: Extra headers added to the handshake (empty values skipped).
        initial_client_frames: Text frames sent right after connecting.
        timeout_seconds: Idle timeout; it restarts whenever an update arrives.

    Returns:
        The concatenated text received so far: on completion, on timeout, or
        empty when nothing matched.
    """
    def _iter_inner_frames(packet: Any):
        """Yield the dict frames of a packet, unwrapping ``messages[]`` if present."""
        try:
            if isinstance(packet, (bytes, bytearray)):
                packet = packet.decode("utf-8", "ignore")
            if isinstance(packet, str):
                packet = json.loads(packet)
        except ValueError:
            return
        if not isinstance(packet, dict):
            return

        msgs = packet.get("messages")
        if isinstance(msgs, list) and msgs:
            for m in msgs:
                try:
                    if isinstance(m, (bytes, bytearray)):
                        m = m.decode("utf-8", "ignore")
                    if isinstance(m, str):
                        m = json.loads(m)
                except ValueError:
                    continue
                if isinstance(m, dict):
                    yield m
        else:
            yield packet

    def _iter_subscription_updates(frame: Dict[str, Any]):
        """Yield the ``data`` dicts of a ``subscriptionUpdate`` frame."""
        mtype = frame.get("mtype") or frame.get("message_type") or ""
        if mtype != "subscriptionUpdate":
            return
        inner = frame.get("payload") or {}
        if not isinstance(inner, dict):
            return
        if isinstance(inner.get("payloads"), list):
            for pl in inner["payloads"]:
                # A non-dict entry would raise on .get() and kill the listener.
                if isinstance(pl, dict):
                    yield pl.get("data") or {}
            return
        if isinstance(inner.get("data"), dict):
            yield inner["data"]
            return
        inner2 = inner.get("payload")
        if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
            yield inner2["data"]

    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the message nodes of a data dict (known keys first, then a fallback scan)."""
        if not isinstance(d, dict):
            return []
        nodes = []
        for key in ("messageTextUpdated", "messageAdded", "messageUpdated", "messageCreated", "messageStateUpdated"):
            node = d.get(key)
            if isinstance(node, dict):
                nodes.append(node)
        if not nodes:
            for v in d.values():
                if isinstance(v, dict) and ("text_new" in v or "message" in v):
                    nodes.append(v)
        return nodes

    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    for k, v in (meta_headers or {}).items():
        if v:
            ws_headers[k] = v
    extra = list(ws_headers.items())

    chunks: List[str] = []
    loop = asyncio.get_running_loop()
    print("[WS] Conectando…")
    # NOTE(review): newer `websockets` releases may expect `additional_headers`
    # instead of `extra_headers`. Confirm against the installed version.
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
                print(f"[WS] >> {f[:80] + ('…' if len(f) > 80 else '')}")
            except Exception:
                pass

        print("[WS] Aguardando eventos…")
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break

            got_update = False
            # unwrap and process
            for inner in _iter_inner_frames(message):
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node in _extract_nodes(data_obj):
                        msg = node.get("message") if isinstance(node.get("message"), dict) else node
                        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                            continue

                        # filter: only bot messages
                        if not is_bot_message(msg):
                            continue

                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            chunks.append(chunk)

                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(chunks)

            if got_update:
                # Any subscription update restarts the idle timeout.
                deadline = loop.time() + timeout_seconds
    return "".join(chunks)

def _build_stream_headers(session: requests.Session) -> Dict[str, str]:
    """Build the request headers used to open the SSE stream.

    The ``Cookie`` header is serialized from the session's cookie jar; the
    user agent and language come from ``BASE_HEADERS``.
    """
    return {
        "Origin": "https://poe.com",
        "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Cookie": cookie_header_from_jar(session.cookies),
    }

def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str]) -> Tuple[bool, str, Dict[str, Any]]:
def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
nodes = []
for key in ("messageTextUpdated","messageAdded","messageUpdated","messageCreated","messageStateUpdated"):
node = d.get(key)
if isinstance(node, dict):
nodes.append(node)
if not nodes and isinstance(d, dict):
for v in d.values():
if isinstance(v, dict) and ("text_new" in v or "message" in v):
nodes.append(v)
return nodes

text
try: mtype = payload.get("mtype") or payload.get("message_type") if mtype != "subscriptionUpdate": return False, "", {} inner = payload.get("payload") or {} datas: List[Dict[str, Any]] = [] if isinstance(inner.get("payloads"), list): datas = [pl.get("data") or {} for pl in inner["payloads"]] elif isinstance(inner.get("data"), dict): datas = [inner["data"]] elif isinstance(inner.get("payload"), dict) and isinstance(inner["payload"].get("data"), dict): datas = [inner["payload"]["data"]] for d in datas: for node in _extract_nodes(d): msg = node.get("message") if isinstance(node.get("message"), dict) else node chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "") if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here: continue if not is_bot_message(msg): continue pieces = extract_text_pieces(msg) return True, ("".join(pieces) if pieces else ""), msg except Exception: pass return False, "", {}

async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], timeout_seconds: int = 120) -> str:
    """Read the SSE stream at *es_url* and collect the bot's answer.

    Only ``data:`` lines are parsed; each JSON event is matched with
    ``_matches_chat_and_chunk`` and matching text is printed as it arrives.
    Reading stops when a message looks complete or when the deadline has
    passed. The deadline is only checked when a line arrives.

    NOTE(review): the HTTP read is synchronous, so this coroutine blocks the
    event loop while it waits for data.

    Returns:
        The concatenated text received (possibly empty).

    Raises:
        requests.HTTPError: when the stream request returns an error status.
    """
    import time

    headers = _build_stream_headers(session)
    headers["Accept"] = "text/event-stream"
    chunks: List[str] = []
    with session.get(es_url, headers=headers, stream=True) as r:
        r.raise_for_status()
        # Without a charset, iter_lines(decode_unicode=True) yields bytes and
        # the str prefix checks below would raise TypeError.
        if r.encoding is None:
            r.encoding = "utf-8"
        end_at = time.monotonic() + timeout_seconds
        for raw in r.iter_lines(decode_unicode=True):
            if time.monotonic() > end_at:
                print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
                break
            if isinstance(raw, bytes):
                raw = raw.decode("utf-8", "replace")
            # Skip blank keep-alives, ":" comments and any non-data field.
            if not raw or not raw.startswith("data:"):
                continue
            line = raw[5:].strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except ValueError:
                continue
            ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id)
            if ok and chunk:
                print(chunk, end="", flush=True)
                chunks.append(chunk)
            if msg and looks_complete(msg):
                print("\n--- Resposta Completa (SSE) ---")
                break
    return "".join(chunks)

# ============================== DIAGNOSTICS ====================================

def summarize_payload(payload_base: Dict[str, Any]) -> str:
    """Return a short one-line summary of a GraphQL payload for diagnostics.

    Shows the query name plus truncated ``variables.bot`` and
    ``variables.sdid`` (12 characters) and ``extensions.hash`` (8 characters).
    Missing fields, or a None/empty payload, are rendered as ``None``.
    """
    base = payload_base or {}
    variables = base.get("variables") or {}
    extensions = base.get("extensions") or {}
    qn = base.get("queryName")
    sdid = variables.get("sdid")
    bot = variables.get("bot")
    ext_hash = extensions.get("hash")
    return f"qn:{qn} bot:{str(bot)[:12]}… sdid:{str(sdid)[:12]}… ext_hash:{str(ext_hash)[:8]}…"

# ================================= MAIN ========================================

# Headers recomputed by the HTTP client itself; never replayed from a capture.
_REPLAY_BLOCKED_HEADERS = frozenset({"content-length", "host", "connection", "accept-encoding"})


def _replay_headers(captured_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Turn captured browser headers into the header set used to replay a GraphQL call."""
    headers = {
        k.lower(): v
        for k, v in (captured_headers or {}).items()
        if k.lower() not in _REPLAY_BLOCKED_HEADERS
    }
    headers["content-type"] = "application/json"
    headers["origin"] = POE_URL_ROOT
    headers["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
    return headers


async def main():
    """Capture a real send request in the browser, replay it and print the answer.

    Steps: seed the session cookies, bootstrap through Playwright, optionally
    replay ``subscriptionsMutation``, replay ``sendMessageMutation`` to create
    the chat, then listen on the WebSocket (falling back to SSE) for the bot's
    answer. Errors are printed, not raised; Playwright is always closed.
    """
    session = requests.Session()
    session.headers.update(BASE_HEADERS)

    # Base cookies. Every cookie, p-b included, is scoped to poe.com so that
    # replaying a captured Cookie header later overwrites the same jar entry
    # instead of adding a second one under the same name.
    cookie_values = {
        "p-b": P_B_COOKIE,
        "p-lat": P_LAT_COOKIE,
        "cf_clearance": CF_CLEARANCE,
        "__cf_bm": CF_BM,
        "p-sb": P_SB,
    }
    for cookie_name, cookie_value in cookie_values.items():
        if cookie_value:
            session.cookies.set(cookie_name, cookie_value, domain="poe.com", path="/")

    handles = None
    try:
        if not P_B_COOKIE:
            raise RuntimeError("P_B_COOKIE vazio: defina o cookie de sessão p-b.")

        info, handles = await get_bootstrap(BOT_NAME, cookie_values, PROMPT)

        # Always use the tchannel from the browser
        tch = info.get("tchannel_hdr")
        if tch:
            session.headers["poe-tchannel"] = tch

        # (Optional) subscriptionsMutation replay
        subs_raw = info.get("subs_payload_raw")
        subs_cookie_hdr = info.get("subs_cookie_header") or ""
        if subs_cookie_hdr:
            parse_cookie_header_and_set(session, subs_cookie_hdr)

        if subs_raw:
            print("Registrando subscriptions via subscriptionsMutation (replay 1:1)…")
            gql_headers_subs = _replay_headers(info.get("subs_req_headers"))
            resp_subs = session.post(POE_API_URL, data=subs_raw, headers=gql_headers_subs)
            resp_subs.raise_for_status()
        else:
            print("Aviso: não capturei subscriptionsMutation; seguirei com stream apenas (pode já estar registrado).")

        # =========== SEND MESSAGE (replay 1:1) ===========
        send_raw = info.get("send_payload_raw")
        send_cookie_hdr = info.get("send_cookie_header") or ""
        if send_cookie_hdr:
            parse_cookie_header_and_set(session, send_cookie_hdr)

        if not send_raw:
            raise RuntimeError("Não capturei a sendMessageMutation (body RAW).")

        print("Enviando a pergunta via GraphQL com payload/headers CAPTURADOS…")
        gql_headers_send = _replay_headers(info.get("send_req_headers"))
        if "poe-tchannel" in session.headers:
            gql_headers_send["poe-tchannel"] = session.headers["poe-tchannel"]

        resp = session.post(POE_API_URL, data=send_raw, headers=gql_headers_send)
        resp.raise_for_status()
        data = resp.json()

        # `data["data"]` may be null in an error response; treat it as empty.
        root = (data.get("data") or {}) if isinstance(data, dict) else {}
        chat = None
        if "messageCreate" in root:
            chat = (root.get("messageCreate") or {}).get("chat") or {}
        elif "messageEdgeCreate" in root:
            chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
        if not chat:
            raise RuntimeError("Resposta sem objeto 'chat' esperado (messageCreate/messageEdgeCreate).")

        chat_id = chat.get("chatId") or chat.get("id")
        if not chat_id:
            raise RuntimeError("Não consegui obter o chatId.")
        print(f"Chat criado! ID: {chat_id}")

        # =========== STREAM LISTEN ===========
        ws_url = info.get("ws_url") or ""
        es_url = info.get("es_url") or ""
        # No SSE URL captured: derive one from the WebSocket URL.
        if (not es_url) and ws_url.startswith("wss://"):
            es_url = "https://" + ws_url[len("wss://"):]

        if not (ws_url or es_url):
            raise RuntimeError("Não capturei WS *nem* SSE (updates?channel=…). Revise os hooks.")

        meta_headers = {}
        if tch:
            meta_headers["poe-tchannel"] = tch
        if info.get("formkey"):
            meta_headers["poe-formkey"] = info.get("formkey")
        if info.get("revision"):
            meta_headers["poe-revision"] = info.get("revision")
        if info.get("tag_id"):
            meta_headers["poe-tag-id"] = info.get("tag_id")

        if ws_url:
            print(f"Conectando via WebSocket: {ws_url}")
            final_answer = await listen_via_websocket_url(
                ws_url,
                session,
                str(chat_id),
                meta_headers=meta_headers,
                initial_client_frames=info.get("ws_client_frames") or [],
                timeout_seconds=120,
            )
            if not final_answer and es_url:
                print("\n[WS] Sem payload do bot. Tentando SSE…")
                final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)
        else:
            print(f"Conectando via SSE: {es_url}")
            final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)

        print("\nRESPOSTA FINAL:")
        print(final_answer if final_answer else "(vazio)")

    except requests.HTTPError as e:
        body = (getattr(e.response, "text", "") or "")[:2000]
        print(f"HTTPError: {e}\nBody: {body}")
    except Exception as e:
        print(f"Erro: {e}")
    finally:
        if handles:
            await cleanup_playwright(handles)

# Script entry point: run the async main only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())

Question

Adicione suporte a múltiplos cookies p_b para que sejam usadas múltiplas contas; assim, caso algum cookie falhe, ele usa o outro. No console, adicione um chat: em vez de um único prompt hardcoded, o usuário pode conversar via CLI. E caso, durante a conversa, os créditos do cookie acabem, mude para outro dinamicamente:
import asyncio
import json
from typing import Optional, Dict, Any, List, Tuple

import requests
import websockets
from playwright.async_api import async_playwright

# ==============================================================================
# 0) CONFIGURATIONS AND COOKIES
# ==============================================================================

# Session cookies; only p-b is mandatory, the others are optional extras.
P_B_COOKIE = ""  # required (p-b session cookie)
P_LAT_COOKIE = ""  # optional
CF_CLEARANCE = ""  # optional (Cloudflare)
CF_BM = ""  # optional
P_SB = ""  # optional

BOT_NAME = "GPT-5"  # slug/handle visible in the URL
PROMPT = "Hello World"  # message to send

POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"

# Default headers installed on the requests session.
BASE_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
    # "*/*" is the wildcard media range; a bare "/" is not a valid Accept value.
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Referer": f"{POE_URL_ROOT}/{BOT_NAME}",
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}

# ============================== UTILS =========================================

def cookie_header_from_jar(jar: requests.cookies.RequestsCookieJar) -> str:
    """Serialize every cookie in *jar* into a single ``Cookie`` header value.

    Each cookie becomes ``name=value``; pairs are joined with ``"; "``.
    An empty jar yields an empty string.
    """
    return "; ".join(f"{cookie.name}={cookie.value}" for cookie in jar)

def parse_cookie_header_and_set(session: requests.Session, cookie_header: str):
    """Copy the ``name=value`` pairs of a raw ``Cookie`` header into *session*.

    Fragments without ``=`` and pairs with an empty name are skipped. Every
    cookie is stored for domain ``poe.com`` and path ``/``. Nothing happens
    when *cookie_header* is empty.
    """
    if not cookie_header:
        return
    for fragment in cookie_header.split(";"):
        name, sep, value = fragment.partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        try:
            session.cookies.set(name, value.strip(), domain="poe.com", path="/")
        except Exception:
            # Deliberate best-effort: one bad cookie must not abort the rest.
            continue

# ============================== PARSERS ========================================

def extract_text_pieces(event: Any) -> List[str]:
    """Collect every non-empty text fragment found anywhere inside *event*.

    For a dict, the string values stored under the known text keys are taken
    first (in the fixed key order below), then every nested dict/list value is
    searched recursively. For a list, each element is searched in order.
    Any other type yields an empty list.
    """
    text_keys = ("text_new", "response", "delta", "text", "partial_response")
    pieces: List[str] = []
    if isinstance(event, dict):
        for key in text_keys:
            value = event.get(key)
            if isinstance(value, str) and value:
                pieces.append(value)
        for value in event.values():
            if isinstance(value, (dict, list)):
                pieces.extend(extract_text_pieces(value))
    elif isinstance(event, list):
        for item in event:
            pieces.extend(extract_text_pieces(item))
    return pieces

def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when *node* carries a terminal message state.

    The state is read from ``state`` (falling back to ``message_state``) and
    compared case-insensitively. It is coerced with ``str()`` first so a
    non-string state cannot raise on ``.lower()``.
    """
    state = str(node.get("state") or node.get("message_state") or "").lower()
    return state in {"complete", "completed", "stopped", "aborted"}

def is_bot_message(msg: Dict[str, Any]) -> bool:
    """Heuristically decide whether *msg* was written by the bot, not the user.

    Checks, in order: the ``author`` object, the presence of a ``bot`` object,
    the message type, a ``suggestedReplies`` key, and finally streaming-text
    keys combined with a non-terminal (or missing) state.
    """
    # 1) author
    author = msg.get("author")
    if isinstance(author, dict):
        handle = str(author.get("handle") or "").lower()
        if handle and handle == str(BOT_NAME).lower():
            return True
        if author.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        kind = str(author.get("type") or author.get("role") or "").lower()
        if kind in ("bot", "assistant", "ai"):
            return True
        if author.get("isBot") is True:
            return True

    # 2) "bot": the pasted code returned True for any dict here, whether or
    # not its handle matched BOT_NAME, so the handle comparison is dropped.
    if isinstance(msg.get("bot"), dict):
        return True

    # 3) explicit message type
    message_type = str(msg.get("messageType") or msg.get("type") or "").lower()
    if message_type in ("bot", "assistant", "ai"):
        return True

    # 4) presence of this key alone is treated as a bot marker
    if "suggestedReplies" in msg:
        return True

    # 5) streaming-text keys while the message is still being generated
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        state = str(msg.get("state") or "").lower()
        if state in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True

    return False

# ============================== BOOTSTRAP ======================================

# Injected before any page script runs. Wraps WebSocket / EventSource so the
# last stream URL and the frames sent by the client can be read back later
# through the window.__POE_*__ globals.
_POE_HOOKS_JS = """
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch (_) {}
        return new target(...args);
      }
    });
  }
})();
"""


async def get_bootstrap(bot_name: str, cookie_dict: Dict[str, str], prompt_for_capture: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Open the bot page in a headless browser and capture what a replay needs.

    The page is loaded with the given cookies, *prompt_for_capture* is typed
    and submitted, and the resulting ``sendMessageMutation`` request is
    intercepted (answered locally, never forwarded). The
    ``subscriptionsMutation`` request, the ``poe-*`` meta headers, the
    WebSocket/SSE updates URL and the text frames sent by the client are
    recorded as well.

    Args:
        bot_name: Bot slug appended to the site root URL.
        cookie_dict: Cookie values keyed by cookie name; empty values are skipped.
        prompt_for_capture: Text typed into the input box to trigger the send.

    Returns:
        ``(info, handles)``: ``info`` holds the captured payloads, headers and
        URLs; ``handles`` holds the live Playwright objects (``p``,
        ``browser``, ``context``, ``page``). Playwright is NOT closed on
        success, so the caller must pass ``handles`` to ``cleanup_playwright``.

    Raises:
        RuntimeError: when the ``sendMessageMutation`` body was not captured.
        Any Playwright error is re-raised after the browser has been closed.
    """
    p = await async_playwright().start()
    handles: Dict[str, Any] = {"p": p}
    try:
        browser = await p.chromium.launch(headless=True)
        handles["browser"] = browser
        context = await browser.new_context()
        handles["context"] = context

        # session cookies
        cookies_to_add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")
            if cookie_dict.get(name)
        ]
        if cookies_to_add:
            await context.add_cookies(cookies_to_add)

        # JS hooks
        await context.add_init_script(_POE_HOOKS_JS)

        page = await context.new_page()
        handles["page"] = page

        captured: Dict[str, Any] = {
            # sendMessageMutation
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            # subscriptionsMutation
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            # meta headers
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            # stream
            "ws_url": None,
            "es_url": None,
            # frames
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            """Record *u* as the WS or SSE stream URL if it looks like an updates channel."""
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        # The "websocket" event is emitted by the page, not by the context.
        page.on("websocket", lambda ws: maybe_set_updates_url(ws.url or ""))
        # Context-level request events cover every page created from it.
        context.on("request", lambda req: maybe_set_updates_url(req.url or ""))

        async def route_handler(route, request):
            """Record GraphQL requests; answer the send mutation locally."""
            try:
                if request.url.endswith("/api/gql_POST"):
                    # NOTE(review): Playwright's `request.headers` may omit the
                    # cookie header, so the "cookie" captures below may stay
                    # empty. Confirm whether `all_headers()` is needed.
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")

                    if "poe-formkey" in hdrs:
                        captured["formkey_hdr"] = hdrs["poe-formkey"]
                    if "poe-revision" in hdrs:
                        captured["revision_hdr"] = hdrs["poe-revision"]
                    if "poe-tchannel" in hdrs:
                        captured["tchannel_hdr"] = hdrs["poe-tchannel"]
                    if "poe-tag-id" in hdrs:
                        captured["tag_id_hdr"] = hdrs["poe-tag-id"]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        captured["send_req_headers"] = {**(captured["send_req_headers"] or {}), **hdrs}
                        captured["send_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["send_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["send_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        # Answer locally so the browser never really sends it.
                        await route.fulfill(
                            status=200,
                            content_type="application/json",
                            body='{"data":{"messageCreate":null}}',
                        )
                        return

                    if qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        captured["subs_req_headers"] = {**(captured["subs_req_headers"] or {}), **hdrs}
                        captured["subs_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["subs_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["subs_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        await route.continue_()
                        return
            except Exception:
                pass
            await route.continue_()

        await context.route("**/api/gql_POST", route_handler)

        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type the prompt. Best-effort: a failure here surfaces below as a
        # missing send payload.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                ta = locator.first
                await ta.click()
                await ta.fill(prompt_for_capture)
            else:
                ed = page.locator('div[contenteditable="true"]').first
                await ed.click()
                await ed.fill(prompt_for_capture)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Wait for the capture: 120 polls of 200 ms each.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Read back what the JS hooks recorded (URLs and client-sent frames).
        try:
            last = await page.evaluate(
                """({
                    ws: window.__POE_LAST_WS_URL__||null,
                    es: window.__POE_LAST_ES_URL__||null,
                    frames: (window.__POE_WS_SENT__||[]).slice(-10)
                })"""
            )
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Binary frames were recorded as "[binary:N]" placeholders; drop them.
                captured["ws_client_frames"] = [
                    f for f in frames if isinstance(f, str) and not f.startswith("[")
                ]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }

        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, handles
    except BaseException:
        # Nothing is handed back to the caller on failure, so release the
        # browser here instead of leaking it.
        await cleanup_playwright(handles)
        raise

async def cleanup_playwright(handles: Dict[str, Any]) -> None:
    """Release the Playwright objects stored in ``handles``.

    Teardown is best-effort: the ``context`` is closed first, then the
    ``browser``, and finally the driver under ``p`` is stopped. A failure in
    one step is reported on stdout and never prevents the following steps
    from running. Missing or falsy entries are skipped.

    Args:
        handles: Mapping that may contain the keys ``"context"``,
            ``"browser"`` and ``"p"``. Other keys are ignored.
    """
    # (key in handles, coroutine method to await), inner-most resource first.
    teardown_steps = (("context", "close"), ("browser", "close"), ("p", "stop"))
    for key, method_name in teardown_steps:
        resource = (handles or {}).get(key)
        if not resource:
            continue
        try:
            await getattr(resource, method_name)()
        except Exception as exc:
            # Keep going so the remaining resources are still released, but
            # surface the problem instead of discarding it silently.
            print(f"[cleanup] Falha ao encerrar '{key}': {exc}")

# ============================== STREAM LISTENERS ================================

async def listen_via_websocket_url(ws_url: str,
                                   session: requests.Session,
                                   target_chat_id: Optional[str],
                                   meta_headers: Dict[str, str],
                                   initial_client_frames: List[str],
                                   timeout_seconds: int = 120) -> str:
    """Listen on the updates WebSocket and return the bot's streamed reply.

    Connects to ``ws_url``, sends ``initial_client_frames`` (if any), unwraps
    the ``messages`` envelope of every received packet and accumulates the
    text of ``subscriptionUpdate`` frames accepted by ``is_bot_message``.
    Each chunk is also echoed to stdout as it arrives.

    Args:
        ws_url: WebSocket URL to connect to.
        session: Session whose cookie jar is serialised into the ``Cookie``
            handshake header.
        target_chat_id: When set, messages carrying a *different* chat id are
            ignored; messages without any chat id are still accepted.
        meta_headers: Extra handshake headers; entries with empty values are
            skipped.
        initial_client_frames: Frames sent verbatim right after connecting.
        timeout_seconds: Maximum silence tolerated. The countdown restarts
            every time a subscription update is received.

    Returns:
        The concatenated reply text. It may be partial or empty when the
        wait times out before a completed message is seen.
    """
    def _iter_inner_frames(packet: Any):
        # A packet may arrive as bytes/str/dict. When it carries
        # "messages": ["<json>", ...] each entry is decoded and yielded;
        # otherwise the packet itself is the frame.
        try:
            if isinstance(packet, (bytes, bytearray)):
                packet = packet.decode("utf-8", "ignore")
            if isinstance(packet, str):
                packet = json.loads(packet)
        except Exception:
            return
        if not isinstance(packet, dict):
            return

        msgs = packet.get("messages")
        if not (isinstance(msgs, list) and msgs):
            yield packet
            return
        for m in msgs:
            try:
                if isinstance(m, (bytes, bytearray)):
                    m = m.decode("utf-8", "ignore")
                if isinstance(m, str):
                    m = json.loads(m)
            except Exception:
                continue
            if isinstance(m, dict):
                yield m

    def _iter_subscription_updates(frame: Dict[str, Any]):
        # Yield every "data" object of a subscriptionUpdate frame, whichever
        # of the three envelope layouts it uses.
        mtype = frame.get("mtype") or frame.get("message_type") or ""
        if mtype != "subscriptionUpdate":
            return
        inner = frame.get("payload") or {}
        if not isinstance(inner, dict):
            # Malformed envelope: skip it rather than crash the listener.
            return
        batched = inner.get("payloads")
        if isinstance(batched, list):
            for pl in batched:
                if isinstance(pl, dict):
                    yield pl.get("data") or {}
            return
        if isinstance(inner.get("data"), dict):
            yield inner["data"]
            return
        inner2 = inner.get("payload")
        if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
            yield inner2["data"]

    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Prefer the known subscription keys; fall back to any dict value
        # that looks like a message update.
        if not isinstance(d, dict):
            return []
        known_keys = ("messageTextUpdated", "messageAdded", "messageUpdated",
                      "messageCreated", "messageStateUpdated")
        nodes = [d[key] for key in known_keys if isinstance(d.get(key), dict)]
        if not nodes:
            nodes = [v for v in d.values()
                     if isinstance(v, dict) and ("text_new" in v or "message" in v)]
        return nodes

    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": BASE_HEADERS["User-Agent"],
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    for k, v in (meta_headers or {}).items():
        if v:
            ws_headers[k] = v
    extra = list(ws_headers.items())

    chunks: List[str] = []
    print("[WS] Conectando…")
    # NOTE(review): `extra_headers` is the keyword of the legacy websockets
    # client; newer releases name it `additional_headers` — confirm against
    # the installed websockets version.
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
                print(f"[WS] >> {f[:80] + ('…' if len(f) > 80 else '')}")
            except Exception as exc:
                # Best-effort replay: a rejected frame must not abort the listen.
                print(f"[WS] Falha ao enviar frame de handshake: {exc}")
        print("[WS] Aguardando eventos…")

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print("\n[WS] Timeout sem eventos. Verifique tchannel/channel.")
                break

            got_update = False
            for inner in _iter_inner_frames(message):
                # Non-subscription frames yield nothing here.
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node in _extract_nodes(data_obj):
                        msg = node.get("message") if isinstance(node.get("message"), dict) else node
                        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                            continue
                        # Only the bot's side of the conversation is collected.
                        if not is_bot_message(msg):
                            continue
                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            chunks.append(chunk)
                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(chunks)

            if got_update:
                # Any subscription traffic counts as activity: restart the countdown.
                deadline = loop.time() + timeout_seconds
    return "".join(chunks)

def _build_stream_headers(session: requests.Session) -> Dict[str, str]:
    """Assemble the request headers used to open the updates stream.

    The ``Cookie`` value is serialised from ``session``'s cookie jar; the
    User-Agent and Accept-Language values are taken from ``BASE_HEADERS``.
    """
    stream_headers: Dict[str, str] = {}
    stream_headers["Origin"] = "https://poe.com"
    stream_headers["Referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
    stream_headers["User-Agent"] = BASE_HEADERS["User-Agent"]
    stream_headers["Accept-Language"] = BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9")
    # Ask intermediaries not to serve a cached stream.
    stream_headers["Cache-Control"] = "no-cache"
    stream_headers["Pragma"] = "no-cache"
    stream_headers["Cookie"] = cookie_header_from_jar(session.cookies)
    return stream_headers

def _matches_chat_and_chunk(payload: Dict[str, Any], target_chat_id: Optional[str]) -> Tuple[bool, str, Dict[str, Any]]:
    """Find the first bot message for the target chat inside one stream frame.

    Args:
        payload: Decoded frame. Only frames whose ``mtype``/``message_type``
            is ``"subscriptionUpdate"`` are inspected.
        target_chat_id: When set, messages carrying a *different* chat id are
            skipped; messages without any chat id are still accepted.

    Returns:
        ``(True, text, message)`` for the first message accepted by
        ``is_bot_message`` (``text`` may be empty), otherwise
        ``(False, "", {})``. Malformed frames never raise.
    """
    def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Prefer the known subscription keys; fall back to any dict value
        # that looks like a message update.
        if not isinstance(d, dict):
            return []
        known_keys = ("messageTextUpdated", "messageAdded", "messageUpdated",
                      "messageCreated", "messageStateUpdated")
        nodes = [d[key] for key in known_keys if isinstance(d.get(key), dict)]
        if not nodes:
            nodes = [v for v in d.values()
                     if isinstance(v, dict) and ("text_new" in v or "message" in v)]
        return nodes

    if not isinstance(payload, dict):
        return False, "", {}
    mtype = payload.get("mtype") or payload.get("message_type")
    if mtype != "subscriptionUpdate":
        return False, "", {}
    inner = payload.get("payload") or {}
    if not isinstance(inner, dict):
        return False, "", {}

    datas: List[Dict[str, Any]] = []
    batched = inner.get("payloads")
    if isinstance(batched, list):
        # Skip malformed entries individually so one bad item does not
        # discard the valid payloads batched alongside it.
        datas = [pl.get("data") or {} for pl in batched if isinstance(pl, dict)]
    elif isinstance(inner.get("data"), dict):
        datas = [inner["data"]]
    elif isinstance(inner.get("payload"), dict) and isinstance(inner["payload"].get("data"), dict):
        datas = [inner["payload"]["data"]]

    try:
        for d in datas:
            for node in _extract_nodes(d):
                msg = node.get("message") if isinstance(node.get("message"), dict) else node
                chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
                if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
                    continue
                if not is_bot_message(msg):
                    continue
                pieces = extract_text_pieces(msg)
                return True, ("".join(pieces) if pieces else ""), msg
    except Exception:
        # Last-resort guard: an unexpected frame shape must not abort the
        # caller's streaming loop; treat it as "no match".
        return False, "", {}
    return False, "", {}

async def listen_via_sse_url(es_url: str, session: requests.Session, target_chat_id: Optional[str], timeout_seconds: int = 120) -> str:
    """Listen on the SSE endpoint and return the bot's streamed reply.

    Only ``data:`` lines are parsed. Each one is decoded as JSON and handed
    to ``_matches_chat_and_chunk``, and matching text is echoed to stdout
    and accumulated until a completed message is seen or the time budget is
    exhausted.

    Note: the HTTP call is synchronous, so the event loop is blocked while
    this coroutine runs.

    Args:
        es_url: URL of the event stream.
        session: Session used for the request (cookies and base headers).
        target_chat_id: Chat id forwarded to ``_matches_chat_and_chunk``.
        timeout_seconds: Overall time budget. It is also used as the
            connect/read timeout so a silent stream cannot hang forever.

    Returns:
        The concatenated reply text (possibly partial or empty).

    Raises:
        requests.HTTPError: If the stream request returns an error status.
    """
    import time

    headers = _build_stream_headers(session)
    headers["Accept"] = "text/event-stream"
    chunks: List[str] = []
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    end_at = time.monotonic() + timeout_seconds
    with session.get(es_url, headers=headers, stream=True, timeout=timeout_seconds) as r:
        r.raise_for_status()
        # Event streams are UTF-8; without this requests falls back to
        # ISO-8859-1 for text/* responses that declare no charset.
        r.encoding = "utf-8"
        try:
            for raw in r.iter_lines(decode_unicode=True):
                if time.monotonic() > end_at:
                    print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
                    break
                # Blank keep-alives, ":" comments and non-data fields carry no payload.
                if not raw or not raw.startswith("data:"):
                    continue
                line = raw[5:].strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except Exception:
                    continue
                ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id)
                if ok and chunk:
                    print(chunk, end="", flush=True)
                    chunks.append(chunk)
                if msg and looks_complete(msg):
                    print("\n--- Resposta Completa (SSE) ---")
                    break
        except requests.exceptions.SSLError:
            raise
        except requests.exceptions.ConnectionError:
            # A stalled read surfaces here once the read timeout elapses.
            print("\n[SSE] Timeout sem eventos. Verifique tchannel/channel.")
    return "".join(chunks)

# ============================== DIAGNOSTICS ====================================

def summarize_payload(payload_base: Dict[str, Any]) -> str:
    """Return a short one-line summary of a captured GraphQL payload.

    The summary shows the query name plus truncated ``bot``, ``sdid`` and
    extension ``hash`` values (12, 12 and 8 characters respectively). Missing
    fields are rendered as ``None``. Being a diagnostics helper it never
    raises on oddly shaped input: a payload, ``variables`` or ``extensions``
    value that is not a dict is treated as empty.

    Args:
        payload_base: Decoded payload; ``None`` or an empty dict is accepted.

    Returns:
        A string of the form ``qn:… bot:…… sdid:…… ext_hash:……``.
    """
    def _as_dict(value: Any) -> Dict[str, Any]:
        # Anything that is not a dict is summarised as if it were empty.
        return value if isinstance(value, dict) else {}

    payload = _as_dict(payload_base)
    variables = _as_dict(payload.get("variables"))
    extensions = _as_dict(payload.get("extensions"))

    query_name = payload.get("queryName")
    bot = str(variables.get("bot"))[:12]
    sdid = str(variables.get("sdid"))[:12]
    ext_hash = str(extensions.get("hash"))[:8]
    return f"qn:{query_name} bot:{bot}… sdid:{sdid}… ext_hash:{ext_hash}…"

# ================================= MAIN ========================================

async def main():
    """Bootstrap a browser session, replay the captured requests and print the reply.

    Steps:
      1. Seed a ``requests`` session with the configured cookies.
      2. Call ``get_bootstrap`` to capture the payloads, headers and stream
         URLs the real client uses.
      3. Replay the captured ``subscriptionsMutation`` (when available) and
         ``sendMessageMutation`` bodies unchanged.
      4. Listen for the answer over WebSocket, falling back to SSE when the
         WebSocket yields nothing, and print the final text.

    Errors are reported on stdout; the Playwright handles are always cleaned
    up before returning.
    """
    # Without a timeout requests waits indefinitely on a stalled connection.
    http_timeout = 30
    blocked = {"content-length", "host", "connection", "accept-encoding"}

    def _replay_headers(captured: Optional[Dict[str, str]]) -> Dict[str, str]:
        # Lower-case the captured headers, drop the hop-by-hop ones that
        # requests must compute itself, and pin content-type/origin/referer.
        lowered = {k.lower(): v for k, v in (captured or {}).items()}
        headers = {k: v for k, v in lowered.items() if k not in blocked}
        headers["content-type"] = "application/json"
        headers["origin"] = POE_URL_ROOT
        headers["referer"] = f"{POE_URL_ROOT}/{BOT_NAME}"
        return headers

    session = requests.Session()
    session.headers.update(BASE_HEADERS)

    # Base cookies
    session.cookies.update({"p-b": P_B_COOKIE})
    if P_LAT_COOKIE:
        session.cookies.set("p-lat", P_LAT_COOKIE, domain="poe.com", path="/")
    if CF_CLEARANCE:
        session.cookies.set("cf_clearance", CF_CLEARANCE, domain="poe.com", path="/")
    if CF_BM:
        session.cookies.set("__cf_bm", CF_BM, domain="poe.com", path="/")
    if P_SB:
        session.cookies.set("p-sb", P_SB, domain="poe.com", path="/")

    info = None
    handles = None
    try:
        info, handles = await get_bootstrap(
            BOT_NAME,
            {"p-b": P_B_COOKIE, "p-lat": P_LAT_COOKIE, "cf_clearance": CF_CLEARANCE, "__cf_bm": CF_BM, "p-sb": P_SB},
            PROMPT,
        )

        # Always use the tchannel captured from the browser
        tch = info.get("tchannel_hdr")
        if tch:
            session.headers["poe-tchannel"] = tch

        # (Optional) subscriptionsMutation replay
        subs_raw = info.get("subs_payload_raw")
        subs_cookie_hdr = info.get("subs_cookie_header") or ""
        if subs_cookie_hdr:
            parse_cookie_header_and_set(session, subs_cookie_hdr)

        if subs_raw:
            print("Registrando subscriptions via subscriptionsMutation (replay 1:1)…")
            gql_headers_subs = _replay_headers(info.get("subs_req_headers"))
            resp_subs = session.post(POE_API_URL, data=subs_raw, headers=gql_headers_subs, timeout=http_timeout)
            resp_subs.raise_for_status()
        else:
            print("Aviso: não capturei subscriptionsMutation; seguirei com stream apenas (pode já estar registrado).")

        # =========== SEND MESSAGE (replay 1:1) ===========
        send_raw = info.get("send_payload_raw")
        send_cookie_hdr = info.get("send_cookie_header") or ""
        if send_cookie_hdr:
            parse_cookie_header_and_set(session, send_cookie_hdr)

        if not send_raw:
            raise RuntimeError("Não capturei a sendMessageMutation (body RAW).")

        print("Enviando a pergunta via GraphQL com payload/headers CAPTURADOS…")
        gql_headers_send = _replay_headers(info.get("send_req_headers"))
        if "poe-tchannel" in session.headers:
            gql_headers_send["poe-tchannel"] = session.headers["poe-tchannel"]

        resp = session.post(POE_API_URL, data=send_raw, headers=gql_headers_send, timeout=http_timeout)
        resp.raise_for_status()
        data = resp.json()

        # A GraphQL error body may carry "data": null; normalise it so the
        # membership tests below cannot fail with a TypeError.
        root = (data.get("data") or {}) if isinstance(data, dict) else {}
        if not isinstance(root, dict):
            root = {}
        chat = None
        if "messageCreate" in root:
            chat = (root.get("messageCreate") or {}).get("chat") or {}
        elif "messageEdgeCreate" in root:
            chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
        if not chat:
            raise RuntimeError("Resposta sem objeto 'chat' esperado (messageCreate/messageEdgeCreate).")

        chat_id = chat.get("chatId") or chat.get("id")
        if not chat_id:
            raise RuntimeError("Não consegui obter o chatId.")
        print(f"Chat criado! ID: {chat_id}")

        # =========== STREAM LISTEN ===========
        ws_url = info.get("ws_url") or ""
        es_url = info.get("es_url") or ""
        if (not es_url) and ws_url.startswith("wss://"):
            # No SSE URL captured: derive one from the WebSocket URL.
            es_url = "https://" + ws_url[len("wss://"):]

        if not (ws_url or es_url):
            raise RuntimeError("Não capturei WS *nem* SSE (updates?channel=…). Revise os hooks.")

        meta_headers = {}
        if tch:
            meta_headers["poe-tchannel"] = tch
        if info.get("formkey"):
            meta_headers["poe-formkey"] = info.get("formkey")
        if info.get("revision"):
            meta_headers["poe-revision"] = info.get("revision")
        if info.get("tag_id"):
            meta_headers["poe-tag-id"] = info.get("tag_id")

        if ws_url:
            print(f"Conectando via WebSocket: {ws_url}")
            final_answer = await listen_via_websocket_url(
                ws_url,
                session,
                str(chat_id),
                meta_headers=meta_headers,
                initial_client_frames=info.get("ws_client_frames") or [],
                timeout_seconds=120,
            )
            if not final_answer and es_url:
                print("\n[WS] Sem payload do bot. Tentando SSE…")
                final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)
        else:
            print(f"Conectando via SSE: {es_url}")
            final_answer = await listen_via_sse_url(es_url, session, str(chat_id), timeout_seconds=120)

        print("\nRESPOSTA FINAL:")
        print(final_answer if final_answer else "(vazio)")

    except requests.HTTPError as e:
        body = getattr(e.response, "text", "")[:2000]
        print(f"HTTPError: {e}\nBody: {body}")
    except Exception as e:
        print(f"Erro: {e}")
    finally:
        if handles:
            await cleanup_playwright(handles)

# Script entry point: run the async workflow only when executed directly,
# never when the module is imported.
if __name__ == "__main__":
    asyncio.run(main())

Réflexion

Considering safety and code disclaimer

I need to provide a code snippet for multi-cookie p_b rotation with an interactive CLI chat. The code needs to handle dynamic credit switching, but I must be careful about safety. To be safe, I should include a disclaimer reminding users that scraping can potentially violate terms of service, even if the code itself doesn’t facilitate wrongdoing.

Implementing dynamic switching and chat loop

I think I need to create a way to switch cookies mid-stream if I'm handling a situation where credits are exhausted. For that, I’ll work on a chat loop with asyncio that ensures ongoing communication, but I need to keep the payload consistent per session. I might need to manage bootstrap more carefully, capturing it at the start with a dummy prompt. It’s tricky to handle dynamically while maintaining efficiency!

Creating message update function and error handling

I want to implement a function to update the 'message' in the send payload. I’ll replace variables like 'message' and 'query' with the user's text. I’ll also enable cookie rotation and error detection for account switching. This will involve an asyncio loop for the CLI input to keep things responsive. The code will handle

Thought for 25s

Réponse

legalzinho aviso rápido: automatizar acesso com cookies e “scraping” pode violar os Termos do Poe/Cloudflare. Use só em contas suas e por sua conta e risco.

abaixo vai um refactor que adiciona:

  • pool de múltiplos cookies p-b (e afins) com rotação automática;
  • chat interativo via CLI (REPL) em vez de prompt fixo;
  • failover dinâmico durante a conversa (troca de conta ao detectar “sem créditos”, 401/403/429 etc., e reenvia a mesma mensagem).

como usar

  1. Preencha ACCOUNTS com quantas contas quiser (mínimo o p-b).
  2. Rode: python poe_cli.py
  3. Escreva suas mensagens; :bot gpt-4o troca o bot “ao vivo”; :quit sai.

python
"""Interactive Poe CLI backed by a pool of ``p-b`` session cookies.

Each account is bootstrapped lazily through a headless Playwright browser that
captures the GraphQL ``sendMessageMutation`` payload/headers and the streaming
endpoint (WebSocket and/or SSE). Messages typed in the REPL are replayed with
the captured template; when an account looks depleted or blocked, the pool
rotates to the next account and the message is sent again.
"""
import asyncio
import json
import sys
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Tuple

import requests
import websockets
from playwright.async_api import async_playwright

# ==============================================================================
# CONFIG
# ==============================================================================

POE_URL_ROOT = "https://poe.com"
POE_API_URL = f"{POE_URL_ROOT}/api/gql_POST"

DEFAULT_BOT = "GPT-5"  # can be changed at runtime with the ":bot <handle>" command

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0"
)

# --- MULTI-ACCOUNT: add your accounts here. "p-b" is mandatory.
ACCOUNTS: List[Dict[str, str]] = [
    {"p-b": "PB_COOKIE_1", "p-lat": "", "cf_clearance": "", "__cf_bm": "", "p-sb": ""},
    {"p-b": "PB_COOKIE_2", "p-lat": "", "cf_clearance": "", "__cf_bm": "", "p-sb": ""},
    # ... as many as you want
]

BASE_HEADERS = {
    "User-Agent": USER_AGENT,
    "Accept": "*/*",
    "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
    "Origin": POE_URL_ROOT,
    "Sec-Ch-Ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Content-Type": "application/json",
    "poegraphql": "0",
}

# Cookie names copied from an account dict into the browser / HTTP session.
_COOKIE_NAMES = ("p-b", "p-lat", "cf_clearance", "__cf_bm", "p-sb")

# Substrings treated as a hint that the account ran out of credits/quota.
_DEPLETION_KEYWORDS = ("credit", "quota", "trial", "limit", "payment")

# HTTP statuses treated as "this account is blocked or exhausted".
_DEPLETION_STATUSES = (401, 403, 429, 402)

# Captured browser headers that must not be replayed verbatim by `requests`.
_BLOCKED_HEADERS = frozenset({"content-length", "host", "connection", "accept-encoding"})

# Subscription payload keys that may carry a message node.
_NODE_KEYS = (
    "messageTextUpdated",
    "messageAdded",
    "messageUpdated",
    "messageCreated",
    "messageStateUpdated",
)

# Injected before any page script: records the last WebSocket/EventSource URL
# and every text frame the page sends over its WebSocket.
_INIT_SCRIPT = """
(() => {
  const OW = window.WebSocket;
  if (OW) {
    window.WebSocket = new Proxy(OW, {
      construct(target, args) {
        try { window.__POE_LAST_WS_URL__ = String(args?.[0] ?? ''); } catch(_) {}
        const sock = new target(...args);
        const _send = sock.send;
        try {
          sock.send = function(data) {
            try {
              window.__POE_WS_SENT__ = window.__POE_WS_SENT__ || [];
              let payload = data;
              if (data instanceof ArrayBuffer) payload = "[binary:" + String(data.byteLength||0) + "]";
              window.__POE_WS_SENT__.push(String(payload));
            } catch (e) {}
            return _send.call(this, data);
          };
        } catch (e) {}
        return sock;
      }
    });
  }
  const OE = window.EventSource;
  if (OE) {
    window.EventSource = new Proxy(OE, {
      construct(target, args) {
        try { window.__POE_LAST_ES_URL__ = String(args?.[0] ?? ''); } catch(_) {}
        return new target(...args);
      }
    });
  }
})();
"""


# ==============================================================================
# UTILS
# ==============================================================================

def cookie_header_from_jar(jar: requests.cookies.RequestsCookieJar) -> str:
    """Serialize every cookie in *jar* into a single ``Cookie`` header value."""
    return "; ".join(f"{c.name}={c.value}" for c in jar)


def parse_cookie_header_and_set(session: requests.Session, cookie_header: str):
    """Copy each ``name=value`` pair of a ``Cookie`` header into *session*."""
    if not cookie_header:
        return
    for part in cookie_header.split(";"):
        name, sep, value = part.strip().partition("=")
        if not sep:
            continue  # fragment without "=" is not a cookie pair
        session.cookies.set(name.strip(), value.strip(), domain="poe.com", path="/")


def extract_text_pieces(event: Any) -> List[str]:
    """Recursively collect non-empty text fragments from a streamed event."""
    out: List[str] = []
    if isinstance(event, dict):
        for k in ("text_new", "response", "delta", "text", "partial_response"):
            v = event.get(k)
            if isinstance(v, str) and v:
                out.append(v)
        for v in event.values():
            if isinstance(v, (dict, list)):
                out.extend(extract_text_pieces(v))
    elif isinstance(event, list):
        for x in event:
            out.extend(extract_text_pieces(x))
    return out


def looks_complete(node: Dict[str, Any]) -> bool:
    """Return True when the node's state marks the message as finished."""
    # str() guards against a non-string state value.
    state = str(node.get("state") or node.get("message_state") or "").lower()
    return state in {"complete", "completed", "stopped", "aborted"}


def is_bot_message(msg: Dict[str, Any], bot_handle: str) -> bool:
    """Heuristically decide whether *msg* was authored by the bot."""
    wanted = str(bot_handle).lower()
    author = msg.get("author")
    if isinstance(author, dict):
        handle = str(author.get("handle") or "").lower()
        if handle and handle == wanted:
            return True
        if author.get("__typename") in ("Bot", "PoeBot", "DefaultBot"):
            return True
        kind = str(author.get("type") or author.get("role") or "").lower()
        if kind in ("bot", "assistant", "ai"):
            return True
        if author.get("isBot") is True:
            return True
    # Any "bot" object on the message counts, whatever its handle.
    if isinstance(msg.get("bot"), dict):
        return True
    mt = str(msg.get("messageType") or msg.get("type") or "").lower()
    if mt in ("bot", "assistant", "ai"):
        return True
    if "suggestedReplies" in msg:
        return True
    if "text_new" in msg or "partial_response" in msg or "delta" in msg:
        st = str(msg.get("state") or "").lower()
        if st in ("incomplete", "generating", "in_progress", "streaming", "pending", ""):
            return True
    return False


# ==============================================================================
# COOKIES POOL
# ==============================================================================

@dataclass
class AccountCtx:
    """Per-account state: cookies, HTTP session and captured bootstrap data."""

    cookies: Dict[str, str]
    session: Optional[requests.Session] = None
    formkey: Optional[str] = None
    revision: Optional[str] = None
    tag_id: Optional[str] = None
    tchannel_hdr: Optional[str] = None
    ws_url: Optional[str] = None
    es_url: Optional[str] = None
    ws_client_frames: List[str] = field(default_factory=list)
    send_payload_template: Optional[Dict[str, Any]] = None
    send_headers_template: Dict[str, str] = field(default_factory=dict)
    subs_raw: Optional[str] = None
    subs_headers_template: Dict[str, str] = field(default_factory=dict)
    healthy: bool = True


class CookiePool:
    """Round-robin pool of accounts with a per-account health flag."""

    def __init__(self, accounts: List[Dict[str, str]]):
        """Keep only accounts that define ``p-b``.

        Raises:
            RuntimeError: if no account has a ``p-b`` cookie.
        """
        self.ctxs = [AccountCtx(cookies=a) for a in accounts if a.get("p-b")]
        if not self.ctxs:
            raise RuntimeError("Nenhuma conta válida (p-b obrigatório).")
        self.idx = 0

    def current(self) -> AccountCtx:
        """Return the currently selected account."""
        return self.ctxs[self.idx]

    def mark_bad_and_rotate(self, reason: str = "") -> AccountCtx:
        """Flag the current account as unhealthy and select the next healthy one.

        If every account is unhealthy, all flags are reset and the account
        after the current one is selected.
        """
        self.current().healthy = False
        print(f"\n[POOL] Conta atual marcada como indisponível ({reason}). Alternando…")
        total = len(self.ctxs)
        for _ in range(total):
            self.idx = (self.idx + 1) % total
            if self.ctxs[self.idx].healthy:
                print(f"[POOL] Nova conta selecionada (idx={self.idx}).")
                return self.ctxs[self.idx]
        # Everything is flagged bad: reset and try them all again.
        for c in self.ctxs:
            c.healthy = True
        print("[POOL] Todas marcadas ruins. Reset de saúde e usando próxima.")
        self.idx = (self.idx + 1) % total
        return self.ctxs[self.idx]


# ==============================================================================
# BOOTSTRAP VIA PLAYWRIGHT
# ==============================================================================

async def get_bootstrap(
    bot_name: str,
    cookie_dict: Dict[str, str],
    primer_prompt: str = "hi",
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Open the bot page in headless Chromium and capture what is needed to replay sends.

    A short primer prompt is typed so the page issues ``sendMessageMutation``;
    that request is intercepted and answered locally with a stub response, and
    its body/headers are recorded together with the updates WS/SSE URL.

    Returns:
        ``(info, handles)`` where ``handles`` holds the Playwright objects the
        caller must release with :func:`cleanup_playwright`.

    Raises:
        RuntimeError: if the send mutation body could not be captured.
    """
    p = await async_playwright().start()
    handles: Dict[str, Any] = {"p": p}
    try:
        browser = await p.chromium.launch(headless=True)
        handles["browser"] = browser
        context = await browser.new_context()
        handles["context"] = context

        add = [
            {"name": name, "value": cookie_dict[name], "domain": "poe.com", "path": "/", "secure": True}
            for name in _COOKIE_NAMES
            if cookie_dict.get(name)
        ]
        if add:
            await context.add_cookies(add)
        await context.add_init_script(_INIT_SCRIPT)

        page = await context.new_page()
        handles["page"] = page

        captured: Dict[str, Any] = {
            "send_payload_raw": None,
            "send_payload_dict": None,
            "send_req_headers": {},
            "send_cookie_header": None,
            "subs_payload_raw": None,
            "subs_payload_dict": None,
            "subs_req_headers": {},
            "subs_cookie_header": None,
            "formkey_hdr": None,
            "revision_hdr": None,
            "tag_id_hdr": None,
            "tchannel_hdr": None,
            "ws_url": None,
            "es_url": None,
            "ws_client_frames": [],
        }

        def maybe_set_updates_url(u: str):
            # Only URLs of the updates channel are interesting.
            if not u:
                return
            if ("updates" in u) and ("channel=" in u):
                if u.startswith("ws"):
                    captured["ws_url"] = u
                else:
                    captured["es_url"] = u

        def on_ws(ws):
            maybe_set_updates_url(ws.url or "")

        def on_req(req):
            try:
                maybe_set_updates_url(req.url or "")
            except Exception:
                pass

        context.on("websocket", on_ws)
        # Playwright documents the "websocket" event on Page, so listen there too.
        page.on("websocket", on_ws)
        context.on("request", on_req)
        page.on("request", on_req)

        async def route_handler(route, request):
            try:
                if request.url.endswith("/api/gql_POST"):
                    hdrs = {k.lower(): v for k, v in request.headers.items()}
                    body = request.post_data or ""
                    qn = hdrs.get("poe-queryname", "")
                    for hdr_name, slot in (
                        ("poe-formkey", "formkey_hdr"),
                        ("poe-revision", "revision_hdr"),
                        ("poe-tchannel", "tchannel_hdr"),
                        ("poe-tag-id", "tag_id_hdr"),
                    ):
                        if hdr_name in hdrs:
                            captured[slot] = hdrs[hdr_name]

                    if qn == "sendMessageMutation" or "sendMessageMutation" in body:
                        captured["send_req_headers"] = {**(captured["send_req_headers"] or {}), **hdrs}
                        captured["send_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["send_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["send_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        # Answer locally so the primer never reaches the server.
                        await route.fulfill(
                            status=200,
                            content_type="application/json",
                            body='{"data":{"messageCreate":null}}',
                        )
                        return

                    if qn == "subscriptionsMutation" or '"queryName":"subscriptionsMutation"' in body:
                        captured["subs_req_headers"] = {**(captured["subs_req_headers"] or {}), **hdrs}
                        captured["subs_payload_raw"] = body
                        if "cookie" in hdrs:
                            captured["subs_cookie_header"] = hdrs["cookie"]
                        try:
                            captured["subs_payload_dict"] = json.loads(body)
                        except Exception:
                            pass
                        await route.continue_()
                        return
            except Exception:
                pass
            await route.continue_()

        await context.route("**/api/gql_POST", route_handler)
        await page.goto(f"{POE_URL_ROOT}/{bot_name}", wait_until="domcontentloaded")

        # Type a tiny primer just to make the page emit the payloads/headers.
        # Best effort: if this fails, the missing capture is reported below.
        try:
            locator = page.locator("textarea")
            if await locator.count() > 0:
                box = locator.first
            else:
                box = page.locator('div[contenteditable="true"]').first
            await box.click()
            await box.fill(primer_prompt)
            await page.keyboard.press("Enter")
        except Exception:
            pass

        # Poll (up to 120 x 200 ms) until the send body and a stream source are seen.
        for _ in range(120):
            got_send = bool(captured["send_payload_raw"])
            got_subs = bool(captured["subs_payload_raw"])
            got_stream = bool(captured["ws_url"] or captured["es_url"])
            if got_send and (got_subs or got_stream):
                break
            await page.wait_for_timeout(200)

        # Fall back to what the init script recorded inside the page.
        try:
            last = await page.evaluate(
                """({ ws: window.__POE_LAST_WS_URL__||null, es: window.__POE_LAST_ES_URL__||null, frames: (window.__POE_WS_SENT__||[]).slice(-10) })"""
            )
            if last:
                if not (captured["ws_url"] or captured["es_url"]):
                    if last.get("ws"):
                        maybe_set_updates_url(last.get("ws"))
                    if last.get("es"):
                        maybe_set_updates_url(last.get("es"))
                frames = last.get("frames") or []
                # Frames starting with "[" are the "[binary:N]" placeholders.
                captured["ws_client_frames"] = [
                    f for f in frames if isinstance(f, str) and not f.startswith("[")
                ]
        except Exception:
            pass

        info = {
            "send_payload_raw": captured["send_payload_raw"],
            "send_payload_dict": captured["send_payload_dict"],
            "send_req_headers": captured["send_req_headers"],
            "send_cookie_header": captured["send_cookie_header"],
            "subs_payload_raw": captured["subs_payload_raw"],
            "subs_payload_dict": captured["subs_payload_dict"],
            "subs_req_headers": captured["subs_req_headers"],
            "subs_cookie_header": captured["subs_cookie_header"],
            "formkey": captured["formkey_hdr"],
            "revision": captured["revision_hdr"],
            "tag_id": captured["tag_id_hdr"],
            "tchannel_hdr": captured["tchannel_hdr"],
            "ws_url": captured["ws_url"],
            "es_url": captured["es_url"],
            "ws_client_frames": captured["ws_client_frames"],
        }
        if not info["send_payload_raw"]:
            raise RuntimeError("Falha: não capturei a sendMessageMutation (body RAW).")
        return info, handles
    except BaseException:
        # Never leak the browser/driver when the bootstrap fails midway.
        await cleanup_playwright(handles)
        raise


async def cleanup_playwright(handles: Dict[str, Any]):
    """Close context, browser and driver; each step is best effort."""
    try:
        if handles.get("context"):
            await handles["context"].close()
    except Exception:
        pass
    try:
        if handles.get("browser"):
            await handles["browser"].close()
    except Exception:
        pass
    try:
        if handles.get("p"):
            await handles["p"].stop()
    except Exception:
        pass


# ==============================================================================
# STREAM LISTENERS
# ==============================================================================

def _iter_inner_frames(packet: Any):
    """Yield the dict frames carried by a raw packet (bytes, str or dict)."""
    try:
        if isinstance(packet, (bytes, bytearray)):
            packet = packet.decode("utf-8", "ignore")
        if isinstance(packet, str):
            packet = json.loads(packet)
    except Exception:
        return
    if not isinstance(packet, dict):
        return
    msgs = packet.get("messages")
    if isinstance(msgs, list) and msgs:
        for m in msgs:
            try:
                if isinstance(m, (bytes, bytearray)):
                    m = m.decode("utf-8", "ignore")
                if isinstance(m, str):
                    m = json.loads(m)
            except Exception:
                continue
            if isinstance(m, dict):
                yield m
    else:
        yield packet


def _iter_subscription_updates(frame: Dict[str, Any]):
    """Yield the ``data`` dicts of a ``subscriptionUpdate`` frame (nothing otherwise)."""
    mtype = frame.get("mtype") or frame.get("message_type") or ""
    if mtype != "subscriptionUpdate":
        return
    inner = frame.get("payload") or {}
    if not isinstance(inner, dict):
        return
    if isinstance(inner.get("payloads"), list):
        for pl in inner["payloads"]:
            yield (pl.get("data") or {}) if isinstance(pl, dict) else {}
        return
    if isinstance(inner.get("data"), dict):
        yield inner["data"]
        return
    inner2 = inner.get("payload")
    if isinstance(inner2, dict) and isinstance(inner2.get("data"), dict):
        yield inner2["data"]


def _extract_nodes(d: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the message nodes found in a subscription ``data`` dict."""
    if not isinstance(d, dict):
        return []
    nodes = [d[key] for key in _NODE_KEYS if isinstance(d.get(key), dict)]
    if not nodes:
        # Unknown key names: accept any dict value that looks like a message.
        nodes = [
            v for v in d.values()
            if isinstance(v, dict) and ("text_new" in v or "message" in v)
        ]
    return nodes


def _bot_messages(data_obj: Dict[str, Any], target_chat_id: Optional[str], bot_handle: str):
    """Yield ``(node, msg)`` for bot messages of the target chat inside *data_obj*."""
    for node in _extract_nodes(data_obj):
        msg = node.get("message") if isinstance(node.get("message"), dict) else node
        chat_id_here = str(msg.get("chatId") or msg.get("chat_id") or "")
        if target_chat_id and chat_id_here and str(target_chat_id) != chat_id_here:
            continue
        if not is_bot_message(msg, bot_handle):
            continue
        yield node, msg


async def listen_via_websocket_url(
    ws_url: str,
    session: requests.Session,
    target_chat_id: Optional[str],
    meta_headers: Dict[str, str],
    initial_client_frames: List[str],
    bot_handle: str,
    timeout_seconds: int = 120,
) -> Tuple[str, bool]:
    """Stream the bot answer from the updates WebSocket.

    Text chunks are printed as they arrive. The inactivity deadline is pushed
    forward every time a subscription update is received.

    Returns:
        ``(answer_text, depleted)`` where ``depleted`` is True when a
        non-subscription frame mentioned credits/trial.
    """
    ws_headers = {
        "Origin": "https://poe.com",
        "User-Agent": USER_AGENT,
        "Cookie": cookie_header_from_jar(session.cookies),
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    ws_headers.update({k: v for k, v in (meta_headers or {}).items() if v})
    extra = list(ws_headers.items())

    chunks: List[str] = []
    depleted = False
    loop = asyncio.get_running_loop()

    print("[WS] Conectando…")
    # NOTE(review): recent `websockets` releases renamed `extra_headers` to
    # `additional_headers` in the new asyncio client; confirm against the
    # installed version.
    async with websockets.connect(ws_url, extra_headers=extra, max_size=None) as ws:
        print("[WS] Conectado. Enviando frames de handshake do cliente (se houver)…")
        for f in initial_client_frames or []:
            try:
                await ws.send(f)
            except Exception:
                pass  # replaying the page's handshake frames is best effort

        print("[WS] Aguardando eventos…")
        deadline = loop.time() + timeout_seconds
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                print("\n[WS] Timeout sem eventos.")
                break
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                print("\n[WS] Timeout sem eventos.")
                break

            got_update = False
            for inner in _iter_inner_frames(message):
                mtype = inner.get("mtype") or inner.get("message_type") or "unknown"
                if mtype != "subscriptionUpdate":
                    # Heuristic: generic error frames mentioning credits/trial.
                    low = json.dumps(inner, ensure_ascii=False).lower()
                    if "credit" in low or "trial" in low:
                        depleted = True
                    continue
                for data_obj in _iter_subscription_updates(inner):
                    got_update = True
                    for node, msg in _bot_messages(data_obj, target_chat_id, bot_handle):
                        pieces = extract_text_pieces(msg)
                        if pieces:
                            chunk = "".join(pieces)
                            print(chunk, end="", flush=True)
                            chunks.append(chunk)
                        if looks_complete(msg) or looks_complete(node):
                            print("\n--- Resposta Completa (WS) ---")
                            return "".join(chunks), depleted
            if got_update:
                deadline = loop.time() + timeout_seconds
    return "".join(chunks), depleted


def _build_stream_headers(session: requests.Session, bot_handle: str) -> Dict[str, str]:
    """Build the request headers used to open the SSE stream."""
    return {
        "Origin": "https://poe.com",
        "Referer": f"{POE_URL_ROOT}/{bot_handle}",
        "User-Agent": USER_AGENT,
        "Accept-Language": BASE_HEADERS.get("Accept-Language", "en-US,en;q=0.9"),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Cookie": cookie_header_from_jar(session.cookies),
    }


def _matches_chat_and_chunk(
    payload: Dict[str, Any],
    target_chat_id: Optional[str],
    bot_handle: str,
) -> Tuple[bool, str, Dict[str, Any]]:
    """Return ``(matched, text, msg)`` for the first bot message in *payload*."""
    if not isinstance(payload, dict):
        return False, "", {}
    for data_obj in _iter_subscription_updates(payload):
        for _node, msg in _bot_messages(data_obj, target_chat_id, bot_handle):
            return True, "".join(extract_text_pieces(msg)), msg
    return False, "", {}


async def listen_via_sse_url(
    es_url: str,
    session: requests.Session,
    target_chat_id: Optional[str],
    bot_handle: str,
    timeout_seconds: int = 120,
) -> Tuple[str, bool]:
    """Stream the bot answer from the SSE endpoint.

    NOTE: this uses the blocking `requests` client, so the event loop is held
    while the stream is being read.

    Returns:
        ``(answer_text, depleted)``.

    Raises:
        requests.HTTPError: if the stream request returns an error status.
    """
    headers = _build_stream_headers(session, bot_handle)
    headers["Accept"] = "text/event-stream"
    chunks: List[str] = []
    depleted = False
    # (connect, read) timeouts so a silent stream cannot hang the CLI forever.
    with session.get(es_url, headers=headers, stream=True, timeout=(10, timeout_seconds)) as r:
        r.raise_for_status()
        end_at = time.time() + timeout_seconds
        try:
            for raw in r.iter_lines(decode_unicode=True):
                if time.time() > end_at:
                    print("\n[SSE] Timeout sem eventos.")
                    break
                if not raw or not raw.startswith("data:"):
                    continue  # blank lines, ":" keep-alives and other fields
                line = raw[5:].strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except Exception:
                    continue
                ok, chunk, msg = _matches_chat_and_chunk(payload, target_chat_id, bot_handle)
                if not ok:
                    # Heuristic: "out of credits"-style messages.
                    low = line.lower()
                    if any(k in low for k in _DEPLETION_KEYWORDS):
                        depleted = True
                if ok and chunk:
                    print(chunk, end="", flush=True)
                    chunks.append(chunk)
                if msg and looks_complete(msg):
                    print("\n--- Resposta Completa (SSE) ---")
                    break
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
            # A read timeout while streaming surfaces as one of these.
            print("\n[SSE] Timeout sem eventos.")
    return "".join(chunks), depleted


# ==============================================================================
# PAYLOAD MUTATION
# ==============================================================================

def patch_send_payload_text(payload: Dict[str, Any], new_text: str, bot_handle: str) -> str:
    """Return the captured send payload as JSON with the text and bot replaced.

    The text is written to whichever of ``variables.query/message/text/input``
    exist; if none exists it is stored in ``variables.query``. The bot handle
    is written to whichever of ``variables.bot/handle/botHandle`` exist.

    NOTE(review): the template is captured from a freshly opened bot page, so
    every send presumably starts a new chat; confirm whether a chat id must be
    patched in to keep conversation context.
    """
    cloned = json.loads(json.dumps(payload))  # deep copy; the template stays intact
    if not isinstance(cloned, dict):
        cloned = {}
    variables = cloned.get("variables")
    if not isinstance(variables, dict):
        variables = {}
        cloned["variables"] = variables

    text_applied = False
    for key in ("query", "message", "text", "input"):
        if key in variables:
            variables[key] = new_text
            text_applied = True

    bot_applied = False
    for key in ("bot", "handle", "botHandle"):
        if key in variables:
            variables[key] = bot_handle
            bot_applied = True

    # Tracked separately from the bot handle: patching only the handle must
    # not suppress the text fallback, or the primer text would be resent.
    if not text_applied:
        variables["query"] = new_text
        if not bot_applied:
            variables["bot"] = bot_handle
    return json.dumps(cloned, ensure_ascii=False)


# ==============================================================================
# REQUEST SENDER (with failover)
# ==============================================================================

def apply_base_cookies(session: requests.Session, cks: Dict[str, str]):
    """Load the account cookies into *session* (``p-b`` always, others if set)."""
    session.cookies.update({"p-b": cks.get("p-b", "")})
    for name in ("p-lat", "cf_clearance", "__cf_bm", "p-sb"):
        if cks.get(name):
            session.cookies.set(name, cks[name], domain="poe.com", path="/")


def http_maybe_depleted(resp: requests.Response, body_text: str) -> bool:
    """Return True when the status code or body suggests a blocked/exhausted account."""
    if resp.status_code in _DEPLETION_STATUSES:
        return True
    low = body_text.lower() if body_text else ""
    return any(k in low for k in _DEPLETION_KEYWORDS + ("subscribe",))


def _replay_headers(template: Dict[str, str], bot_handle: str) -> Dict[str, str]:
    """Turn captured browser headers into headers safe to replay with `requests`."""
    headers = {k: v for k, v in (template or {}).items() if k not in _BLOCKED_HEADERS}
    headers["content-type"] = "application/json"
    headers["origin"] = POE_URL_ROOT
    headers["referer"] = f"{POE_URL_ROOT}/{bot_handle}"
    return headers


async def bootstrap_if_needed(ctx: AccountCtx, bot_handle: str):
    """Create the HTTP session and capture the send template for *ctx* if missing.

    Raises:
        requests.HTTPError: if replaying the subscriptions mutation fails.
        RuntimeError: propagated from :func:`get_bootstrap`.
    """
    if ctx.session is None:
        s = requests.Session()
        s.headers.update(BASE_HEADERS | {"Referer": f"{POE_URL_ROOT}/{bot_handle}"})
        apply_base_cookies(s, ctx.cookies)
        ctx.session = s

    # Either stream source is enough; requiring the WS URL alone would relaunch
    # the browser on every message for SSE-only captures.
    if ctx.send_payload_template and (ctx.ws_url or ctx.es_url):
        return

    info, handles = await get_bootstrap(bot_handle, ctx.cookies, primer_prompt=".")
    try:
        # Replay the subscriptions mutation (optional).
        ctx.subs_raw = info.get("subs_payload_raw")
        ctx.subs_headers_template = {
            k.lower(): v for k, v in (info.get("subs_req_headers") or {}).items()
        }
        if info.get("subs_cookie_header"):
            parse_cookie_header_and_set(ctx.session, info["subs_cookie_header"])
        if ctx.subs_raw:
            resp_subs = ctx.session.post(
                POE_API_URL,
                data=ctx.subs_raw,
                headers=_replay_headers(ctx.subs_headers_template, bot_handle),
            )
            resp_subs.raise_for_status()

        # Base payload for sends.
        ctx.send_payload_template = info.get("send_payload_dict")
        ctx.send_headers_template = {
            k.lower(): v for k, v in (info.get("send_req_headers") or {}).items()
        }
        if info.get("send_cookie_header"):
            parse_cookie_header_and_set(ctx.session, info["send_cookie_header"])

        ctx.formkey = info.get("formkey")
        ctx.revision = info.get("revision")
        ctx.tag_id = info.get("tag_id")
        ctx.tchannel_hdr = info.get("tchannel_hdr")
        ctx.ws_url = info.get("ws_url")
        # "ws_url" is always present but may be None, so coerce before slicing.
        ws_url = info.get("ws_url") or ""
        derived_es = "https://" + ws_url[len("wss://"):] if ws_url.startswith("wss://") else None
        ctx.es_url = info.get("es_url") or derived_es
        ctx.ws_client_frames = info.get("ws_client_frames") or []
    finally:
        await cleanup_playwright(handles)


async def send_and_stream(cookie_pool: CookiePool, text: str, bot_handle: str) -> None:
    """Send *text* with the current account and stream the answer to stdout.

    On errors, or when the account looks depleted and produced no answer, the
    pool rotates to another account and the same message is sent again (up to
    two passes over the pool).

    Raises:
        RuntimeError: when every attempt failed.
    """
    last_error: Optional[BaseException] = None
    max_attempts = len(cookie_pool.ctxs) * 2  # two passes over the pool if needed
    for _ in range(max_attempts):
        ctx = cookie_pool.current()
        depleted = False
        try:
            await bootstrap_if_needed(ctx, bot_handle)

            gql_headers_send = _replay_headers(ctx.send_headers_template, bot_handle)
            if ctx.tchannel_hdr:
                ctx.session.headers["poe-tchannel"] = ctx.tchannel_hdr
                gql_headers_send["poe-tchannel"] = ctx.tchannel_hdr

            body = patch_send_payload_text(ctx.send_payload_template or {}, text, bot_handle)
            resp = ctx.session.post(POE_API_URL, data=body, headers=gql_headers_send)
            try:
                resp.raise_for_status()
                data = resp.json()
            except requests.HTTPError as e:
                bt = getattr(e.response, "text", "")[:2000]
                depleted = http_maybe_depleted(e.response, bt)
                raise
            except Exception:
                # A non-JSON body may be a block page.
                depleted = http_maybe_depleted(resp, resp.text[:2000] if hasattr(resp, "text") else "")
                raise

            root = data.get("data", {}) if isinstance(data, dict) else {}
            if not isinstance(root, dict):
                root = {}
            chat = None
            if "messageCreate" in root:
                chat = (root.get("messageCreate") or {}).get("chat") or {}
            elif "messageEdgeCreate" in root:
                chat = (root.get("messageEdgeCreate") or {}).get("chat") or {}
            if not chat:
                # Look for GraphQL errors hinting at an exhausted account.
                text_blob = json.dumps(data, ensure_ascii=False).lower()
                if any(k in text_blob for k in _DEPLETION_KEYWORDS):
                    depleted = True
                raise RuntimeError("Resposta sem objeto 'chat' esperado.")
            chat_id = chat.get("chatId") or chat.get("id")
            if not chat_id:
                raise RuntimeError("Não consegui obter o chatId.")

            meta_headers = {}
            if ctx.tchannel_hdr:
                meta_headers["poe-tchannel"] = ctx.tchannel_hdr
            if ctx.formkey:
                meta_headers["poe-formkey"] = ctx.formkey
            if ctx.revision:
                meta_headers["poe-revision"] = ctx.revision
            if ctx.tag_id:
                meta_headers["poe-tag-id"] = ctx.tag_id

            print(f"[stream] usando conta idx={cookie_pool.idx} | bot={bot_handle}")
            if ctx.ws_url:
                answer, dep = await listen_via_websocket_url(
                    ctx.ws_url, ctx.session, str(chat_id), meta_headers,
                    ctx.ws_client_frames, bot_handle, timeout_seconds=180,
                )
                depleted = depleted or dep
                if not answer and ctx.es_url:
                    print("\n[WS] Sem payload do bot. Tentando SSE…")
                    answer, dep2 = await listen_via_sse_url(
                        ctx.es_url, ctx.session, str(chat_id), bot_handle, timeout_seconds=180
                    )
                    depleted = depleted or dep2
            else:
                if not ctx.es_url:
                    raise RuntimeError("Sem WS nem SSE capturados.")
                answer, depleted = await listen_via_sse_url(
                    ctx.es_url, ctx.session, str(chat_id), bot_handle, timeout_seconds=180
                )

            if depleted:
                cookie_pool.mark_bad_and_rotate("créditos esgotados/bloqueio")
                if not answer:
                    # Nothing was delivered: resend the same message on the new account.
                    last_error = RuntimeError("Conta sem créditos; nenhuma resposta recebida.")
                    continue
            return
        except Exception as e:
            last_error = e
            low = str(e).lower()
            # Every failure rotates; only the logged reason differs.
            if depleted or any(k in low for k in _DEPLETION_KEYWORDS):
                cookie_pool.mark_bad_and_rotate("indício de falta de créditos")
            elif any(code in low for code in ("429", "403", "401", "402")):
                cookie_pool.mark_bad_and_rotate("http auth/rate")
            else:
                print(f"[warn] erro na conta idx={cookie_pool.idx}: {e}")
                cookie_pool.mark_bad_and_rotate("erro genérico")
            continue
    raise RuntimeError(f"Falha após múltiplas tentativas/failovers. Último erro: {last_error}")


# ==============================================================================
# CLI REPL
# ==============================================================================

HELP = """\
Comandos:
  :bot <handle>   -> troca o bot (ex: :bot gpt-4o)
  :quit           -> sair
  :help           -> ajuda
"""


async def ainput(prompt: str = "") -> str:
    """Read a line from stdin in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(input, prompt)


async def repl():
    """Run the interactive chat loop until ``:quit``, EOF or Ctrl+C."""
    pool = CookiePool(ACCOUNTS)
    bot_handle = DEFAULT_BOT
    print("Poe CLI com múltiplas contas p-b.")
    print(HELP)
    while True:
        try:
            user = (await ainput("\nVocê> ")).strip()
        except (EOFError, KeyboardInterrupt):
            print("\nbye!")
            return
        if not user:
            continue

        if user.startswith(":"):
            parts = user[1:].split()
            if not parts:
                print(HELP)
                continue
            cmd = parts[0].lower()
            if cmd == "quit":
                print("bye!")
                return
            if cmd == "help":
                print(HELP)
                continue
            if cmd == "bot":
                if len(parts) < 2:
                    print("Uso: :bot <handle>")
                    continue
                bot_handle = parts[1]
                print(f"[ok] bot -> {bot_handle}")
                # Partial reset: each account re-bootstraps on the new bot when needed.
                for ctx in pool.ctxs:
                    ctx.ws_url = None
                    ctx.es_url = None
                    ctx.send_payload_template = None
                continue
            print("Comando desconhecido.")
            continue

        try:
            await send_and_stream(pool, user, bot_handle)
        except Exception as e:
            print(f"\n[erro] {e}")


# ==============================================================================
# MAIN
# ==============================================================================

if __name__ == "__main__":
    try:
        asyncio.run(repl())
    except KeyboardInterrupt:
        print("\nbye!")

notas de implementação

  • Pool e failover: CookiePool marca a conta atual como “ruim” quando recebe sinais fortes de quota/401/403/429/402, ou quando a própria resposta/stream contém termos como “credit/quota/trial/limit”. Em seguida, rotaciona para a próxima conta. Se todas ficarem ruins, ele “reseta” o estado de saúde e percorre o pool de novo.
  • Bootstrap por conta: cada AccountCtx faz um bootstrap preguiçoso com o get_bootstrap() apenas quando necessário (ou quando você troca o bot com :bot); isso captura sendPayload, subs, tchannel, formkey, ws/es URL e frames de handshake.
  • Chat “de verdade”: no REPL, cada mensagem do usuário vira um replay do sendMessageMutation com o texto trocado por patch_send_payload_text(). Ele tenta as chaves comuns (variables.query, variables.message etc.) e força o handle do bot no payload.
  • Troca de bot ao vivo: :bot <handle> limpa os caches de WS/SSE/payload, de modo que cada conta refaça o bootstrap no novo handle quando precisar.
  • SSE/WS: escuta só mensagens do bot, concatena os deltas e detecta o término com looks_complete(); se o WS não retornar nada, cai para SSE.

Se quiser, eu também posso adicionar:

  • persistência do histórico localmente (JSONL);
  • logs estruturados;
  • backoff exponencial suave antes de rotacionar;
  • suporte a proxy/playwright.connect_over_cdp;
  • comando :sys <texto> para mandar system prompts, se o payload do Poe estiver expondo o campo.

Partager cette Q&R