@/sequence_analyzer.py Quero que se a cor que em `...
Criado em: 27 de maio de 2025
Criado em: 27 de maio de 2025
@/sequence_analyzer.py Quero que se a cor que em Provavel Próxima
foi sugerida, bater com o resultado da api, esse novo resultado que for adicionado em Sequencia atual
deverá ter a cor que já está atualmente, agora se não bater com o resultado da api, esse novo resultado que for adicionado em Sequencia atual
deverá ter a mesma cor só que um pouco mais escuro
(mande o código completo sem omitir linha no formato markdown)
python# filepath: c:\\Users\\lucas\\Downloads\\-dou\\sequence_analyzer.py # O arquivo data.csv segue esse formato: # # ``` # color,total_red,total_white,total_black,total_red_bets_placed,total_white_bets_placed,total_black_bets_placed # Red 4,266.27,1399.03,305.06,128,796,146 # Black 14,350.55,1564.81,570.30,117,861,188 # Black 14,149.74,1366.68,736.29,83,766,156 # ... # ``` # # - O total_red, total_black e total_white é o valor $ total da respectiva rodada de cada cor. # - O total_red_bets_placed, total_black_bets_placed e o total_white_bets_placed é a quantidade total de players que apostaram na rodada de cada cor. # import tkinter as tk import csv # requests # Removido, pois get_current_result será substituído import threading import time from typing import Optional, Dict, List, Tuple, Any import socketio # Adicionado import asyncio # Adicionado import queue # Adicionado # Constantes para o cliente WebSocket (inspirado em blaze_bet_tracker.py) AUTH_TOKEN_FROM_TRACKER = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ.Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8" ROOM_TO_SUBSCRIBE = "double_room_1" BLAZE_API_URL = "wss://api-gaming.blaze.bet.br" SOCKETIO_PATH = "/replication/" # Classe para o cliente WebSocket class BlazeRealtimeClient: def __init__(self, token: str, data_queue: queue.Queue, stop_event: threading.Event): self.token = token self.data_queue = data_queue self.stop_event = stop_event self.sio = socketio.AsyncClient(logger=False, engineio_logger=False) self.loop: Optional[asyncio.AbstractEventLoop] = None self.is_authenticated = False self.is_subscribed = False # Registrar handlers de eventos do socketio self.sio.on('connect', self.on_sio_connect) self.sio.on('disconnect', self.on_sio_disconnect) self.sio.on('data', self.on_sio_data) self.sio.on('connect_error', self.on_sio_connect_error) async def on_sio_connect(self): print("Realtime: WebSocket conectado. Autenticando...") self.is_authenticated = False self.is_subscribed = False auth_payload = {"token": self.token} try: await self.sio.emit("cmd", {"id": "authenticate", "payload": auth_payload}, callback=self._handle_auth_ack) except Exception as e: print(f"Realtime: Erro ao emitir autenticação: {e}") self.stop_event.set() # Sinaliza para parar se a emissão falhar async def _handle_auth_ack(self, ack_data: Any): if ack_data == "ok" or (isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok"): print("Realtime: Autenticado com sucesso. Inscrevendo-se na sala...") self.is_authenticated = True subscribe_payload = {"room": ROOM_TO_SUBSCRIBE} try: await self.sio.emit("cmd", {"id": "subscribe", "payload": subscribe_payload}, callback=self._handle_subscribe_ack) except Exception as e: print(f"Realtime: Erro ao emitir inscrição: {e}") self.stop_event.set() else: print(f"Realtime: Falha na autenticação WebSocket: {ack_data}. O token pode ser inválido ou expirado.") self.is_authenticated = False self.stop_event.set() async def _handle_subscribe_ack(self, ack_data: Any): if ack_data == "ok" or (isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok"): print(f"Realtime: Inscrito com sucesso na sala {ROOM_TO_SUBSCRIBE}.") self.is_subscribed = True else: print(f"Realtime: Falha ao inscrever-se na sala {ROOM_TO_SUBSCRIBE}: {ack_data}") self.is_subscribed = False self.stop_event.set() async def on_sio_disconnect(self): print("Realtime: WebSocket desconectado.") self.is_authenticated = False self.is_subscribed = False # A lógica de reconexão está no main_ws_loop async def on_sio_connect_error(self, data: Any): print(f"Realtime: Erro de conexão WebSocket: {data}") # Isso pode levar a uma desconexão, e o main_ws_loop tentará reconectar. async def on_sio_data(self, data_message: Dict): if not (isinstance(data_message, dict) and data_message.get("id") == "double.tick"): return payload = data_message.get("payload") if not (payload and isinstance(payload, dict)): return status = payload.get("status") created_at = payload.get("created_at") # Timestamp da rodada if status in ["finished", "complete"] and created_at: numeric_color_api = payload.get("color") # 0: white, 1: red, 2: black roll_api = payload.get("roll") # Usar os valores em EUR para determinar M/m, conforme blaze_bet_tracker.py total_red_eur = payload.get("total_red_eur_bet", 0.0) total_black_eur = payload.get("total_black_eur_bet", 0.0) game_color_str: Optional[str] = None if numeric_color_api == 0: game_color_str = "White" elif numeric_color_api == 1: game_color_str = "Red" elif numeric_color_api == 2: game_color_str = "Black" detail_str: Optional[str] = None if game_color_str == "Red": if total_red_eur > total_black_eur: detail_str = "M" elif total_red_eur < total_black_eur: detail_str = "m" elif game_color_str == "Black": if total_black_eur > total_red_eur: detail_str = "M" elif total_black_eur < total_red_eur: detail_str = "m" if game_color_str: # Certifica-se que a cor foi determinada result = { "color": game_color_str, "detail": detail_str, "roll": roll_api, "created_at": created_at, "status": status, } self.data_queue.put(result) async def main_ws_loop(self): while not self.stop_event.is_set(): if not self.sio.connected: if self.stop_event.is_set(): break # Verifica antes de tentar conectar print("Realtime: Tentando conectar WebSocket...") try: await self.sio.connect(BLAZE_API_URL, transports=['websocket'], socketio_path=SOCKETIO_PATH) # on_sio_connect cuidará da autenticação e inscrição após a conexão # Aguarda um pouco para dar tempo para conectar e autenticar/inscrever for _ in range(50): # Espera até 5s por autenticação/inscrição if self.is_authenticated and self.is_subscribed: break if self.stop_event.is_set(): break await asyncio.sleep(0.1) if not (self.is_authenticated and self.is_subscribed) and not self.stop_event.is_set(): print("Realtime: Timeout esperando por autenticação/inscrição. Desconectando para tentar novamente.") if self.sio.connected: await self.sio.disconnect() except socketio.exceptions.ConnectionError as e: print(f"Realtime: Falha na tentativa de conexão: {e}. Tentando novamente em 5s.") if self.stop_event.is_set(): break await asyncio.sleep(5) except Exception as e: print(f"Realtime: Erro inesperado na conexão: {e}. Tentando novamente em 5s.") if self.stop_event.is_set(): break await asyncio.sleep(5) continue # Volta ao início do loop para verificar stop_event e status da conexão if self.stop_event.is_set(): break await asyncio.sleep(0.1) # Mantém o loop ativo, permitindo a verificação do stop_event if self.sio.connected: print("Realtime: Desconectando WebSocket devido ao evento de parada.") await self.sio.disconnect() print("Realtime: Loop principal do cliente WebSocket encerrado.") def start(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) try: self.loop.run_until_complete(self.main_ws_loop()) except Exception as e: print(f"Realtime: Erro ao iniciar cliente: {e}") finally: # Garante que o loop seja fechado corretamente if self.loop and self.sio.connected: # Tenta desconectar se ainda conectado self.loop.run_until_complete(self.sio.disconnect()) if self.loop and not self.loop.is_closed(): self.loop.close() print("Realtime: Loop de eventos do cliente WebSocket totalmente fechado.") # Remover a função get_current_result antiga # def get_current_result() -> Optional[Dict]: # ... (código antigo removido) ... class SequenceCounterApp: def __init__(self, master, csv_filepath): self.master = master master.title("Contador de Sequências de Cores") # Dark Theme Colors self.dark_bg = "#2E2E2E" self.text_fg = "#E0E0E0" self.text_bold_fg = "white" self.frame_bg = self.dark_bg self.button_bg_std = "#4A4A4A" self.button_fg_std = self.text_fg self.red_accent_fg = "#FF6B6B" self.black_accent_fg = "#C0C0C0" self.white_accent_fg = "#E0E0E0" # These colors are used directly in button creation, not as instance variables red_button_bg = "#B80000" red_button_fg = "white" black_button_bg = "#333333" black_button_fg = "white" white_button_bg = "#555555" white_button_fg = "white" master.configure(bg=self.dark_bg) self.csv_filepath = csv_filepath self.winning_colors_from_csv: List[Tuple[str, Optional[str]]] = [] # Stores (color, detail) self._load_winning_colors() # Load data at init if not self.winning_colors_from_csv: error_label = tk.Label(master, text=f"Erro ao carregar dados de {csv_filepath}\\nVerifique o arquivo ou o caminho.", fg=self.red_accent_fg, bg=self.dark_bg) error_label.pack() self.disable_buttons = True else: self.disable_buttons = False self.current_sequence: List[Tuple[str, Optional[str]]] = [] self.full_history_sequence: List[Tuple[str, Optional[str]]] = [] # Stores all added elements # Não há seleção de modo, o modo "Detalhado" é o padrão. # self.mode_var = tk.StringVar(value="detailed") # Removido # Elementos da UI self.info_label = tk.Label(master, text="Clique nos botões para formar uma sequência:", bg=self.dark_bg, fg=self.text_fg) self.info_label.pack(pady=5) current_sequence_info_frame = tk.Frame(master, bg=self.frame_bg) current_sequence_info_frame.pack(pady=2) self.sequence_display_label = tk.Label(current_sequence_info_frame, text="Sequência atual:", bg=self.frame_bg, fg=self.text_fg) self.sequence_display_label.pack(side=tk.LEFT, padx=(0, 5)) self.sequence_label_text = tk.StringVar() self.sequence_label_text.set("Nenhuma") self.sequence_label = tk.Label(current_sequence_info_frame, textvariable=self.sequence_label_text, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg) self.sequence_label.pack(side=tk.LEFT, padx=(0, 10)) self.probable_next_color_desc_label = tk.Label(current_sequence_info_frame, text="Provável Próxima:", bg=self.frame_bg, fg=self.text_fg) self.probable_next_color_desc_label.pack(side=tk.LEFT, padx=(10, 5)) self.probable_next_color_text_var = tk.StringVar() self.probable_next_color_text_var.set("-") self.probable_next_color_label = tk.Label(current_sequence_info_frame, textvariable=self.probable_next_color_text_var, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg) self.probable_next_color_label.pack(side=tk.LEFT) self.count_display_label = tk.Label(master, text="Ocorrências na base de dados (para Sequência Atual):", bg=self.dark_bg, fg=self.text_fg) self.count_display_label.pack() self.count_label_text = tk.StringVar() self.count_label_text.set("0") self.count_label = tk.Label(master, textvariable=self.count_label_text, font=("Arial", 12, "bold"), bg=self.dark_bg, fg=self.text_bold_fg) self.count_label.pack(pady=5) self.next_color_title_label = tk.Label(master, text="Ocorrências da Próxima Cor Após Sequência:", bg=self.dark_bg, fg=self.text_fg) self.next_color_title_label.pack(pady=(10,0)) # Frame for next color counts (dynamic content) self.next_color_display_frame = tk.Frame(master, bg=self.frame_bg) self.next_color_display_frame.pack(pady=5) self._setup_next_color_labels() # Initialize labels # Button Frames self.button_container_frame = tk.Frame(master, bg=self.frame_bg) self.button_container_frame.pack(pady=10) # Apenas um frame para botões detalhados self.detailed_button_frame = tk.Frame(self.button_container_frame, bg=self.frame_bg) self.detailed_button_frame.pack(pady=5) # Sempre visível # Detailed Buttons (agora são os únicos botões) self.red_M_button = tk.Button(self.detailed_button_frame, text="Red-M", command=lambda: self._add_to_sequence("Red", "M"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0) self.red_M_button.pack(side=tk.LEFT, padx=3) self.red_m_button = tk.Button(self.detailed_button_frame, text="Red-m", command=lambda: self._add_to_sequence("Red", "m"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0) self.red_m_button.pack(side=tk.LEFT, padx=3) self.black_M_button = tk.Button(self.detailed_button_frame, text="Black-M", command=lambda: self._add_to_sequence("Black", "M"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0) self.black_M_button.pack(side=tk.LEFT, padx=3) self.black_m_button = tk.Button(self.detailed_button_frame, text="Black-m", command=lambda: self._add_to_sequence("Black", "m"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0) self.black_m_button.pack(side=tk.LEFT, padx=3) # White Button (agora só existe uma versão) self.white_button_detailed = tk.Button(self.detailed_button_frame, text="White", command=lambda: self._add_to_sequence("White", None), bg=white_button_bg, fg=white_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0) self.white_button_detailed.pack(side=tk.LEFT, padx=3) self.reset_button = tk.Button(master, text="Resetar Sequência", command=self._reset_sequence, width=15, height=2, bg=self.button_bg_std, fg=self.button_fg_std, relief=tk.FLAT, borderwidth=0) self.reset_button.pack(pady=10) # Input para o limite máximo de sequências self.max_sequence_limit = tk.IntVar(value=4) # Default value self.max_sequence_limit_label = tk.Label(master, text="Limite Máximo de Sequências (1-10):", bg=self.dark_bg, fg=self.text_fg) self.max_sequence_limit_label.pack(pady=(10,0)) self.max_sequence_limit_spinbox = tk.Spinbox(master, from_=1, to=10, textvariable=self.max_sequence_limit, width=5, bg=self.button_bg_std, fg=self.button_fg_std, command=self._on_max_sequence_limit_change) # Update display on change self.max_sequence_limit_spinbox.pack(pady=5) # Labels for Casa Analysis self.casa_analysis_title_label = tk.Label(master, text="Análise \'Casas\' (Próximas Cores para Sub-Sequências Atuais):", font=("Arial", 10, "bold"), bg=self.dark_bg, fg=self.text_fg) self.casa_analysis_title_label.pack(pady=(10,0)) self.casa_labels_frames = [] # To store frames for each casa's counts self.casa_labels_vars = [] # The actual creation of casa labels will happen in _setup_casa_labels, # which is called by _on_mode_change. self._on_mode_change() # Set initial button visibility and disable state # Variáveis e configuração para modo de atualização em tempo real self.stop_event = threading.Event() self.last_timestamp = "1970-01-01T00:00:00.000Z" # Initialize to a very old date self.real_time_var = tk.BooleanVar() self.real_time_var.set(False) # Adicionar para o cliente WebSocket self.auth_token = AUTH_TOKEN_FROM_TRACKER # Usar o token definido globalmente self.real_time_data_queue = queue.Queue() self.real_time_client: Optional[BlazeRealtimeClient] = None self.real_time_thread: Optional[threading.Thread] = None real_time_cb = tk.Checkbutton(master, text="Real-Time API", variable=self.real_time_var, command=self._toggle_real_time_mode, bg=self.dark_bg, fg=self.text_fg, selectcolor=self.frame_bg) real_time_cb.pack(pady=5) # self._update_display() # Initial display update - Now handled by _on_mode_change -> _reset_sequence def _on_max_sequence_limit_change(self): """Called when the max sequence limit spinbox value changes.""" self._setup_casa_labels() # Rebuild casa labels with the new limit new_max_limit = self.max_sequence_limit.get() # Update current_sequence to be a slice of the full history, limited by the new max_limit self.current_sequence = self.full_history_sequence[-new_max_limit:] self._update_display() # Update the display to reflect new analysis def _format_sequence_element(self, color: str, detail: Optional[str]) -> str: if detail: return f"{color}-{detail}" return color def _on_mode_change(self): # Não há mais seleção de modo, então apenas atualiza estados e labels self._update_button_states() self._setup_next_color_labels() # Rebuild next color labels for the new mode self._setup_casa_labels() # Rebuild casa labels for the new mode self._reset_sequence() # Resets current sequence and updates display def _update_button_states(self): # Sempre opera no modo detalhado is_disabled = self.disable_buttons self.detailed_button_frame.pack(pady=5) # self.classic_button_frame.pack_forget() # classic_button_frame não existe mais self.red_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.red_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.white_button_detailed.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.reset_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) def _setup_next_color_labels(self): # Clear existing labels in the frame for widget in self.next_color_display_frame.winfo_children(): widget.destroy() # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg # Sempre configura para o modo detalhado self.next_red_M_label_text = tk.StringVar(value="Red-M: 0") tk.Label(self.next_color_display_frame, textvariable=self.next_red_M_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=5) self.next_red_m_label_text = tk.StringVar(value="Red-m: 0") tk.Label(self.next_color_display_frame, textvariable=self.next_red_m_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=5) self.next_black_M_label_text = tk.StringVar(value="Black-M: 0") tk.Label(self.next_color_display_frame, textvariable=self.next_black_M_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=5) self.next_black_m_label_text = tk.StringVar(value="Black-m: 0") tk.Label(self.next_color_display_frame, textvariable=self.next_black_m_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=5) self.next_white_label_text = tk.StringVar(value="White: 0") # Re-use for White tk.Label(self.next_color_display_frame, textvariable=self.next_white_label_text, font=("Arial", 10), fg=white_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=5) def _setup_casa_labels(self): # Clear existing frames and vars for frame in self.casa_labels_frames: frame.destroy() self.casa_labels_frames = [] self.casa_labels_vars = [] # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg max_limit = self.max_sequence_limit.get() for n_casa in range(1, max_limit + 1): casa_outer_frame = tk.Frame(self.master, bg=frame_bg) casa_outer_frame.pack(pady=2, fill=tk.X) self.casa_labels_frames.append(casa_outer_frame) seq_var = tk.StringVar() seq_var.set(f"Casa {n_casa} (Seq: ...):") seq_label = tk.Label(casa_outer_frame, textvariable=seq_var, font=("Arial", 9), bg=frame_bg, fg=self.text_fg) seq_label.pack(side=tk.TOP, anchor="w", padx=10) counts_frame = tk.Frame(casa_outer_frame, bg=frame_bg) counts_frame.pack(side=tk.TOP, anchor="w", padx=10) red_M_var = tk.StringVar(value="R-M: -") red_m_var = tk.StringVar(value="R-m: -") black_M_var = tk.StringVar(value="B-M: -") black_m_var = tk.StringVar(value="B-m: -") white_var_detailed = tk.StringVar(value="White: -") tk.Label(counts_frame, textvariable=red_M_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=3) tk.Label(counts_frame, textvariable=red_m_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=3) tk.Label(counts_frame, textvariable=black_M_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=3) tk.Label(counts_frame, textvariable=black_m_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=3) tk.Label(counts_frame, textvariable=white_var_detailed, font=("Arial", 9), fg=white_accent_fg, bg=frame_bg).pack(side=tk.LEFT, padx=3) self.casa_labels_vars.append({ "seq": seq_var, "detailed_red_M": red_M_var, "detailed_red_m": red_m_var, "detailed_black_M": black_M_var, "detailed_black_m": black_m_var, "shared_white": white_var_detailed }) def _load_winning_colors(self): self.winning_colors_from_csv = [] try: with open(self.csv_filepath, mode='r', newline='', encoding='utf-8') as file: reader = csv.DictReader(file) if not reader.fieldnames or 'color' not in reader.fieldnames or \ 'total_red' not in reader.fieldnames or 'total_black' not in reader.fieldnames: print(f"Erro: Cabeçalhos necessários não encontrados em {self.csv_filepath}") return for row in reader: color_part_full = row.get('color', '') if not color_part_full: continue color_part = color_part_full.split(' ')[0] detail = None if color_part in ["Red", "Black", "White"]: if color_part != "White": # Only Red and Black can have M/m try: total_red = float(row.get('total_red', '0')) total_black = float(row.get('total_black', '0')) if color_part == "Red": if total_red > total_black: detail = "M" elif total_red < total_black: detail = "m" elif color_part == "Black": if total_black > total_red: detail = "M" elif total_black < total_red: detail = "m" except ValueError: print(f"Aviso: Valor não numérico para total_red/total_black na linha: {row}") # detail remains None self.winning_colors_from_csv.append((color_part, detail)) except FileNotFoundError: print(f"Erro: Arquivo não encontrado em {self.csv_filepath}") except Exception as e: print(f"Erro ao ler o CSV: {e}") def _get_next_colors_for_specific_sequence(self, sequence_to_find: List[Tuple[str, Optional[str]]]) -> Dict[Tuple[str, Optional[str]], int]: # Returns counts for detailed (color, detail) tuples next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not sequence_to_find or not self.winning_colors_from_csv: return next_counts len_sub = len(sequence_to_find) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i:i+len_sub] == sequence_to_find: next_color_tuple = self.winning_colors_from_csv[i+len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_casa_analysis_display(self): # mode = self.mode_var.get() # mode_var removido max_limit = self.max_sequence_limit.get() for n_idx in range(max_limit): N = n_idx + 1 # Ensure we don't try to access an index that doesn't exist if n_idx >= len(self.casa_labels_vars): break # This should not happen if _setup_casa_labels is called correctly vars_for_casa = self.casa_labels_vars[n_idx] current_seq_len = len(self.current_sequence) seq_display_text = "N/A" # Default if disabled or no data if self.disable_buttons or not self.winning_colors_from_csv: pass # seq_display_text remains N/A elif not self.current_sequence: seq_display_text = "-" elif current_seq_len < N: seq_display_text = "Insuficiente" else: # Enough elements target_sequence_for_casa = self.current_sequence[:N] sequence_str_parts = [self._format_sequence_element(c, d) for c, d in target_sequence_for_casa] sequence_str = " -> ".join(sequence_str_parts) max_display_len = 25 if len(sequence_str) > max_display_len: sequence_str = sequence_str[:max_display_len-3] + "..." seq_display_text = sequence_str vars_for_casa["seq"].set(f"Casa {N} (Sub: {seq_display_text})") if self.disable_buttons or not self.winning_colors_from_csv or current_seq_len < N: # Set all counts to "-" vars_for_casa["detailed_red_M"].set("R-M: -") vars_for_casa["detailed_red_m"].set("R-m: -") vars_for_casa["detailed_black_M"].set("B-M: -") vars_for_casa["detailed_black_m"].set("B-m: -") vars_for_casa["shared_white"].set("White: -") continue # Calculate and display counts next_color_counts_detailed = self._get_next_colors_for_specific_sequence(self.current_sequence[:N]) # Sempre no modo detalhado vars_for_casa["detailed_red_M"].set(f"R-M: {next_color_counts_detailed.get(('Red', 'M'), 0)}") vars_for_casa["detailed_red_m"].set(f"R-m: {next_color_counts_detailed.get(('Red', 'm'), 0)}") vars_for_casa["detailed_black_M"].set(f"B-M: {next_color_counts_detailed.get(('Black', 'M'), 0)}") vars_for_casa["detailed_black_m"].set(f"B-m: {next_color_counts_detailed.get(('Black', 'm'), 0)}") vars_for_casa["shared_white"].set(f"White: {next_color_counts_detailed.get(('White', None), 0)}") def _determine_most_probable_next_color(self) -> Tuple[Optional[str], Optional[str]]: # Returns (color, detail) or (None,None) if not self.current_sequence or self.disable_buttons or not self.winning_colors_from_csv: return None, None detailed_next_counts = self._count_next_color_occurrences() # Returns { (c,d): count } if not detailed_next_counts: return None, None # Sempre no modo detalhado max_occurrence = max(detailed_next_counts.values()) if max_occurrence == 0: return None, None # Get all (color, detail) tuples that have the max_occurrence probable_detailed_colors = [cd_tuple for cd_tuple, count in detailed_next_counts.items() if count == max_occurrence] if probable_detailed_colors: # Sort for consistent tie-breaking, e.g., by color then detail return sorted(probable_detailed_colors, key=lambda x: (x[0], str(x[1])))[0] return None, None def _add_to_sequence(self, color: str, detail: Optional[str]): if self.disable_buttons: return entry = (color, detail) self.full_history_sequence.append(entry) # Always keep current_sequence as a slice of the full history, limited by max_sequence_limit self.current_sequence = self.full_history_sequence[-self.max_sequence_limit.get():] self._update_display() def _reset_sequence(self): # if self.disable_buttons: return # Allow reset even if buttons are disabled by data load failure self.current_sequence = [] self.full_history_sequence = [] self._update_display() def _count_sequence_occurrences(self) -> int: if not self.current_sequence or not self.winning_colors_from_csv: return 0 count = 0 len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main < len_sub: return 0 for i in range(len_main - len_sub + 1): # Direct comparison of list of tuples if self.winning_colors_from_csv[i:i+len_sub] == self.current_sequence: count += 1 return count def _count_next_color_occurrences(self) -> Dict[Tuple[str, Optional[str]], int]: # Always returns detailed counts: { (color, detail_or_None): count } next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not self.current_sequence or not self.winning_colors_from_csv: return next_counts len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: # Not enough data for a sequence + next color return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i:i+len_sub] == self.current_sequence: next_color_tuple = self.winning_colors_from_csv[i+len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_display(self): # Sequence Label if self.current_sequence: sequence_str_parts = [self._format_sequence_element(c, d) for c, d in self.current_sequence] self.sequence_label_text.set(" -> ".join(sequence_str_parts)) else: self.sequence_label_text.set("Nenhuma") # Occurrences Count occurrences = self._count_sequence_occurrences() self.count_label_text.set(f"{occurrences}") # Next Color Counts Display detailed_next_counts = self._count_next_color_occurrences() # mode = self.mode_var.get() # mode_var removido # Sempre no modo detalhado if hasattr(self, 'next_red_M_label_text'): self.next_red_M_label_text.set(f"Red-M: {detailed_next_counts.get(('Red', 'M'), 0)}") if hasattr(self, 'next_red_m_label_text'): self.next_red_m_label_text.set(f"Red-m: {detailed_next_counts.get(('Red', 'm'), 0)}") if hasattr(self, 'next_black_M_label_text'): self.next_black_M_label_text.set(f"Black-M: {detailed_next_counts.get(('Black', 'M'), 0)}") if hasattr(self, 'next_black_m_label_text'): self.next_black_m_label_text.set(f"Black-m: {detailed_next_counts.get(('Black', 'm'), 0)}") if hasattr(self, 'next_white_label_text'): self.next_white_label_text.set(f"White: {detailed_next_counts.get(('White', None), 0)}") # Probable Next Color prob_color, prob_detail = self._determine_most_probable_next_color() # Use instance variables for colors red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg text_fg = self.text_fg if prob_color: self.probable_next_color_text_var.set(self._format_sequence_element(prob_color, prob_detail)) if prob_color == "Red": self.probable_next_color_label.config(fg=red_accent_fg) elif prob_color == "Black": self.probable_next_color_label.config(fg=black_accent_fg) elif prob_color == "White": self.probable_next_color_label.config(fg=white_accent_fg) else: self.probable_next_color_label.config(fg=text_fg) # Should not happen if prob_color is set else: self.probable_next_color_text_var.set("-") self.probable_next_color_label.config(fg=text_fg) self._update_casa_analysis_display() def _toggle_real_time_mode(self): if self.real_time_var.get(): # Se está ativando o modo real-time if not self.real_time_thread or not self.real_time_thread.is_alive(): print("Iniciando cliente Real-Time API...") # Alerta sobre o token aqui, se necessário if "1752950579" in self.auth_token: # Checa se é o token com expiração conhecida # A data atual simulada é Maio 2025, token expira Março 2025 print("ALERTA: O token de autenticação pode estar expirado. A API Real-Time pode não funcionar.") self.stop_event.clear() self.real_time_client = BlazeRealtimeClient(self.auth_token, self.real_time_data_queue, self.stop_event) self.real_time_thread = threading.Thread(target=self.real_time_client.start, daemon=True) self.real_time_thread.start() self.master.after(100, self._process_real_time_queue) # Inicia a verificação da fila else: # Se está desativando o modo real-time print("Parando cliente Real-Time API...") self.stop_event.set() # Sinaliza para a thread do cliente parar if self.real_time_thread and self.real_time_thread.is_alive(): self.real_time_thread.join(timeout=5) # Espera a thread terminar if self.real_time_thread.is_alive(): print("Realtime: Thread do cliente não terminou no tempo esperado.") self.real_time_client = None # Limpa a referência ao cliente # A função _process_real_time_queue vai parar de se re-agendar por conta própria def _process_real_time_queue(self): if not self.real_time_var.get(): # Se o modo real-time foi desativado, para de processar return try: while not self.real_time_data_queue.empty(): # Processa todos os itens na fila new_result = self.real_time_data_queue.get_nowait() if new_result["created_at"] > self.last_timestamp: self.last_timestamp = new_result["created_at"] color_from_api = new_result["color"] # Ex: "Red", "Black", "White" detail_from_api = new_result["detail"] # Ex: "M", "m", None # roll_from_api = new_result["roll"] # Disponível se precisar mostrar o número print(f"Realtime: Novo resultado recebido - Cor: {color_from_api}, Detalhe: {detail_from_api}, Roll: {new_result.get('roll')}") # Adiciona automaticamente à sequência se o modo real-time estiver ativo self._add_to_sequence(color_from_api, detail_from_api) # Aqui você pode adicionar código para atualizar algum label na UI com o último resultado, se desejar # Ex: self.live_status_label.set(f"Último: {color_from_api} {detail_from_api or ''} ({roll_from_api})") else: # Resultado antigo ou duplicado, ignorar pass except queue.Empty: pass # Fila vazia, nada a fazer except Exception as e: print(f"Erro ao processar fila de tempo real: {e}") finally: if self.real_time_var.get(): # Reagenda a verificação se o modo real-time ainda estiver ativo self.master.after(200, self._process_real_time_queue) # Verifica a fila a cada 200ms # Ponto de entrada principal da aplicação if __name__ == "__main__": root = tk.Tk() # Presume-se que data.csv está no mesmo diretório ou fornecer o caminho correto csv_file_path = "data.csv" app = SequenceCounterApp(root, csv_file_path) # Adiciona um handler para fechar a aplicação de forma limpa def on_closing(): print("Fechando aplicação...") if app.real_time_thread and app.real_time_thread.is_alive(): print("Parando thread de real-time...") app.stop_event.set() app.real_time_thread.join(timeout=5) # Espera a thread do cliente WS if app.real_time_thread.is_alive(): print("Timeout ao esperar a thread de real-time terminar.") # Garante que o loop asyncio do cliente WS seja fechado se existir e estiver rodando if app.real_time_client and app.real_time_client.loop and not app.real_time_client.loop.is_closed(): print("Fechando loop asyncio do cliente WebSocket...") # Desconectar o cliente sio se estiver conectado if app.real_time_client.sio and app.real_time_client.sio.connected: app.real_time_client.loop.run_until_complete(app.real_time_client.sio.disconnect()) app.real_time_client.loop.call_soon_threadsafe(app.real_time_client.loop.stop) # Esperar um pouco para o loop parar # A thread do cliente já deve ter terminado, então o loop deve parar ou já parou. # Se a thread ainda estiver viva, o loop.close() pode dar problema. # A thread do cliente é daemon, então ela será terminada quando o programa principal sair, # mas é melhor tentar fechar o loop de forma limpa. # A lógica de fechamento do loop já está no método start() do BlazeRealtimeClient. root.destroy() print("Aplicação fechada.") root.protocol("WM_DELETE_WINDOW", on_closing) try: root.mainloop() except KeyboardInterrupt: print("\nInterrupção pelo teclado recebida. Fechando...") on_closing()
python# filepath: c:\\Users\\lucas\\Downloads\\-dou\\sequence_analyzer.py # O arquivo data.csv segue esse formato: # # ``` # color,total_red,total_white,total_black,total_red_bets_placed,total_white_bets_placed,total_black_bets_placed # Red 4,266.27,1399.03,305.06,128,796,146 # Black 14,350.55,1564.81,570.30,117,861,188 # Black 14,149.74,1366.68,736.29,83,766,156 # ... # ``` # # - O total_red, total_black e total_white é o valor $ total da respectiva rodada de cada cor. # - O total_red_bets_placed, total_black_bets_placed e o total_white_bets_placed é a quantidade total de players que apostaram na rodada de cada cor. # import tkinter as tk import csv # requests # Removido, pois get_current_result será substituído import threading import time from typing import Optional, Dict, List, Tuple, Any import socketio # Adicionado import asyncio # Adicionado import queue # Adicionado # Constantes para o cliente WebSocket (inspirado em blaze_bet_tracker.py) AUTH_TOKEN_FROM_TRACKER = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ.Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8" ) ROOM_TO_SUBSCRIBE = "double_room_1" BLAZE_API_URL = "wss://api-gaming.blaze.bet.br" SOCKETIO_PATH = "/replication/" # Classe para o cliente WebSocket class BlazeRealtimeClient: def __init__(self, token: str, data_queue: queue.Queue, stop_event: threading.Event): self.token = token self.data_queue = data_queue self.stop_event = stop_event self.sio = socketio.AsyncClient(logger=False, engineio_logger=False) self.loop: Optional[asyncio.AbstractEventLoop] = None self.is_authenticated = False self.is_subscribed = False # Registrar handlers de eventos do socketio self.sio.on("connect", self.on_sio_connect) self.sio.on("disconnect", self.on_sio_disconnect) self.sio.on("data", self.on_sio_data) self.sio.on("connect_error", self.on_sio_connect_error) async def on_sio_connect(self): print("Realtime: WebSocket conectado. Autenticando...") self.is_authenticated = False self.is_subscribed = False auth_payload = {"token": self.token} try: await self.sio.emit( "cmd", {"id": "authenticate", "payload": auth_payload}, callback=self._handle_auth_ack, ) except Exception as e: print(f"Realtime: Erro ao emitir autenticação: {e}") self.stop_event.set() # Sinaliza para parar se a emissão falhar async def _handle_auth_ack(self, ack_data: Any): if ack_data == "ok" or ( isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok" ): print("Realtime: Autenticado com sucesso. Inscrevendo-se na sala...") self.is_authenticated = True subscribe_payload = {"room": ROOM_TO_SUBSCRIBE} try: await self.sio.emit( "cmd", {"id": "subscribe", "payload": subscribe_payload}, callback=self._handle_subscribe_ack, ) except Exception as e: print(f"Realtime: Erro ao emitir inscrição: {e}") self.stop_event.set() else: print( f"Realtime: Falha na autenticação WebSocket: {ack_data}. O token pode ser inválido ou expirado." ) self.is_authenticated = False self.stop_event.set() async def _handle_subscribe_ack(self, ack_data: Any): if ack_data == "ok" or ( isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok" ): print(f"Realtime: Inscrito com sucesso na sala {ROOM_TO_SUBSCRIBE}.") self.is_subscribed = True else: print( f"Realtime: Falha ao inscrever-se na sala {ROOM_TO_SUBSCRIBE}: {ack_data}" ) self.is_subscribed = False self.stop_event.set() async def on_sio_disconnect(self): print("Realtime: WebSocket desconectado.") self.is_authenticated = False self.is_subscribed = False # A lógica de reconexão está no main_ws_loop async def on_sio_connect_error(self, data: Any): print(f"Realtime: Erro de conexão WebSocket: {data}") # Isso pode levar a uma desconexão, e o main_ws_loop tentará reconectar. async def on_sio_data(self, data_message: Dict): if not (isinstance(data_message, dict) and data_message.get("id") == "double.tick"): return payload = data_message.get("payload") if not (payload and isinstance(payload, dict)): return status = payload.get("status") created_at = payload.get("created_at") # Timestamp da rodada if status in ["finished", "complete"] and created_at: numeric_color_api = payload.get("color") # 0: white, 1: red, 2: black roll_api = payload.get("roll") # Usar os valores em EUR para determinar M/m, conforme blaze_bet_tracker.py total_red_eur = payload.get("total_red_eur_bet", 0.0) total_black_eur = payload.get("total_black_eur_bet", 0.0) game_color_str: Optional[str] = None if numeric_color_api == 0: game_color_str = "White" elif numeric_color_api == 1: game_color_str = "Red" elif numeric_color_api == 2: game_color_str = "Black" detail_str: Optional[str] = None if game_color_str == "Red": if total_red_eur > total_black_eur: detail_str = "M" elif total_red_eur < total_black_eur: detail_str = "m" elif game_color_str == "Black": if total_black_eur > total_red_eur: detail_str = "M" elif total_black_eur < total_red_eur: detail_str = "m" if game_color_str: # Certifica-se que a cor foi determinada result = { "color": game_color_str, "detail": detail_str, "roll": roll_api, "created_at": created_at, "status": status, } self.data_queue.put(result) async def main_ws_loop(self): while not self.stop_event.is_set(): if not self.sio.connected: if self.stop_event.is_set(): break # Verifica antes de tentar conectar print("Realtime: Tentando conectar WebSocket...") try: await self.sio.connect( BLAZE_API_URL, transports=["websocket"], socketio_path=SOCKETIO_PATH, ) # on_sio_connect cuidará da autenticação e inscrição após a conexão # Aguarda um pouco para dar tempo para conectar e autenticar/inscrever for _ in range(50): # Espera até 5s por autenticação/inscrição if self.is_authenticated and self.is_subscribed: break if self.stop_event.is_set(): break await asyncio.sleep(0.1) if ( not (self.is_authenticated and self.is_subscribed) and not self.stop_event.is_set() ): print( "Realtime: Timeout esperando por autenticação/inscrição. Desconectando para tentar novamente." ) if self.sio.connected: await self.sio.disconnect() except socketio.exceptions.ConnectionError as e: print( f"Realtime: Falha na tentativa de conexão: {e}. Tentando novamente em 5s." ) if self.stop_event.is_set(): break await asyncio.sleep(5) except Exception as e: print( f"Realtime: Erro inesperado na conexão: {e}. Tentando novamente em 5s." ) if self.stop_event.is_set(): break await asyncio.sleep(5) continue # Volta ao início do loop para verificar stop_event e status da conexão if self.stop_event.is_set(): break await asyncio.sleep( 0.1 ) # Mantém o loop ativo, permitindo a verificação do stop_event if self.sio.connected: print("Realtime: Desconectando WebSocket devido ao evento de parada.") await self.sio.disconnect() print("Realtime: Loop principal do cliente WebSocket encerrado.") def start(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) try: self.loop.run_until_complete(self.main_ws_loop()) except Exception as e: print(f"Realtime: Erro ao iniciar cliente: {e}") finally: # Garante que o loop seja fechado corretamente if self.loop and self.sio.connected: # Tenta desconectar se ainda conectado self.loop.run_until_complete(self.sio.disconnect()) if self.loop and not self.loop.is_closed(): self.loop.close() print("Realtime: Loop de eventos do cliente WebSocket totalmente fechado.") # Remover a função get_current_result antiga # def get_current_result() -> Optional[Dict]: # ... (código antigo removido) ... class SequenceCounterApp: def __init__(self, master, csv_filepath): self.master = master master.title("Contador de Sequências de Cores") # Dark Theme Colors self.dark_bg = "#2E2E2E" self.text_fg = "#E0E0E0" self.text_bold_fg = "white" self.frame_bg = self.dark_bg self.button_bg_std = "#4A4A4A" self.button_fg_std = self.text_fg self.red_accent_fg = "#FF6B6B" self.black_accent_fg = "#C0C0C0" self.white_accent_fg = "#E0E0E0" # Nova função para calcular tons mais escuros def _darken_color(hex_color: str, factor: float = 0.7) -> str: hex_color = hex_color.lstrip("#") r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) r = int(r * factor) g = int(g * factor) b = int(b * factor) return f"#{r:02X}{g:02X}{b:02X}" # Tornando acessível em toda a classe self._darken_color = _darken_color # These colors are used directly in button creation, not as instance variables red_button_bg = "#B80000" red_button_fg = "white" black_button_bg = "#333333" black_button_fg = "white" white_button_bg = "#555555" white_button_fg = "white" master.configure(bg=self.dark_bg) self.csv_filepath = csv_filepath self.winning_colors_from_csv: List[ Tuple[str, Optional[str]] ] = [] # Stores (color, detail) self._load_winning_colors() # Load data at init if not self.winning_colors_from_csv: error_label = tk.Label( master, text=f"Erro ao carregar dados de {csv_filepath}\nVerifique o arquivo ou o caminho.", fg=self.red_accent_fg, bg=self.dark_bg, ) error_label.pack() self.disable_buttons = True else: self.disable_buttons = False self.current_sequence: List[Tuple[str, Optional[str]]] = [] self.full_history_sequence: List[ Tuple[str, Optional[str]] ] = [] # Stores all added elements # Elementos da UI self.info_label = tk.Label( master, text="Clique nos botões para formar uma sequência:", bg=self.dark_bg, fg=self.text_fg, ) self.info_label.pack(pady=5) current_sequence_info_frame = tk.Frame(master, bg=self.frame_bg) current_sequence_info_frame.pack(pady=2) self.sequence_display_label = tk.Label( current_sequence_info_frame, text="Sequência atual:", bg=self.frame_bg, fg=self.text_fg, ) self.sequence_display_label.pack(side=tk.LEFT, padx=(0, 5)) self.sequence_label_text = tk.StringVar() self.sequence_label_text.set("Nenhuma") self.sequence_label = tk.Label( current_sequence_info_frame, textvariable=self.sequence_label_text, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg, ) self.sequence_label.pack(side=tk.LEFT, padx=(0, 10)) self.probable_next_color_desc_label = tk.Label( current_sequence_info_frame, text="Provável Próxima:", bg=self.frame_bg, fg=self.text_fg, ) self.probable_next_color_desc_label.pack(side=tk.LEFT, padx=(10, 5)) self.probable_next_color_text_var = tk.StringVar() self.probable_next_color_text_var.set("-") self.probable_next_color_label = tk.Label( current_sequence_info_frame, textvariable=self.probable_next_color_text_var, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg, ) self.probable_next_color_label.pack(side=tk.LEFT) self.count_display_label = tk.Label( master, text="Ocorrências na base de dados (para Sequência Atual):", bg=self.dark_bg, fg=self.text_fg, ) self.count_display_label.pack() self.count_label_text = tk.StringVar() self.count_label_text.set("0") self.count_label = tk.Label( master, textvariable=self.count_label_text, font=("Arial", 12, "bold"), bg=self.dark_bg, fg=self.text_bold_fg, ) self.count_label.pack(pady=5) self.next_color_title_label = tk.Label( master, text="Ocorrências da Próxima Cor Após Sequência:", bg=self.dark_bg, fg=self.text_fg, ) self.next_color_title_label.pack(pady=(10, 0)) # Frame for next color counts (dynamic content) self.next_color_display_frame = tk.Frame(master, bg=self.frame_bg) self.next_color_display_frame.pack(pady=5) self._setup_next_color_labels() # Initialize labels # Button Frames self.button_container_frame = tk.Frame(master, bg=self.frame_bg) self.button_container_frame.pack(pady=10) # Apenas um frame para botões detalhados self.detailed_button_frame = tk.Frame( self.button_container_frame, bg=self.frame_bg ) self.detailed_button_frame.pack(pady=5) # Sempre visível # Detailed Buttons (agora são os únicos botões) self.red_M_button = tk.Button( self.detailed_button_frame, text="Red-M", command=lambda: self._add_to_sequence("Red", "M"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.red_M_button.pack(side=tk.LEFT, padx=3) self.red_m_button = tk.Button( self.detailed_button_frame, text="Red-m", command=lambda: self._add_to_sequence("Red", "m"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.red_m_button.pack(side=tk.LEFT, padx=3) self.black_M_button = tk.Button( self.detailed_button_frame, text="Black-M", command=lambda: self._add_to_sequence("Black", "M"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.black_M_button.pack(side=tk.LEFT, padx=3) self.black_m_button = tk.Button( self.detailed_button_frame, text="Black-m", command=lambda: self._add_to_sequence("Black", "m"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.black_m_button.pack(side=tk.LEFT, padx=3) # White Button (agora só existe uma versão) self.white_button_detailed = tk.Button( self.detailed_button_frame, text="White", command=lambda: self._add_to_sequence("White", None), bg=white_button_bg, fg=white_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.white_button_detailed.pack(side=tk.LEFT, padx=3) self.reset_button = tk.Button( master, text="Resetar Sequência", command=self._reset_sequence, width=15, height=2, bg=self.button_bg_std, fg=self.button_fg_std, relief=tk.FLAT, borderwidth=0, ) self.reset_button.pack(pady=10) # Input para o limite máximo de sequências self.max_sequence_limit = tk.IntVar(value=4) # Default value self.max_sequence_limit_label = tk.Label( master, text="Limite Máximo de Sequências (1-10):", bg=self.dark_bg, fg=self.text_fg, ) self.max_sequence_limit_label.pack(pady=(10, 0)) self.max_sequence_limit_spinbox = tk.Spinbox( master, from_=1, to=10, textvariable=self.max_sequence_limit, width=5, bg=self.button_bg_std, fg=self.button_fg_std, command=self._on_max_sequence_limit_change, ) # Update display on change self.max_sequence_limit_spinbox.pack(pady=5) # Labels for Casa Analysis self.casa_analysis_title_label = tk.Label( master, text="Análise 'Casas' (Próximas Cores para Sub-Sequências Atuais):", font=("Arial", 10, "bold"), bg=self.dark_bg, fg=self.text_fg, ) self.casa_analysis_title_label.pack(pady=(10, 0)) self.casa_labels_frames = [] # To store frames for each casa's counts self.casa_labels_vars = [] self._on_mode_change() # Inicializações dependentes de modo # Variáveis e configuração para modo de atualização em tempo real self.stop_event = threading.Event() self.last_timestamp = "1970-01-01T00:00:00.000Z" # Initialize to a very old date self.real_time_var = tk.BooleanVar() self.real_time_var.set(False) # Adicionar para o cliente WebSocket self.auth_token = AUTH_TOKEN_FROM_TRACKER # Usar o token definido globalmente self.real_time_data_queue = queue.Queue() self.real_time_client: Optional[BlazeRealtimeClient] = None self.real_time_thread: Optional[threading.Thread] = None real_time_cb = tk.Checkbutton( master, text="Real-Time API", variable=self.real_time_var, command=self._toggle_real_time_mode, bg=self.dark_bg, fg=self.text_fg, selectcolor=self.frame_bg, ) real_time_cb.pack(pady=5) # ---------- NOVOS MÉTODOS / ALTERAÇÕES PARA LÓGICA DE COR ----------- def _get_accent_for_color(self, color: str) -> str: if color == "Red": return self.red_accent_fg elif color == "Black": return self.black_accent_fg elif color == "White": return self.white_accent_fg return self.text_bold_fg # Fallback # -------------------------------------------------------------------- def _on_max_sequence_limit_change(self): """Called when the max sequence limit spinbox value changes.""" self._setup_casa_labels() # Rebuild casa labels with the new limit new_max_limit = self.max_sequence_limit.get() # Update current_sequence to be a slice of the full history, limited by the new max_limit self.current_sequence = self.full_history_sequence[-new_max_limit:] self._update_display() # Update the display to reflect new analysis def _format_sequence_element(self, color: str, detail: Optional[str]) -> str: if detail: return f"{color}-{detail}" return color def _on_mode_change(self): # Não há mais seleção de modo, então apenas atualiza estados e labels self._update_button_states() self._setup_next_color_labels() # Rebuild next color labels for the new mode self._setup_casa_labels() # Rebuild casa labels for the new mode self._reset_sequence() # Resets current sequence and updates display def _update_button_states(self): # Sempre opera no modo detalhado is_disabled = self.disable_buttons self.detailed_button_frame.pack(pady=5) self.red_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.red_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.white_button_detailed.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.reset_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) def _setup_next_color_labels(self): # Clear existing labels in the frame for widget in self.next_color_display_frame.winfo_children(): widget.destroy() # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg # Sempre configura para o modo detalhado self.next_red_M_label_text = tk.StringVar(value="Red-M: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_red_M_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_red_m_label_text = tk.StringVar(value="Red-m: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_red_m_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_black_M_label_text = tk.StringVar(value="Black-M: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_black_M_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_black_m_label_text = tk.StringVar(value="Black-m: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_black_m_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_white_label_text = tk.StringVar(value="White: 0") # Re-use for White tk.Label( self.next_color_display_frame, textvariable=self.next_white_label_text, font=("Arial", 10), fg=white_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) def _setup_casa_labels(self): # Clear existing frames and vars for frame in self.casa_labels_frames: frame.destroy() self.casa_labels_frames = [] self.casa_labels_vars = [] # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg max_limit = self.max_sequence_limit.get() for n_casa in range(1, max_limit + 1): casa_outer_frame = tk.Frame(self.master, bg=frame_bg) casa_outer_frame.pack(pady=2, fill=tk.X) self.casa_labels_frames.append(casa_outer_frame) seq_var = tk.StringVar() seq_var.set(f"Casa {n_casa} (Seq: ...):") seq_label = tk.Label( casa_outer_frame, textvariable=seq_var, font=("Arial", 9), bg=frame_bg, fg=self.text_fg, ) seq_label.pack(side=tk.TOP, anchor="w", padx=10) counts_frame = tk.Frame(casa_outer_frame, bg=frame_bg) counts_frame.pack(side=tk.TOP, anchor="w", padx=10) red_M_var = tk.StringVar(value="R-M: -") red_m_var = tk.StringVar(value="R-m: -") black_M_var = tk.StringVar(value="B-M: -") black_m_var = tk.StringVar(value="B-m: -") white_var_detailed = tk.StringVar(value="White: -") tk.Label( counts_frame, textvariable=red_M_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=red_m_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=black_M_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=black_m_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=white_var_detailed, font=("Arial", 9), fg=white_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) self.casa_labels_vars.append( { "seq": seq_var, "detailed_red_M": red_M_var, "detailed_red_m": red_m_var, "detailed_black_M": black_M_var, "detailed_black_m": black_m_var, "shared_white": white_var_detailed, } ) def _load_winning_colors(self): self.winning_colors_from_csv = [] try: with open( self.csv_filepath, mode="r", newline="", encoding="utf-8" ) as file: reader = csv.DictReader(file) if ( not reader.fieldnames or "color" not in reader.fieldnames or "total_red" not in reader.fieldnames or "total_black" not in reader.fieldnames ): print(f"Erro: Cabeçalhos necessários não encontrados em {self.csv_filepath}") return for row in reader: color_part_full = row.get("color", "") if not color_part_full: continue color_part = color_part_full.split(" ")[0] detail = None if color_part in ["Red", "Black", "White"]: if color_part != "White": # Only Red and Black can have M/m try: total_red = float(row.get("total_red", "0")) total_black = float(row.get("total_black", "0")) if color_part == "Red": if total_red > total_black: detail = "M" elif total_red < total_black: detail = "m" elif color_part == "Black": if total_black > total_red: detail = "M" elif total_black < total_red: detail = "m" except ValueError: print( f"Aviso: Valor não numérico para total_red/total_black na linha: {row}" ) self.winning_colors_from_csv.append((color_part, detail)) except FileNotFoundError: print(f"Erro: Arquivo não encontrado em {self.csv_filepath}") except Exception as e: print(f"Erro ao ler o CSV: {e}") # ------------------------- CASA ANALYSIS ---------------------------- def _get_next_colors_for_specific_sequence( self, sequence_to_find: List[Tuple[str, Optional[str]]] ) -> Dict[Tuple[str, Optional[str]], int]: next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not sequence_to_find or not self.winning_colors_from_csv: return next_counts len_sub = len(sequence_to_find) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i : i + len_sub] == sequence_to_find: next_color_tuple = self.winning_colors_from_csv[i + len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_casa_analysis_display(self): max_limit = self.max_sequence_limit.get() for n_idx in range(max_limit): N = n_idx + 1 if n_idx >= len(self.casa_labels_vars): break vars_for_casa = self.casa_labels_vars[n_idx] current_seq_len = len(self.current_sequence) seq_display_text = "N/A" if self.disable_buttons or not self.winning_colors_from_csv: pass elif not self.current_sequence: seq_display_text = "-" elif current_seq_len < N: seq_display_text = "Insuficiente" else: target_sequence_for_casa = self.current_sequence[:N] sequence_str_parts = [ self._format_sequence_element(c, d) for c, d in target_sequence_for_casa ] sequence_str = " -> ".join(sequence_str_parts) max_display_len = 25 if len(sequence_str) > max_display_len: sequence_str = sequence_str[: max_display_len - 3] + "..." seq_display_text = sequence_str vars_for_casa["seq"].set(f"Casa {N} (Sub: {seq_display_text})") if ( self.disable_buttons or not self.winning_colors_from_csv or current_seq_len < N ): vars_for_casa["detailed_red_M"].set("R-M: -") vars_for_casa["detailed_red_m"].set("R-m: -") vars_for_casa["detailed_black_M"].set("B-M: -") vars_for_casa["detailed_black_m"].set("B-m: -") vars_for_casa["shared_white"].set("White: -") continue next_color_counts_detailed = self._get_next_colors_for_specific_sequence( self.current_sequence[:N] ) vars_for_casa["detailed_red_M"].set( f"R-M: {next_color_counts_detailed.get(('Red', 'M'), 0)}" ) vars_for_casa["detailed_red_m"].set( f"R-m: {next_color_counts_detailed.get(('Red', 'm'), 0)}" ) vars_for_casa["detailed_black_M"].set( f"B-M: {next_color_counts_detailed.get(('Black', 'M'), 0)}" ) vars_for_casa["detailed_black_m"].set( f"B-m: {next_color_counts_detailed.get(('Black', 'm'), 0)}" ) vars_for_casa["shared_white"].set( f"White: {next_color_counts_detailed.get(('White', None), 0)}" ) # ------------------- PROVÁVEL PRÓXIMA COR --------------------------- def _determine_most_probable_next_color( self, ) -> Tuple[Optional[str], Optional[str]]: if not self.current_sequence or self.disable_buttons or not self.winning_colors_from_csv: return None, None detailed_next_counts = self._count_next_color_occurrences() if not detailed_next_counts: return None, None max_occurrence = max(detailed_next_counts.values()) if max_occurrence == 0: return None, None probable_detailed_colors = [ cd_tuple for cd_tuple, count in detailed_next_counts.items() if count == max_occurrence ] if probable_detailed_colors: return sorted(probable_detailed_colors, key=lambda x: (x[0], str(x[1])))[0] return None, None # ------------------- SEQUÊNCIA E ADIÇÕES ---------------------------- def _add_to_sequence(self, color: str, detail: Optional[str]): if self.disable_buttons: return # Captura a cor prevista ANTES de adicionar (desconsidera M/m) pred_color, _ = self._determine_most_probable_next_color() # Adiciona o novo resultado entry = (color, detail) self.full_history_sequence.append(entry) self.current_sequence = self.full_history_sequence[-self.max_sequence_limit.get() :] # Decide a cor de exibição para a sequência if pred_color and pred_color == color: seq_fg = self._get_accent_for_color(color) else: seq_fg = self._darken_color(self._get_accent_for_color(color)) self.sequence_label.config(fg=seq_fg) self._update_display() def _reset_sequence(self): self.current_sequence = [] self.full_history_sequence = [] self.sequence_label.config(fg=self.text_bold_fg) # Reseta cor self._update_display() def _count_sequence_occurrences(self) -> int: if not self.current_sequence or not self.winning_colors_from_csv: return 0 count = 0 len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main < len_sub: return 0 for i in range(len_main - len_sub + 1): if self.winning_colors_from_csv[i : i + len_sub] == self.current_sequence: count += 1 return count def _count_next_color_occurrences(self) -> Dict[Tuple[str, Optional[str]], int]: next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not self.current_sequence or not self.winning_colors_from_csv: return next_counts len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i : i + len_sub] == self.current_sequence: next_color_tuple = self.winning_colors_from_csv[i + len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_display(self): if self.current_sequence: sequence_str_parts = [ self._format_sequence_element(c, d) for c, d in self.current_sequence ] self.sequence_label_text.set(" -> ".join(sequence_str_parts)) else: self.sequence_label_text.set("Nenhuma") self.sequence_label.config(fg=self.text_bold_fg) # Garante reset de cor occurrences = self._count_sequence_occurrences() self.count_label_text.set(f"{occurrences}") detailed_next_counts = self._count_next_color_occurrences() if hasattr(self, "next_red_M_label_text"): self.next_red_M_label_text.set( f"Red-M: {detailed_next_counts.get(('Red', 'M'), 0)}" ) if hasattr(self, "next_red_m_label_text"): self.next_red_m_label_text.set( f"Red-m: {detailed_next_counts.get(('Red', 'm'), 0)}" ) if hasattr(self, "next_black_M_label_text"): self.next_black_M_label_text.set( f"Black-M: {detailed_next_counts.get(('Black', 'M'), 0)}" ) if hasattr(self, "next_black_m_label_text"): self.next_black_m_label_text.set( f"Black-m: {detailed_next_counts.get(('Black', 'm'), 0)}" ) if hasattr(self, "next_white_label_text"): self.next_white_label_text.set( f"White: {detailed_next_counts.get(('White', None), 0)}" ) prob_color, prob_detail = self._determine_most_probable_next_color() if prob_color: self.probable_next_color_text_var.set( self._format_sequence_element(prob_color, prob_detail) ) self.probable_next_color_label.config( fg=self._get_accent_for_color(prob_color) ) else: self.probable_next_color_text_var.set("-") self.probable_next_color_label.config(fg=self.text_fg) self._update_casa_analysis_display() # ----------------- REAL-TIME PROCESSING ----------------------------- def _toggle_real_time_mode(self): if self.real_time_var.get(): # Se está ativando o modo real-time if not self.real_time_thread or not self.real_time_thread.is_alive(): print("Iniciando cliente Real-Time API...") if "1752950579" in self.auth_token: print( "ALERTA: O token de autenticação pode estar expirado. A API Real-Time pode não funcionar." ) self.stop_event.clear() self.real_time_client = BlazeRealtimeClient( self.auth_token, self.real_time_data_queue, self.stop_event ) self.real_time_thread = threading.Thread( target=self.real_time_client.start, daemon=True ) self.real_time_thread.start() self.master.after(100, self._process_real_time_queue) else: print("Parando cliente Real-Time API...") self.stop_event.set() if self.real_time_thread and self.real_time_thread.is_alive(): self.real_time_thread.join(timeout=5) if self.real_time_thread.is_alive(): print("Realtime: Thread do cliente não terminou no tempo esperado.") self.real_time_client = None def _process_real_time_queue(self): if not self.real_time_var.get(): return try: while not self.real_time_data_queue.empty(): new_result = self.real_time_data_queue.get_nowait() if new_result["created_at"] > self.last_timestamp: self.last_timestamp = new_result["created_at"] color_from_api = new_result["color"] detail_from_api = new_result["detail"] print( f"Realtime: Novo resultado recebido - Cor: {color_from_api}, Detalhe: {detail_from_api}, Roll: {new_result.get('roll')}" ) self._add_to_sequence(color_from_api, detail_from_api) except queue.Empty: pass except Exception as e: print(f"Erro ao processar fila de tempo real: {e}") finally: if self.real_time_var.get(): self.master.after(200, self._process_real_time_queue) # Ponto de entrada principal da aplicação if __name__ == "__main__": root = tk.Tk() csv_file_path = "data.csv" app = SequenceCounterApp(root, csv_file_path) def on_closing(): print("Fechando aplicação...") if app.real_time_thread and app.real_time_thread.is_alive(): print("Parando thread de real-time...") app.stop_event.set() app.real_time_thread.join(timeout=5) if app.real_time_thread.is_alive(): print("Timeout ao esperar a thread de real-time terminar.") if ( app.real_time_client and app.real_time_client.loop and not app.real_time_client.loop.is_closed() ): print("Fechando loop asyncio do cliente WebSocket...") if app.real_time_client.sio and app.real_time_client.sio.connected: app.real_time_client.loop.run_until_complete( app.real_time_client.sio.disconnect() ) app.real_time_client.loop.call_soon_threadsafe( app.real_time_client.loop.stop ) root.destroy() print("Aplicação fechada.") root.protocol("WM_DELETE_WINDOW", on_closing) try: root.mainloop() except KeyboardInterrupt: print("\nInterrupção pelo teclado recebida. Fechando...") on_closing()