@/sequence_analyzer.py Quero que se a cor que em `...

Criado em: 27 de maio de 2025

Respondido usando O3 por Chat01

Pergunta

@/sequence_analyzer.py Quero que, se a cor sugerida em "Provável Próxima" bater com o resultado da API, o novo resultado adicionado em "Sequência atual" tenha a cor que já é usada atualmente. Se não bater com o resultado da API, o novo resultado adicionado em "Sequência atual" deverá ter a mesma cor, só que um pouco mais escura.

  • Para essa comparação, considere apenas a cor, e não o detalhe m/M.

(mande o código completo sem omitir linha no formato markdown)

python
# filepath: c:\\Users\\lucas\\Downloads\\-dou\\sequence_analyzer.py
# The data.csv file follows this format:
#
# ```
# color,total_red,total_white,total_black,total_red_bets_placed,total_white_bets_placed,total_black_bets_placed
# Red 4,266.27,1399.03,305.06,128,796,146
# Black 14,350.55,1564.81,570.30,117,861,188
# Black 14,149.74,1366.68,736.29,83,766,156
# ...
# ```
#
# - total_red, total_black and total_white are the total $ amount of the
#   round for each color.
# - total_red_bets_placed, total_black_bets_placed and total_white_bets_placed
#   are the number of players that bet on each color in the round.
#
import tkinter as tk
import csv
# requests  # Removed, because get_current_result is being replaced
import threading
import time
from typing import Optional, Dict, List, Tuple, Any
import socketio  # Added
import asyncio  # Added
import queue  # Added

# Constants for the WebSocket client (modeled on blaze_bet_tracker.py)
# NOTE(review): this is a hard-coded bearer token committed to source; consider
# loading it from an environment variable or a config file instead.
AUTH_TOKEN_FROM_TRACKER = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ.Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8"
ROOM_TO_SUBSCRIBE = "double_room_1"
BLAZE_API_URL = "wss://api-gaming.blaze.bet.br"
SOCKETIO_PATH = "/replication/"


class BlazeRealtimeClient:
    """Socket.IO client that pushes finished "double" rounds onto a queue.

    The client owns its own asyncio event loop and is meant to run in a
    worker thread through :meth:`start`. Each finished round is put on
    ``data_queue`` as a dict with the keys ``color``, ``detail``, ``roll``,
    ``created_at`` and ``status``. Setting ``stop_event`` asks the client to
    disconnect and return.
    """

    def __init__(self, token: str, data_queue: queue.Queue, stop_event: threading.Event):
        self.token = token
        self.data_queue = data_queue
        self.stop_event = stop_event
        self.sio = socketio.AsyncClient(logger=False, engineio_logger=False)
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.is_authenticated = False
        self.is_subscribed = False

        # Register the socketio event handlers
        self.sio.on('connect', self.on_sio_connect)
        self.sio.on('disconnect', self.on_sio_disconnect)
        self.sio.on('data', self.on_sio_data)
        self.sio.on('connect_error', self.on_sio_connect_error)

    @staticmethod
    def _ack_is_ok(ack_data: Any) -> bool:
        """Return True when an ack is ``"ok"`` or a non-empty list starting with ``"ok"``."""
        if ack_data == "ok":
            return True
        return isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok"

    async def on_sio_connect(self):
        """Send the authenticate command as soon as the socket connects."""
        print("Realtime: WebSocket conectado. Autenticando...")
        self.is_authenticated = False
        self.is_subscribed = False
        auth_payload = {"token": self.token}
        try:
            await self.sio.emit(
                "cmd",
                {"id": "authenticate", "payload": auth_payload},
                callback=self._handle_auth_ack,
            )
        except Exception as e:
            print(f"Realtime: Erro ao emitir autenticação: {e}")
            self.stop_event.set()  # Signal the client to stop if the emit fails

    async def _handle_auth_ack(self, ack_data: Any):
        """On a successful auth ack, subscribe to the room; otherwise stop the client."""
        if self._ack_is_ok(ack_data):
            print("Realtime: Autenticado com sucesso. Inscrevendo-se na sala...")
            self.is_authenticated = True
            subscribe_payload = {"room": ROOM_TO_SUBSCRIBE}
            try:
                await self.sio.emit(
                    "cmd",
                    {"id": "subscribe", "payload": subscribe_payload},
                    callback=self._handle_subscribe_ack,
                )
            except Exception as e:
                print(f"Realtime: Erro ao emitir inscrição: {e}")
                self.stop_event.set()
        else:
            print(f"Realtime: Falha na autenticação WebSocket: {ack_data}. O token pode ser inválido ou expirado.")
            self.is_authenticated = False
            self.stop_event.set()

    async def _handle_subscribe_ack(self, ack_data: Any):
        """Record the subscription result; a failed subscription stops the client."""
        if self._ack_is_ok(ack_data):
            print(f"Realtime: Inscrito com sucesso na sala {ROOM_TO_SUBSCRIBE}.")
            self.is_subscribed = True
        else:
            print(f"Realtime: Falha ao inscrever-se na sala {ROOM_TO_SUBSCRIBE}: {ack_data}")
            self.is_subscribed = False
            self.stop_event.set()

    async def on_sio_disconnect(self):
        """Reset the session flags; reconnection is handled by main_ws_loop."""
        print("Realtime: WebSocket desconectado.")
        self.is_authenticated = False
        self.is_subscribed = False

    async def on_sio_connect_error(self, data: Any):
        """Log a connection error; main_ws_loop keeps retrying while not connected."""
        print(f"Realtime: Erro de conexão WebSocket: {data}")

    async def on_sio_data(self, data_message: Dict):
        """Turn a finished ``double.tick`` message into a result dict on the queue.

        Messages that are not ``double.tick`` dicts, have no dict payload, are
        not in status ``finished``/``complete``, or lack ``created_at`` are
        ignored.
        """
        if not (isinstance(data_message, dict) and data_message.get("id") == "double.tick"):
            return

        payload = data_message.get("payload")
        if not (payload and isinstance(payload, dict)):
            return

        status = payload.get("status")
        created_at = payload.get("created_at")  # Timestamp of the round
        if status not in ["finished", "complete"] or not created_at:
            return

        numeric_color_api = payload.get("color")  # 0: white, 1: red, 2: black
        roll_api = payload.get("roll")

        # Use the EUR totals to decide M/m, as blaze_bet_tracker.py does.
        # `or 0.0` also covers a key that is present with a null value, which
        # would otherwise make the comparisons below raise TypeError.
        total_red_eur = payload.get("total_red_eur_bet") or 0.0
        total_black_eur = payload.get("total_black_eur_bet") or 0.0

        game_color_str: Optional[str] = None
        if numeric_color_api == 0:
            game_color_str = "White"
        elif numeric_color_api == 1:
            game_color_str = "Red"
        elif numeric_color_api == 2:
            game_color_str = "Black"

        # "M" when the winning color had the larger total, "m" when it had the
        # smaller one, None on a tie or for White.
        detail_str: Optional[str] = None
        if game_color_str == "Red":
            if total_red_eur > total_black_eur:
                detail_str = "M"
            elif total_red_eur < total_black_eur:
                detail_str = "m"
        elif game_color_str == "Black":
            if total_black_eur > total_red_eur:
                detail_str = "M"
            elif total_black_eur < total_red_eur:
                detail_str = "m"

        if game_color_str:  # Only publish when the color was recognized
            result = {
                "color": game_color_str,
                "detail": detail_str,
                "roll": roll_api,
                "created_at": created_at,
                "status": status,
            }
            self.data_queue.put(result)

    async def main_ws_loop(self):
        """Connect, wait for auth + subscription, and reconnect until stop_event is set."""
        while not self.stop_event.is_set():
            if not self.sio.connected:
                if self.stop_event.is_set():
                    break  # Check again right before trying to connect
                print("Realtime: Tentando conectar WebSocket...")
                try:
                    await self.sio.connect(BLAZE_API_URL, transports=['websocket'], socketio_path=SOCKETIO_PATH)
                    # on_sio_connect drives authentication and subscription.
                    # Wait up to 5s (50 x 0.1s) for both to complete.
                    for _ in range(50):
                        if self.is_authenticated and self.is_subscribed:
                            break
                        if self.stop_event.is_set():
                            break
                        await asyncio.sleep(0.1)

                    if not (self.is_authenticated and self.is_subscribed) and not self.stop_event.is_set():
                        print("Realtime: Timeout esperando por autenticação/inscrição. Desconectando para tentar novamente.")
                        if self.sio.connected:
                            await self.sio.disconnect()
                except socketio.exceptions.ConnectionError as e:
                    print(f"Realtime: Falha na tentativa de conexão: {e}. Tentando novamente em 5s.")
                    if self.stop_event.is_set():
                        break
                    await asyncio.sleep(5)
                except Exception as e:
                    print(f"Realtime: Erro inesperado na conexão: {e}. Tentando novamente em 5s.")
                    if self.stop_event.is_set():
                        break
                    await asyncio.sleep(5)
                continue  # Back to the top to re-check stop_event and connection state

            if self.stop_event.is_set():
                break
            await asyncio.sleep(0.1)  # Keep the loop alive so stop_event is polled

        if self.sio.connected:
            print("Realtime: Desconectando WebSocket devido ao evento de parada.")
            await self.sio.disconnect()
        print("Realtime: Loop principal do cliente WebSocket encerrado.")

    def start(self):
        """Create an event loop for the calling thread and run main_ws_loop to completion."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self.main_ws_loop())
        except Exception as e:
            print(f"Realtime: Erro ao iniciar cliente: {e}")
        finally:
            try:
                # Try to disconnect if the socket is still connected
                if self.loop and self.sio.connected:
                    self.loop.run_until_complete(self.sio.disconnect())
            except Exception as e:
                print(f"Realtime: Erro ao iniciar cliente: {e}")
            finally:
                # Always close the loop, even when the final disconnect failed
                if self.loop and not self.loop.is_closed():
                    self.loop.close()
                print("Realtime: Loop de eventos do cliente WebSocket totalmente fechado.")


# NOTE: the old get_current_result() helper was removed from this file.
class SequenceCounterApp:
    """Tkinter UI that counts how often a color sequence occurs in a CSV history.

    The history is loaded from ``csv_filepath`` as ``(color, detail)`` tuples,
    where ``detail`` is ``"M"``, ``"m"`` or ``None``. The user (or the
    real-time client) appends results to the current sequence, and the UI
    shows how often that sequence occurs in the history and which results
    followed it.
    """

    def __init__(self, master, csv_filepath):
        self.master = master
        master.title("Contador de Sequências de Cores")

        # Dark theme colors
        self.dark_bg = "#2E2E2E"
        self.text_fg = "#E0E0E0"
        self.text_bold_fg = "white"
        self.frame_bg = self.dark_bg
        self.button_bg_std = "#4A4A4A"
        self.button_fg_std = self.text_fg
        self.red_accent_fg = "#FF6B6B"
        self.black_accent_fg = "#C0C0C0"
        self.white_accent_fg = "#E0E0E0"

        # These colors are only used while creating the buttons below
        red_button_bg = "#B80000"
        red_button_fg = "white"
        black_button_bg = "#333333"
        black_button_fg = "white"
        white_button_bg = "#555555"
        white_button_fg = "white"

        master.configure(bg=self.dark_bg)

        self.csv_filepath = csv_filepath
        self.winning_colors_from_csv: List[Tuple[str, Optional[str]]] = []  # (color, detail)
        self._load_winning_colors()  # Load the data at init

        if not self.winning_colors_from_csv:
            # "\n" must be a real line break; the escaped "\\n" showed a literal backslash-n
            error_label = tk.Label(
                master,
                text=f"Erro ao carregar dados de {csv_filepath}\nVerifique o arquivo ou o caminho.",
                fg=self.red_accent_fg,
                bg=self.dark_bg,
            )
            error_label.pack()
            self.disable_buttons = True
        else:
            self.disable_buttons = False

        self.current_sequence: List[Tuple[str, Optional[str]]] = []
        self.full_history_sequence: List[Tuple[str, Optional[str]]] = []  # Every element ever added

        # There is no mode selection; "detailed" is the only mode.

        # UI elements
        self.info_label = tk.Label(master, text="Clique nos botões para formar uma sequência:",
                                   bg=self.dark_bg, fg=self.text_fg)
        self.info_label.pack(pady=5)

        current_sequence_info_frame = tk.Frame(master, bg=self.frame_bg)
        current_sequence_info_frame.pack(pady=2)

        self.sequence_display_label = tk.Label(current_sequence_info_frame, text="Sequência atual:",
                                               bg=self.frame_bg, fg=self.text_fg)
        self.sequence_display_label.pack(side=tk.LEFT, padx=(0, 5))

        self.sequence_label_text = tk.StringVar()
        self.sequence_label_text.set("Nenhuma")
        self.sequence_label = tk.Label(current_sequence_info_frame, textvariable=self.sequence_label_text,
                                       font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg)
        self.sequence_label.pack(side=tk.LEFT, padx=(0, 10))

        self.probable_next_color_desc_label = tk.Label(current_sequence_info_frame, text="Provável Próxima:",
                                                       bg=self.frame_bg, fg=self.text_fg)
        self.probable_next_color_desc_label.pack(side=tk.LEFT, padx=(10, 5))

        self.probable_next_color_text_var = tk.StringVar()
        self.probable_next_color_text_var.set("-")
        self.probable_next_color_label = tk.Label(current_sequence_info_frame,
                                                  textvariable=self.probable_next_color_text_var,
                                                  font=("Arial", 12, "bold"),
                                                  bg=self.frame_bg, fg=self.text_bold_fg)
        self.probable_next_color_label.pack(side=tk.LEFT)

        self.count_display_label = tk.Label(master, text="Ocorrências na base de dados (para Sequência Atual):",
                                            bg=self.dark_bg, fg=self.text_fg)
        self.count_display_label.pack()

        self.count_label_text = tk.StringVar()
        self.count_label_text.set("0")
        self.count_label = tk.Label(master, textvariable=self.count_label_text, font=("Arial", 12, "bold"),
                                    bg=self.dark_bg, fg=self.text_bold_fg)
        self.count_label.pack(pady=5)

        self.next_color_title_label = tk.Label(master, text="Ocorrências da Próxima Cor Após Sequência:",
                                               bg=self.dark_bg, fg=self.text_fg)
        self.next_color_title_label.pack(pady=(10, 0))

        # Frame for the next-color counts (dynamic content)
        self.next_color_display_frame = tk.Frame(master, bg=self.frame_bg)
        self.next_color_display_frame.pack(pady=5)
        self._setup_next_color_labels()  # Initialize the labels

        # Button frames
        self.button_container_frame = tk.Frame(master, bg=self.frame_bg)
        self.button_container_frame.pack(pady=10)

        # A single frame holds the detailed buttons and is always visible
        self.detailed_button_frame = tk.Frame(self.button_container_frame, bg=self.frame_bg)
        self.detailed_button_frame.pack(pady=5)

        def make_color_button(label, color, detail, bg, fg):
            # A helper call gives each button its own (color, detail) closure.
            button = tk.Button(self.detailed_button_frame, text=label,
                               command=lambda: self._add_to_sequence(color, detail),
                               bg=bg, fg=fg, width=7, height=2, relief=tk.FLAT, borderwidth=0)
            button.pack(side=tk.LEFT, padx=3)
            return button

        # Detailed buttons (the only buttons now)
        self.red_M_button = make_color_button("Red-M", "Red", "M", red_button_bg, red_button_fg)
        self.red_m_button = make_color_button("Red-m", "Red", "m", red_button_bg, red_button_fg)
        self.black_M_button = make_color_button("Black-M", "Black", "M", black_button_bg, black_button_fg)
        self.black_m_button = make_color_button("Black-m", "Black", "m", black_button_bg, black_button_fg)
        # White has a single version (no M/m detail)
        self.white_button_detailed = make_color_button("White", "White", None, white_button_bg, white_button_fg)

        self.reset_button = tk.Button(master, text="Resetar Sequência", command=self._reset_sequence,
                                      width=15, height=2, bg=self.button_bg_std, fg=self.button_fg_std,
                                      relief=tk.FLAT, borderwidth=0)
        self.reset_button.pack(pady=10)

        # Input for the maximum sequence length
        self.max_sequence_limit = tk.IntVar(value=4)  # Default value
        self.max_sequence_limit_label = tk.Label(master, text="Limite Máximo de Sequências (1-10):",
                                                 bg=self.dark_bg, fg=self.text_fg)
        self.max_sequence_limit_label.pack(pady=(10, 0))
        self.max_sequence_limit_spinbox = tk.Spinbox(master, from_=1, to=10,
                                                     textvariable=self.max_sequence_limit, width=5,
                                                     bg=self.button_bg_std, fg=self.button_fg_std,
                                                     command=self._on_max_sequence_limit_change)
        self.max_sequence_limit_spinbox.pack(pady=5)

        # Labels for the "Casa" analysis
        self.casa_analysis_title_label = tk.Label(
            master,
            text="Análise 'Casas' (Próximas Cores para Sub-Sequências Atuais):",
            font=("Arial", 10, "bold"), bg=self.dark_bg, fg=self.text_fg)
        self.casa_analysis_title_label.pack(pady=(10, 0))

        self.casa_labels_frames = []  # One frame per casa
        self.casa_labels_vars = []
        # The casa labels themselves are created by _setup_casa_labels,
        # which _on_mode_change calls.
        self._on_mode_change()  # Sets button states, builds labels, resets the sequence

        # State for the real-time update mode
        self.stop_event = threading.Event()
        self.last_timestamp = "1970-01-01T00:00:00.000Z"  # Start from a very old date
        self.real_time_var = tk.BooleanVar()
        self.real_time_var.set(False)

        # WebSocket client plumbing
        self.auth_token = AUTH_TOKEN_FROM_TRACKER  # Module-level token
        self.real_time_data_queue = queue.Queue()
        self.real_time_client: Optional[BlazeRealtimeClient] = None
        self.real_time_thread: Optional[threading.Thread] = None

        real_time_cb = tk.Checkbutton(master, text="Real-Time API", variable=self.real_time_var,
                                      command=self._toggle_real_time_mode,
                                      bg=self.dark_bg, fg=self.text_fg, selectcolor=self.frame_bg)
        real_time_cb.pack(pady=5)
        # The initial display update is done by _on_mode_change -> _reset_sequence

    def _on_max_sequence_limit_change(self):
        """Called when the max sequence limit spinbox value changes."""
        self._setup_casa_labels()  # Rebuild casa labels with the new limit
        new_max_limit = self.max_sequence_limit.get()
        # current_sequence is always the tail of the full history, capped by the limit
        self.current_sequence = self.full_history_sequence[-new_max_limit:]
        self._update_display()

    def _format_sequence_element(self, color: str, detail: Optional[str]) -> str:
        """Return ``"color-detail"`` when a detail is present, otherwise just the color."""
        if detail:
            return f"{color}-{detail}"
        return color

    def _on_mode_change(self):
        """Refresh button states, rebuild all dynamic labels and reset the sequence."""
        self._update_button_states()
        self._setup_next_color_labels()
        self._setup_casa_labels()
        self._reset_sequence()  # Also updates the display

    def _update_button_states(self):
        """Enable or disable every button according to ``disable_buttons``."""
        state = tk.DISABLED if self.disable_buttons else tk.NORMAL
        self.detailed_button_frame.pack(pady=5)
        for button in (self.red_M_button, self.red_m_button, self.black_M_button,
                       self.black_m_button, self.white_button_detailed, self.reset_button):
            button.config(state=state)

    def _setup_next_color_labels(self):
        """Recreate the five "next color" count labels and their StringVars."""
        # Clear existing labels in the frame
        for widget in self.next_color_display_frame.winfo_children():
            widget.destroy()

        label_specs = (
            ("next_red_M_label_text", "Red-M: 0", self.red_accent_fg),
            ("next_red_m_label_text", "Red-m: 0", self.red_accent_fg),
            ("next_black_M_label_text", "Black-M: 0", self.black_accent_fg),
            ("next_black_m_label_text", "Black-m: 0", self.black_accent_fg),
            ("next_white_label_text", "White: 0", self.white_accent_fg),
        )
        for attr_name, initial_text, fg in label_specs:
            text_var = tk.StringVar(value=initial_text)
            setattr(self, attr_name, text_var)
            tk.Label(self.next_color_display_frame, textvariable=text_var, font=("Arial", 10),
                     fg=fg, bg=self.frame_bg).pack(side=tk.LEFT, padx=5)

    def _setup_casa_labels(self):
        """Recreate one frame of labels per casa, from 1 up to the current limit."""
        # Clear existing frames and vars
        for frame in self.casa_labels_frames:
            frame.destroy()
        self.casa_labels_frames = []
        self.casa_labels_vars = []

        frame_bg = self.frame_bg
        max_limit = self.max_sequence_limit.get()

        for n_casa in range(1, max_limit + 1):
            casa_outer_frame = tk.Frame(self.master, bg=frame_bg)
            casa_outer_frame.pack(pady=2, fill=tk.X)
            self.casa_labels_frames.append(casa_outer_frame)

            seq_var = tk.StringVar()
            seq_var.set(f"Casa {n_casa} (Seq: ...):")
            seq_label = tk.Label(casa_outer_frame, textvariable=seq_var, font=("Arial", 9),
                                 bg=frame_bg, fg=self.text_fg)
            seq_label.pack(side=tk.TOP, anchor="w", padx=10)

            counts_frame = tk.Frame(casa_outer_frame, bg=frame_bg)
            counts_frame.pack(side=tk.TOP, anchor="w", padx=10)

            casa_vars = {"seq": seq_var}
            count_specs = (
                ("detailed_red_M", "R-M: -", self.red_accent_fg),
                ("detailed_red_m", "R-m: -", self.red_accent_fg),
                ("detailed_black_M", "B-M: -", self.black_accent_fg),
                ("detailed_black_m", "B-m: -", self.black_accent_fg),
                ("shared_white", "White: -", self.white_accent_fg),
            )
            for key, initial_text, fg in count_specs:
                count_var = tk.StringVar(value=initial_text)
                tk.Label(counts_frame, textvariable=count_var, font=("Arial", 9),
                         fg=fg, bg=frame_bg).pack(side=tk.LEFT, padx=3)
                casa_vars[key] = count_var
            self.casa_labels_vars.append(casa_vars)

    @staticmethod
    def _classify_detail(color: str, total_red: float, total_black: float) -> Optional[str]:
        """Return "M"/"m" when the winning color had the larger/smaller total, None on a tie."""
        if color == "Red":
            own, other = total_red, total_black
        else:
            own, other = total_black, total_red
        if own > other:
            return "M"
        if own < other:
            return "m"
        return None

    def _load_winning_colors(self):
        """Fill ``winning_colors_from_csv`` with ``(color, detail)`` tuples from the CSV.

        Rows whose color is not Red, Black or White are skipped. Errors are
        printed rather than raised; the list keeps whatever was loaded.
        """
        self.winning_colors_from_csv = []
        try:
            with open(self.csv_filepath, mode='r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                if not reader.fieldnames or 'color' not in reader.fieldnames or \
                        'total_red' not in reader.fieldnames or 'total_black' not in reader.fieldnames:
                    print(f"Erro: Cabeçalhos necessários não encontrados em {self.csv_filepath}")
                    return

                for row in reader:
                    color_part_full = row.get('color', '')
                    if not color_part_full:
                        continue

                    color_part = color_part_full.split(' ')[0]
                    if color_part not in ["Red", "Black", "White"]:
                        continue

                    detail = None
                    if color_part != "White":  # Only Red and Black can have M/m
                        try:
                            total_red = float(row.get('total_red', '0'))
                            total_black = float(row.get('total_black', '0'))
                        except (TypeError, ValueError):
                            # TypeError covers short rows, where DictReader fills the
                            # missing columns with None; detail stays None for the row
                            # instead of aborting the whole load.
                            print(f"Aviso: Valor não numérico para total_red/total_black na linha: {row}")
                        else:
                            detail = self._classify_detail(color_part, total_red, total_black)
                    self.winning_colors_from_csv.append((color_part, detail))
        except FileNotFoundError:
            print(f"Erro: Arquivo não encontrado em {self.csv_filepath}")
        except Exception as e:
            print(f"Erro ao ler o CSV: {e}")

    def _get_next_colors_for_specific_sequence(
            self, sequence_to_find: List[Tuple[str, Optional[str]]]
    ) -> Dict[Tuple[str, Optional[str]], int]:
        """Count which ``(color, detail)`` follows each occurrence of the sequence in the history."""
        next_counts: Dict[Tuple[str, Optional[str]], int] = {}
        if not sequence_to_find or not self.winning_colors_from_csv:
            return next_counts

        history = self.winning_colors_from_csv
        len_sub = len(sequence_to_find)
        len_main = len(history)
        if len_main <= len_sub:  # Not enough data for a sequence plus a next color
            return next_counts

        for i in range(len_main - len_sub):
            if history[i:i + len_sub] == sequence_to_find:
                next_color_tuple = history[i + len_sub]
                next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1
        return next_counts

    def _update_casa_analysis_display(self):
        """Refresh every casa row with the counts for the first N elements of the sequence."""
        max_limit = self.max_sequence_limit.get()
        no_data = self.disable_buttons or not self.winning_colors_from_csv
        current_seq_len = len(self.current_sequence)

        for n_idx in range(max_limit):
            N = n_idx + 1
            if n_idx >= len(self.casa_labels_vars):
                break  # Should not happen when _setup_casa_labels ran for this limit

            vars_for_casa = self.casa_labels_vars[n_idx]

            seq_display_text = "N/A"  # Default when disabled or without data
            if no_data:
                pass
            elif not self.current_sequence:
                seq_display_text = "-"
            elif current_seq_len < N:
                seq_display_text = "Insuficiente"
            else:
                target_sequence_for_casa = self.current_sequence[:N]
                sequence_str = " -> ".join(
                    self._format_sequence_element(c, d) for c, d in target_sequence_for_casa
                )
                max_display_len = 25
                if len(sequence_str) > max_display_len:
                    sequence_str = sequence_str[:max_display_len - 3] + "..."
                seq_display_text = sequence_str

            vars_for_casa["seq"].set(f"Casa {N} (Sub: {seq_display_text})")

            if no_data or current_seq_len < N:
                # Nothing to count: show a dash for every entry
                vars_for_casa["detailed_red_M"].set("R-M: -")
                vars_for_casa["detailed_red_m"].set("R-m: -")
                vars_for_casa["detailed_black_M"].set("B-M: -")
                vars_for_casa["detailed_black_m"].set("B-m: -")
                vars_for_casa["shared_white"].set("White: -")
                continue

            counts = self._get_next_colors_for_specific_sequence(self.current_sequence[:N])
            vars_for_casa["detailed_red_M"].set(f"R-M: {counts.get(('Red', 'M'), 0)}")
            vars_for_casa["detailed_red_m"].set(f"R-m: {counts.get(('Red', 'm'), 0)}")
            vars_for_casa["detailed_black_M"].set(f"B-M: {counts.get(('Black', 'M'), 0)}")
            vars_for_casa["detailed_black_m"].set(f"B-m: {counts.get(('Black', 'm'), 0)}")
            vars_for_casa["shared_white"].set(f"White: {counts.get(('White', None), 0)}")

    def _determine_most_probable_next_color(self) -> Tuple[Optional[str], Optional[str]]:
        """Return the most frequent ``(color, detail)`` after the current sequence, or ``(None, None)``."""
        if not self.current_sequence or self.disable_buttons or not self.winning_colors_from_csv:
            return None, None

        detailed_next_counts = self._count_next_color_occurrences()
        if not detailed_next_counts:
            return None, None

        max_occurrence = max(detailed_next_counts.values())
        if max_occurrence == 0:
            return None, None

        probable_detailed_colors = [
            cd_tuple for cd_tuple, count in detailed_next_counts.items() if count == max_occurrence
        ]
        if probable_detailed_colors:
            # Sort by color, then detail, so ties are broken consistently
            return sorted(probable_detailed_colors, key=lambda x: (x[0], str(x[1])))[0]
        return None, None

    def _add_to_sequence(self, color: str, detail: Optional[str]):
        """Append a result to the history and refresh the display (no-op when disabled)."""
        if self.disable_buttons:
            return
        entry = (color, detail)
        self.full_history_sequence.append(entry)
        # current_sequence is always the tail of the full history, capped by the limit
        self.current_sequence = self.full_history_sequence[-self.max_sequence_limit.get():]
        self._update_display()

    def _reset_sequence(self):
        """Clear the current sequence and the full history, then refresh the display."""
        # Reset is allowed even when the buttons are disabled by a data load failure
        self.current_sequence = []
        self.full_history_sequence = []
        self._update_display()

    def _count_sequence_occurrences(self) -> int:
        """Return how many times the current sequence occurs in the CSV history."""
        if not self.current_sequence or not self.winning_colors_from_csv:
            return 0

        history = self.winning_colors_from_csv
        len_sub = len(self.current_sequence)
        len_main = len(history)
        if len_main < len_sub:
            return 0

        # Direct comparison of list-of-tuples slices
        return sum(
            1 for i in range(len_main - len_sub + 1)
            if history[i:i + len_sub] == self.current_sequence
        )

    def _count_next_color_occurrences(self) -> Dict[Tuple[str, Optional[str]], int]:
        """Return ``{(color, detail_or_None): count}`` for what followed the current sequence."""
        return self._get_next_colors_for_specific_sequence(self.current_sequence)

    def _update_display(self):
        """Refresh the sequence label, the counts, the probable next color and the casa rows."""
        # Sequence label
        if self.current_sequence:
            sequence_str_parts = [self._format_sequence_element(c, d) for c, d in self.current_sequence]
            self.sequence_label_text.set(" -> ".join(sequence_str_parts))
        else:
            self.sequence_label_text.set("Nenhuma")

        # Occurrences count
        occurrences = self._count_sequence_occurrences()
        self.count_label_text.set(f"{occurrences}")

        # Next color counts
        detailed_next_counts = self._count_next_color_occurrences()
        self.next_red_M_label_text.set(f"Red-M: {detailed_next_counts.get(('Red', 'M'), 0)}")
        self.next_red_m_label_text.set(f"Red-m: {detailed_next_counts.get(('Red', 'm'), 0)}")
        self.next_black_M_label_text.set(f"Black-M: {detailed_next_counts.get(('Black', 'M'), 0)}")
        self.next_black_m_label_text.set(f"Black-m: {detailed_next_counts.get(('Black', 'm'), 0)}")
        self.next_white_label_text.set(f"White: {detailed_next_counts.get(('White', None), 0)}")

        # Probable next color
        prob_color, prob_detail = self._determine_most_probable_next_color()
        accent_by_color = {
            "Red": self.red_accent_fg,
            "Black": self.black_accent_fg,
            "White": self.white_accent_fg,
        }
        if prob_color:
            self.probable_next_color_text_var.set(self._format_sequence_element(prob_color, prob_detail))
            self.probable_next_color_label.config(fg=accent_by_color.get(prob_color, self.text_fg))
        else:
            self.probable_next_color_text_var.set("-")
            self.probable_next_color_label.config(fg=self.text_fg)

        self._update_casa_analysis_display()

    @staticmethod
    def _jwt_expiry(token: str) -> Optional[float]:
        """Return the ``exp`` claim of a JWT as a number, or None when it cannot be read.

        The signature is not verified; this is only used to warn the user.
        """
        import base64
        import json

        try:
            payload_b64 = token.split(".")[1]
            # JWT segments drop the base64 padding; restore it before decoding
            padded = payload_b64 + "=" * (-len(payload_b64) % 4)
            payload = json.loads(base64.urlsafe_b64decode(padded))
        except (AttributeError, IndexError, TypeError, ValueError):
            return None
        if not isinstance(payload, dict):
            return None
        exp = payload.get("exp")
        if isinstance(exp, bool) or not isinstance(exp, (int, float)):
            return None
        return exp

    def _toggle_real_time_mode(self):
        """Start or stop the real-time client thread according to the checkbox."""
        if self.real_time_var.get():  # Real-time mode is being enabled
            if not self.real_time_thread or not self.real_time_thread.is_alive():
                print("Iniciando cliente Real-Time API...")
                # The token text is base64, so the expiry has to be decoded from
                # the payload; searching the token for the raw digits never matches.
                expires_at = self._jwt_expiry(self.auth_token)
                if expires_at is not None and time.time() >= expires_at:
                    print("ALERTA: O token de autenticação pode estar expirado. A API Real-Time pode não funcionar.")

                self.stop_event.clear()
                self.real_time_client = BlazeRealtimeClient(self.auth_token, self.real_time_data_queue,
                                                            self.stop_event)
                self.real_time_thread = threading.Thread(target=self.real_time_client.start, daemon=True)
                self.real_time_thread.start()
                self.master.after(100, self._process_real_time_queue)  # Start polling the queue
        else:  # Real-time mode is being disabled
            print("Parando cliente Real-Time API...")
            self.stop_event.set()  # Ask the client thread to stop
            if self.real_time_thread and self.real_time_thread.is_alive():
                self.real_time_thread.join(timeout=5)  # Wait for the thread to finish
                if self.real_time_thread.is_alive():
                    print("Realtime: Thread do cliente não terminou no tempo esperado.")
            self.real_time_client = None
            # _process_real_time_queue stops rescheduling itself on its own

    def _process_real_time_queue(self):
        """Drain the real-time queue into the sequence and reschedule while the mode is on."""
        if not self.real_time_var.get():  # Mode was disabled: stop processing
            return

        try:
            while not self.real_time_data_queue.empty():  # Process everything queued
                new_result = self.real_time_data_queue.get_nowait()
                if new_result["created_at"] > self.last_timestamp:
                    self.last_timestamp = new_result["created_at"]
                    color_from_api = new_result["color"]  # e.g. "Red", "Black", "White"
                    detail_from_api = new_result["detail"]  # e.g. "M", "m", None
                    print(f"Realtime: Novo resultado recebido - Cor: {color_from_api}, Detalhe: {detail_from_api}, Roll: {new_result.get('roll')}")
                    # Add the result to the sequence automatically
                    self._add_to_sequence(color_from_api, detail_from_api)
                # Otherwise the result is old or a duplicate and is ignored
        except queue.Empty:
            pass  # Queue drained between empty() and get_nowait()
        except Exception as e:
            print(f"Erro ao processar fila de tempo real: {e}")
        finally:
            if self.real_time_var.get():
                # Poll the queue again in 200ms while the mode is still on
                self.master.after(200, self._process_real_time_queue)


# Main entry point of the application
if __name__ == "__main__":
    root = tk.Tk()
    # data.csv is expected in the working directory; adjust the path otherwise
    csv_file_path = "data.csv"
    app = SequenceCounterApp(root, csv_file_path)

    def on_closing():
        """Stop the real-time thread, then always destroy the window."""
        print("Fechando aplicação...")
        try:
            if app.real_time_thread and app.real_time_thread.is_alive():
                print("Parando thread de real-time...")
                app.stop_event.set()
                app.real_time_thread.join(timeout=5)  # Wait for the WS client thread
                if app.real_time_thread.is_alive():
                    print("Timeout ao esperar a thread de real-time terminar.")
            # The client's event loop belongs to its worker thread, and
            # BlazeRealtimeClient.start() disconnects and closes it on exit.
            # Driving that loop from here raised RuntimeError while it was
            # still running, which skipped root.destroy().
        finally:
            try:
                root.destroy()
            except tk.TclError:
                pass  # The window was already destroyed
        print("Aplicação fechada.")

    root.protocol("WM_DELETE_WINDOW", on_closing)
    try:
        root.mainloop()
    except KeyboardInterrupt:
        print("\nInterrupção pelo teclado recebida. Fechando...")
        on_closing()

Pensamento

Adjusting UI colors

It seems like they’re asking about a UI change related to how new results are styled with specific colors. They want to set the color for new results when they get added, likely in _add_to_sequence. The result should have the same color if it matches the predicted color, but be a slightly darker version otherwise. I think this needs to apply to the color used for the result's display or the sequence label. Let me make sure I modify the right part of the code to handle this.

Clarifying color adjustment logic

It seems they’re asking to modify how the color is applied to new sequence entries. Currently, sequence labels are displayed with a static white color and no specific color styling per entry. They probably want to highlight new entries based on color prediction. If the predicted color matches the one from the API, it should stay the same; if not, make it darker. I'll need to check the button or suggestion box for color adjustments here based on the predicted values.

Clarifying color handling for sequence display

It looks like they want the color of each new entry to depend on whether the predicted color matches the API result: if the colors match, the entry keeps its normal color; if they do not, it is shown in a darker shade of that same color. The task therefore involves comparing the prediction with the incoming result at the moment it is added to the sequence and adjusting the sequence label color accordingly. Only the color is compared — the "M/m" detail is ignored for this check.

Thought for 27 seconds

Resposta

python
# filepath: c:\\Users\\lucas\\Downloads\\-dou\\sequence_analyzer.py
# The data.csv file has this format:
#
# ```
# color,total_red,total_white,total_black,total_red_bets_placed,total_white_bets_placed,total_black_bets_placed
# Red 4,266.27,1399.03,305.06,128,796,146
# Black 14,350.55,1564.81,570.30,117,861,188
# Black 14,149.74,1366.68,736.29,83,766,156
# ...
# ```
#
# - total_red, total_black and total_white are the total $ amount bet on each
#   color in that round.
# - total_red_bets_placed, total_black_bets_placed and total_white_bets_placed
#   are the number of players who bet on each color in that round.
#
import asyncio
import csv
import queue
import threading
import time
import tkinter as tk
from typing import Any, Dict, List, Optional, Tuple

import socketio

# Constants for the WebSocket client (modelled on blaze_bet_tracker.py).
# NOTE(review): this is a bearer credential committed to source; consider
# loading it from the environment or a config file instead.
AUTH_TOKEN_FROM_TRACKER = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ.Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8"
)
ROOM_TO_SUBSCRIBE = "double_room_1"
BLAZE_API_URL = "wss://api-gaming.blaze.bet.br"
SOCKETIO_PATH = "/replication/"


class BlazeRealtimeClient:
    """Socket.IO client that pushes finished rounds onto a thread-safe queue.

    ``start`` is meant to be the target of a worker thread: it creates its own
    asyncio event loop and runs ``main_ws_loop`` until ``stop_event`` is set.
    Every finished round is put on ``data_queue`` as a dict with the keys
    ``color``, ``detail``, ``roll``, ``created_at`` and ``status``.
    """

    # Numeric color codes used by the "double.tick" payload.
    _COLOR_NAMES = {0: "White", 1: "Red", 2: "Black"}

    def __init__(self, token: str, data_queue: queue.Queue, stop_event: threading.Event):
        self.token = token
        self.data_queue = data_queue
        self.stop_event = stop_event
        self.sio = socketio.AsyncClient(logger=False, engineio_logger=False)
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.is_authenticated = False
        self.is_subscribed = False

        # Register the socketio event handlers.
        self.sio.on("connect", self.on_sio_connect)
        self.sio.on("disconnect", self.on_sio_disconnect)
        self.sio.on("data", self.on_sio_data)
        self.sio.on("connect_error", self.on_sio_connect_error)

    @staticmethod
    def _ack_ok(ack_data: Any) -> bool:
        """Return True when an ack is ``"ok"`` or a non-empty list starting with it."""
        if ack_data == "ok":
            return True
        return isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok"

    @staticmethod
    def _to_amount(value: Any) -> float:
        """Coerce a bet total to float; missing or malformed values count as 0.0."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _bet_detail(color: Optional[str], total_red: float, total_black: float) -> Optional[str]:
        """Return "M"/"m" when the winning color had the larger/smaller total.

        Only Red and Black are classified; White and ties yield ``None``.
        """
        if color == "Red":
            own, other = total_red, total_black
        elif color == "Black":
            own, other = total_black, total_red
        else:
            return None
        if own > other:
            return "M"
        if own < other:
            return "m"
        return None

    async def on_sio_connect(self):
        """Authenticate as soon as the socket connects."""
        print("Realtime: WebSocket conectado. Autenticando...")
        self.is_authenticated = False
        self.is_subscribed = False
        auth_payload = {"token": self.token}
        try:
            await self.sio.emit(
                "cmd",
                {"id": "authenticate", "payload": auth_payload},
                callback=self._handle_auth_ack,
            )
        except Exception as e:
            print(f"Realtime: Erro ao emitir autenticação: {e}")
            self.stop_event.set()  # Give up if the request cannot even be sent.

    async def _handle_auth_ack(self, ack_data: Any):
        """On a successful auth ack, subscribe to the room; otherwise stop."""
        if not self._ack_ok(ack_data):
            print(
                f"Realtime: Falha na autenticação WebSocket: {ack_data}. O token pode ser inválido ou expirado."
            )
            self.is_authenticated = False
            self.stop_event.set()
            return

        print("Realtime: Autenticado com sucesso. Inscrevendo-se na sala...")
        self.is_authenticated = True
        subscribe_payload = {"room": ROOM_TO_SUBSCRIBE}
        try:
            await self.sio.emit(
                "cmd",
                {"id": "subscribe", "payload": subscribe_payload},
                callback=self._handle_subscribe_ack,
            )
        except Exception as e:
            print(f"Realtime: Erro ao emitir inscrição: {e}")
            self.stop_event.set()

    async def _handle_subscribe_ack(self, ack_data: Any):
        """Record the subscription result; a failed subscription stops the client."""
        if self._ack_ok(ack_data):
            print(f"Realtime: Inscrito com sucesso na sala {ROOM_TO_SUBSCRIBE}.")
            self.is_subscribed = True
        else:
            print(
                f"Realtime: Falha ao inscrever-se na sala {ROOM_TO_SUBSCRIBE}: {ack_data}"
            )
            self.is_subscribed = False
            self.stop_event.set()

    async def on_sio_disconnect(self):
        """Reset session flags; reconnection is handled by ``main_ws_loop``."""
        print("Realtime: WebSocket desconectado.")
        self.is_authenticated = False
        self.is_subscribed = False

    async def on_sio_connect_error(self, data: Any):
        """Log the connection error; ``main_ws_loop`` retries the connection."""
        print(f"Realtime: Erro de conexão WebSocket: {data}")

    async def on_sio_data(self, data_message: Dict):
        """Queue a result dict for every finished "double.tick" round."""
        if not (isinstance(data_message, dict) and data_message.get("id") == "double.tick"):
            return
        payload = data_message.get("payload")
        if not (payload and isinstance(payload, dict)):
            return

        status = payload.get("status")
        created_at = payload.get("created_at")  # Timestamp of the round.
        if status not in ("finished", "complete") or not created_at:
            return

        game_color_str = self._COLOR_NAMES.get(payload.get("color"))
        if not game_color_str:
            return  # Unknown color code: nothing to report.

        # The EUR totals decide M/m (as in blaze_bet_tracker.py). They are
        # coerced so a null or non-numeric total cannot raise in the comparison.
        total_red_eur = self._to_amount(payload.get("total_red_eur_bet", 0.0))
        total_black_eur = self._to_amount(payload.get("total_black_eur_bet", 0.0))

        self.data_queue.put(
            {
                "color": game_color_str,
                "detail": self._bet_detail(game_color_str, total_red_eur, total_black_eur),
                "roll": payload.get("roll"),
                "created_at": created_at,
                "status": status,
            }
        )

    async def _pause(self, seconds: float, step: float = 0.1) -> None:
        """Sleep up to *seconds*, returning early once ``stop_event`` is set."""
        remaining = seconds
        while remaining > 0 and not self.stop_event.is_set():
            await asyncio.sleep(min(step, remaining))
            remaining -= step

    async def _wait_for_subscription(self) -> None:
        """Wait up to ~5 s for auth + subscription; disconnect on timeout."""
        for _ in range(50):
            if self.is_authenticated and self.is_subscribed:
                return
            if self.stop_event.is_set():
                return
            await asyncio.sleep(0.1)
        if self.is_authenticated and self.is_subscribed:
            return
        if self.stop_event.is_set():
            return
        print(
            "Realtime: Timeout esperando por autenticação/inscrição. Desconectando para tentar novamente."
        )
        if self.sio.connected:
            await self.sio.disconnect()

    async def main_ws_loop(self):
        """Keep the socket connected until ``stop_event`` is set, then disconnect."""
        while not self.stop_event.is_set():
            if self.sio.connected:
                # Idle tick: keeps the loop alive and re-checks stop_event.
                await asyncio.sleep(0.1)
                continue

            print("Realtime: Tentando conectar WebSocket...")
            try:
                await self.sio.connect(
                    BLAZE_API_URL,
                    transports=["websocket"],
                    socketio_path=SOCKETIO_PATH,
                )
                # on_sio_connect drives authentication and subscription.
                await self._wait_for_subscription()
            except socketio.exceptions.ConnectionError as e:
                print(
                    f"Realtime: Falha na tentativa de conexão: {e}. Tentando novamente em 5s."
                )
                await self._pause(5)
            except Exception as e:
                print(
                    f"Realtime: Erro inesperado na conexão: {e}. Tentando novamente em 5s."
                )
                await self._pause(5)

        if self.sio.connected:
            print("Realtime: Desconectando WebSocket devido ao evento de parada.")
            await self.sio.disconnect()
        print("Realtime: Loop principal do cliente WebSocket encerrado.")

    def start(self):
        """Thread entry point: run ``main_ws_loop`` on a private event loop."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self.main_ws_loop())
        except Exception as e:
            print(f"Realtime: Erro ao iniciar cliente: {e}")
        finally:
            # Make sure the socket and the loop are released.
            if self.loop and self.sio.connected:
                self.loop.run_until_complete(self.sio.disconnect())
            if self.loop and not self.loop.is_closed():
                self.loop.close()
            print("Realtime: Loop de eventos do cliente WebSocket totalmente fechado.")


# The old get_current_result() polling helper was removed; results now arrive
# through BlazeRealtimeClient.
class SequenceCounterApp:
    """Tkinter UI that looks up the current color sequence in the CSV history.

    Entries are ``(color, detail)`` tuples where ``detail`` is ``"M"``, ``"m"``
    or ``None``. They are added with the buttons or, in real-time mode, taken
    from the queue fed by ``BlazeRealtimeClient``. The window shows how often
    the current sequence occurs in the CSV, which entries followed it, and the
    most frequent follower ("Provável Próxima").
    """

    # (StringVar attribute, label prefix, history key, color) for the
    # "next color" counters.
    _NEXT_COLOR_FIELDS = (
        ("next_red_M_label_text", "Red-M", ("Red", "M"), "Red"),
        ("next_red_m_label_text", "Red-m", ("Red", "m"), "Red"),
        ("next_black_M_label_text", "Black-M", ("Black", "M"), "Black"),
        ("next_black_m_label_text", "Black-m", ("Black", "m"), "Black"),
        ("next_white_label_text", "White", ("White", None), "White"),
    )

    # (dict key, label prefix, history key, color) for each "casa" row.
    _CASA_FIELDS = (
        ("detailed_red_M", "R-M", ("Red", "M"), "Red"),
        ("detailed_red_m", "R-m", ("Red", "m"), "Red"),
        ("detailed_black_M", "B-M", ("Black", "M"), "Black"),
        ("detailed_black_m", "B-m", ("Black", "m"), "Black"),
        ("shared_white", "White", ("White", None), "White"),
    )

    def __init__(self, master, csv_filepath):
        """Build the window inside *master* and load history from *csv_filepath*."""
        self.master = master
        master.title("Contador de Sequências de Cores")

        # Dark theme palette.
        self.dark_bg = "#2E2E2E"
        self.text_fg = "#E0E0E0"
        self.text_bold_fg = "white"
        self.frame_bg = self.dark_bg
        self.button_bg_std = "#4A4A4A"
        self.button_fg_std = self.text_fg
        self.red_accent_fg = "#FF6B6B"
        self.black_accent_fg = "#C0C0C0"
        self.white_accent_fg = "#E0E0E0"

        # Button colors are only needed while the buttons are created.
        red_button_bg = "#B80000"
        black_button_bg = "#333333"
        white_button_bg = "#555555"
        button_fg = "white"

        master.configure(bg=self.dark_bg)

        self.csv_filepath = csv_filepath
        self.winning_colors_from_csv: List[Tuple[str, Optional[str]]] = []
        self._load_winning_colors()

        if not self.winning_colors_from_csv:
            error_label = tk.Label(
                master,
                text=f"Erro ao carregar dados de {csv_filepath}\nVerifique o arquivo ou o caminho.",
                fg=self.red_accent_fg,
                bg=self.dark_bg,
            )
            error_label.pack()
            self.disable_buttons = True
        else:
            self.disable_buttons = False

        # current_sequence is the tail of full_history_sequence, limited by
        # the spinbox value.
        self.current_sequence: List[Tuple[str, Optional[str]]] = []
        self.full_history_sequence: List[Tuple[str, Optional[str]]] = []

        # ---- UI elements ------------------------------------------------
        self.info_label = tk.Label(
            master,
            text="Clique nos botões para formar uma sequência:",
            bg=self.dark_bg,
            fg=self.text_fg,
        )
        self.info_label.pack(pady=5)

        current_sequence_info_frame = tk.Frame(master, bg=self.frame_bg)
        current_sequence_info_frame.pack(pady=2)

        self.sequence_display_label = tk.Label(
            current_sequence_info_frame,
            text="Sequência atual:",
            bg=self.frame_bg,
            fg=self.text_fg,
        )
        self.sequence_display_label.pack(side=tk.LEFT, padx=(0, 5))

        self.sequence_label_text = tk.StringVar()
        self.sequence_label_text.set("Nenhuma")
        self.sequence_label = tk.Label(
            current_sequence_info_frame,
            textvariable=self.sequence_label_text,
            font=("Arial", 12, "bold"),
            bg=self.frame_bg,
            fg=self.text_bold_fg,
        )
        self.sequence_label.pack(side=tk.LEFT, padx=(0, 10))

        self.probable_next_color_desc_label = tk.Label(
            current_sequence_info_frame,
            text="Provável Próxima:",
            bg=self.frame_bg,
            fg=self.text_fg,
        )
        self.probable_next_color_desc_label.pack(side=tk.LEFT, padx=(10, 5))

        self.probable_next_color_text_var = tk.StringVar()
        self.probable_next_color_text_var.set("-")
        self.probable_next_color_label = tk.Label(
            current_sequence_info_frame,
            textvariable=self.probable_next_color_text_var,
            font=("Arial", 12, "bold"),
            bg=self.frame_bg,
            fg=self.text_bold_fg,
        )
        self.probable_next_color_label.pack(side=tk.LEFT)

        self.count_display_label = tk.Label(
            master,
            text="Ocorrências na base de dados (para Sequência Atual):",
            bg=self.dark_bg,
            fg=self.text_fg,
        )
        self.count_display_label.pack()

        self.count_label_text = tk.StringVar()
        self.count_label_text.set("0")
        self.count_label = tk.Label(
            master,
            textvariable=self.count_label_text,
            font=("Arial", 12, "bold"),
            bg=self.dark_bg,
            fg=self.text_bold_fg,
        )
        self.count_label.pack(pady=5)

        self.next_color_title_label = tk.Label(
            master,
            text="Ocorrências da Próxima Cor Após Sequência:",
            bg=self.dark_bg,
            fg=self.text_fg,
        )
        self.next_color_title_label.pack(pady=(10, 0))

        # Frame holding the (rebuilt on demand) next-color counters.
        self.next_color_display_frame = tk.Frame(master, bg=self.frame_bg)
        self.next_color_display_frame.pack(pady=5)
        self._setup_next_color_labels()

        # Button frames.
        self.button_container_frame = tk.Frame(master, bg=self.frame_bg)
        self.button_container_frame.pack(pady=10)

        self.detailed_button_frame = tk.Frame(
            self.button_container_frame, bg=self.frame_bg
        )
        self.detailed_button_frame.pack(pady=5)

        button_specs = (
            ("red_M_button", "Red-M", "Red", "M", red_button_bg),
            ("red_m_button", "Red-m", "Red", "m", red_button_bg),
            ("black_M_button", "Black-M", "Black", "M", black_button_bg),
            ("black_m_button", "Black-m", "Black", "m", black_button_bg),
            ("white_button_detailed", "White", "White", None, white_button_bg),
        )
        for attr_name, caption, color, detail, bg in button_specs:
            button = tk.Button(
                self.detailed_button_frame,
                text=caption,
                # Defaults bind the loop values now (avoids late binding).
                command=lambda c=color, d=detail: self._add_to_sequence(c, d),
                bg=bg,
                fg=button_fg,
                width=7,
                height=2,
                relief=tk.FLAT,
                borderwidth=0,
            )
            button.pack(side=tk.LEFT, padx=3)
            setattr(self, attr_name, button)

        self.reset_button = tk.Button(
            master,
            text="Resetar Sequência",
            command=self._reset_sequence,
            width=15,
            height=2,
            bg=self.button_bg_std,
            fg=self.button_fg_std,
            relief=tk.FLAT,
            borderwidth=0,
        )
        self.reset_button.pack(pady=10)

        # Maximum length of the current sequence.
        self.max_sequence_limit = tk.IntVar(value=4)
        self.max_sequence_limit_label = tk.Label(
            master,
            text="Limite Máximo de Sequências (1-10):",
            bg=self.dark_bg,
            fg=self.text_fg,
        )
        self.max_sequence_limit_label.pack(pady=(10, 0))

        self.max_sequence_limit_spinbox = tk.Spinbox(
            master,
            from_=1,
            to=10,
            textvariable=self.max_sequence_limit,
            width=5,
            bg=self.button_bg_std,
            fg=self.button_fg_std,
            command=self._on_max_sequence_limit_change,
        )
        self.max_sequence_limit_spinbox.pack(pady=5)

        # "Casa" analysis section.
        self.casa_analysis_title_label = tk.Label(
            master,
            text="Análise 'Casas' (Próximas Cores para Sub-Sequências Atuais):",
            font=("Arial", 10, "bold"),
            bg=self.dark_bg,
            fg=self.text_fg,
        )
        self.casa_analysis_title_label.pack(pady=(10, 0))

        self.casa_labels_frames = []  # One outer frame per casa.
        self.casa_labels_vars = []  # One dict of StringVars per casa.

        self._on_mode_change()

        # ---- Real-time mode state ---------------------------------------
        self.stop_event = threading.Event()
        # Very old timestamp so the first real result is always newer.
        self.last_timestamp = "1970-01-01T00:00:00.000Z"
        self.real_time_var = tk.BooleanVar()
        self.real_time_var.set(False)

        self.auth_token = AUTH_TOKEN_FROM_TRACKER
        self.real_time_data_queue = queue.Queue()
        self.real_time_client: Optional[BlazeRealtimeClient] = None
        self.real_time_thread: Optional[threading.Thread] = None

        real_time_cb = tk.Checkbutton(
            master,
            text="Real-Time API",
            variable=self.real_time_var,
            command=self._toggle_real_time_mode,
            bg=self.dark_bg,
            fg=self.text_fg,
            selectcolor=self.frame_bg,
        )
        real_time_cb.pack(pady=5)

    # ------------------------- COLOR HELPERS -----------------------------
    @staticmethod
    def _darken_color(hex_color: str, factor: float = 0.7) -> str:
        """Return ``#RRGGBB`` *hex_color* with each channel scaled by *factor*.

        Channels are clamped to 0..255 so any factor yields a valid color.
        """
        digits = hex_color.lstrip("#")
        channels = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
        r, g, b = (max(0, min(255, int(value * factor))) for value in channels)
        return f"#{r:02X}{g:02X}{b:02X}"

    @staticmethod
    def _jwt_expiry(token: str) -> Optional[int]:
        """Return the ``exp`` claim of a JWT, or None if it cannot be read.

        The signature is not verified; this only decodes the payload segment.
        """
        import base64
        import json

        try:
            segment = token.split(".")[1]
            # base64url payloads drop their padding; restore it for decoding.
            segment += "=" * (-len(segment) % 4)
            claims = json.loads(base64.urlsafe_b64decode(segment))
        except (AttributeError, IndexError, ValueError):
            return None
        expiry = claims.get("exp") if isinstance(claims, dict) else None
        return expiry if isinstance(expiry, int) else None

    @staticmethod
    def _detail_for(color: str, total_red: float, total_black: float) -> Optional[str]:
        """Return "M"/"m" when *color* had the larger/smaller total, else None."""
        if color == "Red":
            own, other = total_red, total_black
        elif color == "Black":
            own, other = total_black, total_red
        else:
            return None
        if own > other:
            return "M"
        if own < other:
            return "m"
        return None

    def _get_accent_for_color(self, color: str) -> str:
        """Return the accent foreground for a color name (bold text as fallback)."""
        accents = {
            "Red": self.red_accent_fg,
            "Black": self.black_accent_fg,
            "White": self.white_accent_fg,
        }
        return accents.get(color, self.text_bold_fg)

    def _sequence_fg_for(self, color: str, predicted_color: Optional[str]) -> str:
        """Return the sequence label color for a new result.

        A correct prediction (color only, M/m ignored) keeps the normal accent
        and a wrong one gets a darker shade. With no prediction there is
        nothing to miss, so the normal accent is used.
        """
        accent = self._get_accent_for_color(color)
        if predicted_color is None or predicted_color == color:
            return accent
        return self._darken_color(accent)

    # ------------------------- UI SETUP ----------------------------------
    def _on_max_sequence_limit_change(self):
        """Rebuild the casa rows and re-slice the sequence for the new limit."""
        self._setup_casa_labels()
        new_max_limit = self.max_sequence_limit.get()
        self.current_sequence = self.full_history_sequence[-new_max_limit:]
        self._update_display()

    def _format_sequence_element(self, color: str, detail: Optional[str]) -> str:
        """Format an entry as ``Color-detail``, or just ``Color`` without detail."""
        return f"{color}-{detail}" if detail else color

    def _on_mode_change(self):
        """Refresh button states, rebuild dynamic labels and reset the sequence."""
        self._update_button_states()
        self._setup_next_color_labels()
        self._setup_casa_labels()
        self._reset_sequence()

    def _update_button_states(self):
        """Enable or disable every button according to ``disable_buttons``."""
        state = tk.DISABLED if self.disable_buttons else tk.NORMAL
        self.detailed_button_frame.pack(pady=5)
        for button in (
            self.red_M_button,
            self.red_m_button,
            self.black_M_button,
            self.black_m_button,
            self.white_button_detailed,
            self.reset_button,
        ):
            button.config(state=state)

    def _setup_next_color_labels(self):
        """Recreate the next-color counter labels and their StringVars."""
        for widget in self.next_color_display_frame.winfo_children():
            widget.destroy()

        for attr_name, prefix, _key, color in self._NEXT_COLOR_FIELDS:
            text_var = tk.StringVar(value=f"{prefix}: 0")
            setattr(self, attr_name, text_var)
            tk.Label(
                self.next_color_display_frame,
                textvariable=text_var,
                font=("Arial", 10),
                fg=self._get_accent_for_color(color),
                bg=self.frame_bg,
            ).pack(side=tk.LEFT, padx=5)

    def _setup_casa_labels(self):
        """Recreate one row of labels per casa, from 1 up to the current limit."""
        for frame in self.casa_labels_frames:
            frame.destroy()
        self.casa_labels_frames = []
        self.casa_labels_vars = []

        frame_bg = self.frame_bg
        for n_casa in range(1, self.max_sequence_limit.get() + 1):
            casa_outer_frame = tk.Frame(self.master, bg=frame_bg)
            casa_outer_frame.pack(pady=2, fill=tk.X)
            self.casa_labels_frames.append(casa_outer_frame)

            seq_var = tk.StringVar()
            seq_var.set(f"Casa {n_casa} (Seq: ...):")
            tk.Label(
                casa_outer_frame,
                textvariable=seq_var,
                font=("Arial", 9),
                bg=frame_bg,
                fg=self.text_fg,
            ).pack(side=tk.TOP, anchor="w", padx=10)

            counts_frame = tk.Frame(casa_outer_frame, bg=frame_bg)
            counts_frame.pack(side=tk.TOP, anchor="w", padx=10)

            casa_vars = {"seq": seq_var}
            for key, prefix, _history_key, color in self._CASA_FIELDS:
                count_var = tk.StringVar(value=f"{prefix}: -")
                tk.Label(
                    counts_frame,
                    textvariable=count_var,
                    font=("Arial", 9),
                    fg=self._get_accent_for_color(color),
                    bg=frame_bg,
                ).pack(side=tk.LEFT, padx=3)
                casa_vars[key] = count_var
            self.casa_labels_vars.append(casa_vars)

    # ------------------------- CSV LOADING -------------------------------
    def _load_winning_colors(self):
        """Fill ``winning_colors_from_csv`` with ``(color, detail)`` per CSV row.

        Rows with an unknown color are skipped. Rows whose totals are missing
        or non-numeric are kept with ``detail=None`` and a warning is printed.
        """
        self.winning_colors_from_csv = []
        required = ("color", "total_red", "total_black")
        try:
            with open(
                self.csv_filepath, mode="r", newline="", encoding="utf-8"
            ) as file:
                reader = csv.DictReader(file)
                if not reader.fieldnames or any(
                    name not in reader.fieldnames for name in required
                ):
                    print(f"Erro: Cabeçalhos necessários não encontrados em {self.csv_filepath}")
                    return

                for row in reader:
                    color_part_full = row.get("color", "")
                    if not color_part_full:
                        continue
                    color_part = color_part_full.split(" ")[0]
                    if color_part not in ("Red", "Black", "White"):
                        continue

                    detail = None
                    if color_part != "White":  # Only Red and Black carry M/m.
                        try:
                            total_red = float(row.get("total_red", "0"))
                            total_black = float(row.get("total_black", "0"))
                        except (TypeError, ValueError):
                            # TypeError covers short rows, whose missing
                            # fields DictReader fills with None.
                            print(
                                f"Aviso: Valor não numérico para total_red/total_black na linha: {row}"
                            )
                        else:
                            detail = self._detail_for(color_part, total_red, total_black)
                    self.winning_colors_from_csv.append((color_part, detail))
        except FileNotFoundError:
            print(f"Erro: Arquivo não encontrado em {self.csv_filepath}")
        except Exception as e:
            print(f"Erro ao ler o CSV: {e}")

    # ------------------------- CASA ANALYSIS -----------------------------
    def _get_next_colors_for_specific_sequence(
        self, sequence_to_find: List[Tuple[str, Optional[str]]]
    ) -> Dict[Tuple[str, Optional[str]], int]:
        """Count which entry follows each occurrence of *sequence_to_find*.

        Occurrences at the very end of the history have no follower and are
        not counted.
        """
        next_counts: Dict[Tuple[str, Optional[str]], int] = {}
        history = self.winning_colors_from_csv
        if not sequence_to_find or not history:
            return next_counts

        len_sub = len(sequence_to_find)
        for start in range(len(history) - len_sub):
            if history[start:start + len_sub] == sequence_to_find:
                follower = history[start + len_sub]
                next_counts[follower] = next_counts.get(follower, 0) + 1
        return next_counts

    def _update_casa_analysis_display(self):
        """Refresh every casa row from the first N entries of the sequence."""
        max_limit = self.max_sequence_limit.get()
        no_data = self.disable_buttons or not self.winning_colors_from_csv
        current_seq_len = len(self.current_sequence)

        rows = self.casa_labels_vars[:max_limit]
        for casa_size, vars_for_casa in enumerate(rows, start=1):
            if no_data:
                seq_display_text = "N/A"
            elif not self.current_sequence:
                seq_display_text = "-"
            elif current_seq_len < casa_size:
                seq_display_text = "Insuficiente"
            else:
                seq_display_text = " -> ".join(
                    self._format_sequence_element(c, d)
                    for c, d in self.current_sequence[:casa_size]
                )
                max_display_len = 25
                if len(seq_display_text) > max_display_len:
                    seq_display_text = seq_display_text[:max_display_len - 3] + "..."
            vars_for_casa["seq"].set(f"Casa {casa_size} (Sub: {seq_display_text})")

            if no_data or current_seq_len < casa_size:
                for key, prefix, _history_key, _color in self._CASA_FIELDS:
                    vars_for_casa[key].set(f"{prefix}: -")
                continue

            counts = self._get_next_colors_for_specific_sequence(
                self.current_sequence[:casa_size]
            )
            for key, prefix, history_key, _color in self._CASA_FIELDS:
                vars_for_casa[key].set(f"{prefix}: {counts.get(history_key, 0)}")

    # ------------------- MOST PROBABLE NEXT COLOR ------------------------
    def _determine_most_probable_next_color(
        self,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return the most frequent follower of the current sequence.

        Ties are broken by color name, then by ``str(detail)``. Returns
        ``(None, None)`` when no follower is known.
        """
        if not self.current_sequence or self.disable_buttons or not self.winning_colors_from_csv:
            return None, None

        detailed_next_counts = self._count_next_color_occurrences()
        if not detailed_next_counts:
            return None, None

        max_occurrence = max(detailed_next_counts.values())
        if max_occurrence == 0:
            return None, None

        candidates = [
            entry
            for entry, count in detailed_next_counts.items()
            if count == max_occurrence
        ]
        return min(candidates, key=lambda entry: (entry[0], str(entry[1])))

    # ------------------- SEQUENCE AND ADDITIONS --------------------------
    def _add_to_sequence(self, color: str, detail: Optional[str]):
        """Append a result and color the sequence label by prediction outcome."""
        if self.disable_buttons:
            return

        # The prediction must be read BEFORE the new result changes the sequence.
        pred_color, _ = self._determine_most_probable_next_color()

        self.full_history_sequence.append((color, detail))
        self.current_sequence = self.full_history_sequence[-self.max_sequence_limit.get():]

        self.sequence_label.config(fg=self._sequence_fg_for(color, pred_color))
        self._update_display()

    def _reset_sequence(self):
        """Clear the sequence and its history and restore the default color."""
        self.current_sequence = []
        self.full_history_sequence = []
        self.sequence_label.config(fg=self.text_bold_fg)
        self._update_display()

    def _count_sequence_occurrences(self) -> int:
        """Count (possibly overlapping) occurrences of the sequence in the CSV."""
        history = self.winning_colors_from_csv
        if not self.current_sequence or not history:
            return 0

        len_sub = len(self.current_sequence)
        return sum(
            1
            for start in range(len(history) - len_sub + 1)
            if history[start:start + len_sub] == self.current_sequence
        )

    def _count_next_color_occurrences(self) -> Dict[Tuple[str, Optional[str]], int]:
        """Count the followers of the full current sequence in the CSV history."""
        return self._get_next_colors_for_specific_sequence(self.current_sequence)

    def _update_display(self):
        """Refresh every label that depends on the current sequence."""
        if self.current_sequence:
            self.sequence_label_text.set(
                " -> ".join(
                    self._format_sequence_element(c, d)
                    for c, d in self.current_sequence
                )
            )
        else:
            self.sequence_label_text.set("Nenhuma")
            self.sequence_label.config(fg=self.text_bold_fg)

        self.count_label_text.set(f"{self._count_sequence_occurrences()}")

        detailed_next_counts = self._count_next_color_occurrences()
        for attr_name, prefix, history_key, _color in self._NEXT_COLOR_FIELDS:
            text_var = getattr(self, attr_name, None)
            if text_var is not None:
                text_var.set(f"{prefix}: {detailed_next_counts.get(history_key, 0)}")

        prob_color, prob_detail = self._determine_most_probable_next_color()
        if prob_color:
            self.probable_next_color_text_var.set(
                self._format_sequence_element(prob_color, prob_detail)
            )
            self.probable_next_color_label.config(
                fg=self._get_accent_for_color(prob_color)
            )
        else:
            self.probable_next_color_text_var.set("-")
            self.probable_next_color_label.config(fg=self.text_fg)

        self._update_casa_analysis_display()

    # ----------------- REAL-TIME PROCESSING ------------------------------
    def _toggle_real_time_mode(self):
        """Start or stop the WebSocket worker to match the checkbox state."""
        import time

        if self.real_time_var.get():
            if not self.real_time_thread or not self.real_time_thread.is_alive():
                print("Iniciando cliente Real-Time API...")
                # The JWT payload is base64url-encoded, so the expiry has to
                # be decoded; a substring search on the raw token never matches.
                expiry = self._jwt_expiry(self.auth_token)
                if expiry is not None and expiry <= time.time():
                    print(
                        "ALERTA: O token de autenticação pode estar expirado. A API Real-Time pode não funcionar."
                    )
                self.stop_event.clear()
                self.real_time_client = BlazeRealtimeClient(
                    self.auth_token, self.real_time_data_queue, self.stop_event
                )
                self.real_time_thread = threading.Thread(
                    target=self.real_time_client.start, daemon=True
                )
                self.real_time_thread.start()
                self.master.after(100, self._process_real_time_queue)
        else:
            print("Parando cliente Real-Time API...")
            self.stop_event.set()
            if self.real_time_thread and self.real_time_thread.is_alive():
                self.real_time_thread.join(timeout=5)
                if self.real_time_thread.is_alive():
                    print("Realtime: Thread do cliente não terminou no tempo esperado.")
            self.real_time_client = None
            # _process_real_time_queue stops rescheduling itself on its own.

    def _process_real_time_queue(self):
        """Drain queued results into the sequence; reschedules while enabled."""
        if not self.real_time_var.get():
            return
        try:
            while not self.real_time_data_queue.empty():
                new_result = self.real_time_data_queue.get_nowait()
                # Older or duplicate rounds are ignored.
                if new_result["created_at"] > self.last_timestamp:
                    self.last_timestamp = new_result["created_at"]
                    color_from_api = new_result["color"]
                    detail_from_api = new_result["detail"]
                    print(
                        f"Realtime: Novo resultado recebido - Cor: {color_from_api}, Detalhe: {detail_from_api}, Roll: {new_result.get('roll')}"
                    )
                    self._add_to_sequence(color_from_api, detail_from_api)
        except queue.Empty:
            pass
        except Exception as e:
            print(f"Erro ao processar fila de tempo real: {e}")
        finally:
            if self.real_time_var.get():
                self.master.after(200, self._process_real_time_queue)


# Main entry point of the application.
if __name__ == "__main__":
    root = tk.Tk()
    # data.csv is expected in the current working directory.
    csv_file_path = "data.csv"
    app = SequenceCounterApp(root, csv_file_path)

    def on_closing():
        """Stop the real-time worker (best effort) and always close the window."""
        import asyncio

        print("Fechando aplicação...")
        try:
            worker = app.real_time_thread
            if worker and worker.is_alive():
                print("Parando thread de real-time...")
                app.stop_event.set()
                worker.join(timeout=5)
                if worker.is_alive():
                    print("Timeout ao esperar a thread de real-time terminar.")

            client = app.real_time_client
            loop = client.loop if client else None
            if loop and not loop.is_closed():
                print("Fechando loop asyncio do cliente WebSocket...")
                if worker and worker.is_alive():
                    # The loop belongs to the worker thread, so it must not be
                    # driven from here; hand it the disconnect instead.
                    if client.sio and client.sio.connected:
                        asyncio.run_coroutine_threadsafe(client.sio.disconnect(), loop)
                else:
                    loop.close()
        except RuntimeError as e:
            # The worker may close its loop between the check and the call.
            print(f"Realtime: Erro ao encerrar cliente: {e}")
        finally:
            root.destroy()
            print("Aplicação fechada.")

    root.protocol("WM_DELETE_WINDOW", on_closing)
    try:
        root.mainloop()
    except KeyboardInterrupt:
        print("\nInterrupção pelo teclado recebida. Fechando...")
        on_closing()

Compartilhar esta Q&A