Voce conseguiria criar um script lite a partir do ...
Criado em: 31 de maio de 2025
Criado em: 31 de maio de 2025
Voce conseguiria criar um script lite a partir do código abaixo? mas o script lite terá que funcionar exatamente como ele? ou seja, um único arquivo mais enxuto e pequeno, mantendo GUI, WebSocket e áudio e tudo mais (obs: mas não é porque é pequeno que vai esculhambar com as coisas, tem que ser 100% fidedigno com o código original).
text# filepath: c:\\Users\\lucas\\Downloads\\-dou\\sequence_analyzer.py # O arquivo data.csv segue esse formato: # # ``` # color,total_red,total_white,total_black,total_red_bets_placed,total_white_bets_placed,total_black_bets_placed # Red 4,266.27,1399.03,305.06,128,796,146 # Black 14,350.55,1564.81,570.30,117,861,188 # Black 14,149.74,1366.68,736.29,83,766,156 # ... # ``` # # - O total_red, total_black e total_white é o valor $ total da respectiva rodada de cada cor. # - O total_red_bets_placed, total_black_bets_placed e o total_white_bets_placed é a quantidade total de players que apostaram na rodada de cada cor. # import tkinter as tk import csv # requests # Removido, pois get_current_result será substituído import threading import time from typing import Optional, Dict, List, Tuple, Any import socketio # Adicionado import asyncio # Adicionado import queue # Adicionado import winsound # Para tocar o arquivo .wav no Windows # Constantes para o cliente WebSocket (inspirado em blaze_bet_tracker.py) AUTH_TOKEN_FROM_TRACKER = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ.Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8" ) ROOM_TO_SUBSCRIBE = "double_room_1" BLAZE_API_URL = "wss://api-gaming.blaze.bet.br" SOCKETIO_PATH = "/replication/" # Classe para o cliente WebSocket class BlazeRealtimeClient: def __init__(self, token: str, data_queue: queue.Queue, stop_event: threading.Event): self.token = token self.data_queue = data_queue self.stop_event = stop_event self.sio = socketio.AsyncClient(logger=False, engineio_logger=False) self.loop: Optional[asyncio.AbstractEventLoop] = None self.is_authenticated = False self.is_subscribed = False # Registrar handlers de eventos do socketio self.sio.on("connect", self.on_sio_connect) self.sio.on("disconnect", self.on_sio_disconnect) self.sio.on("data", self.on_sio_data) self.sio.on("connect_error", self.on_sio_connect_error) async def on_sio_connect(self): print("Realtime: WebSocket conectado. Autenticando...") self.is_authenticated = False self.is_subscribed = False auth_payload = {"token": self.token} try: await self.sio.emit( "cmd", {"id": "authenticate", "payload": auth_payload}, callback=self._handle_auth_ack, ) except Exception as e: print(f"Realtime: Erro ao emitir autenticação: {e}") self.stop_event.set() # Sinaliza para parar se a emissão falhar async def _handle_auth_ack(self, ack_data: Any): if ack_data == "ok" or ( isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok" ): print("Realtime: Autenticado com sucesso. Inscrevendo-se na sala...") self.is_authenticated = True subscribe_payload = {"room": ROOM_TO_SUBSCRIBE} try: await self.sio.emit( "cmd", {"id": "subscribe", "payload": subscribe_payload}, callback=self._handle_subscribe_ack, ) except Exception as e: print(f"Realtime: Erro ao emitir inscrição: {e}") self.stop_event.set() else: print( f"Realtime: Falha na autenticação WebSocket: {ack_data}. O token pode ser inválido ou expirado." ) self.is_authenticated = False self.stop_event.set() async def _handle_subscribe_ack(self, ack_data: Any): if ack_data == "ok" or ( isinstance(ack_data, list) and len(ack_data) > 0 and ack_data[0] == "ok" ): print(f"Realtime: Inscrito com sucesso na sala {ROOM_TO_SUBSCRIBE}.") self.is_subscribed = True else: print( f"Realtime: Falha ao inscrever-se na sala {ROOM_TO_SUBSCRIBE}: {ack_data}" ) self.is_subscribed = False self.stop_event.set() async def on_sio_disconnect(self): print("Realtime: WebSocket desconectado.") self.is_authenticated = False self.is_subscribed = False # A lógica de reconexão está no main_ws_loop async def on_sio_connect_error(self, data: Any): print(f"Realtime: Erro de conexão WebSocket: {data}") # Isso pode levar a uma desconexão, e o main_ws_loop tentará reconectar. async def on_sio_data(self, data_message: Dict): if not (isinstance(data_message, dict) and data_message.get("id") == "double.tick"): return payload = data_message.get("payload") if not (payload and isinstance(payload, dict)): return status = payload.get("status") created_at = payload.get("created_at") # Timestamp da rodada if status in ["finished", "complete"] and created_at: numeric_color_api = payload.get("color") # 0: white, 1: red, 2: black roll_api = payload.get("roll") # Usar os valores em EUR para determinar M/m, conforme blaze_bet_tracker.py total_red_eur = payload.get("total_red_eur_bet", 0.0) total_black_eur = payload.get("total_black_eur_bet", 0.0) game_color_str: Optional[str] = None if numeric_color_api == 0: game_color_str = "White" elif numeric_color_api == 1: game_color_str = "Red" elif numeric_color_api == 2: game_color_str = "Black" detail_str: Optional[str] = None if game_color_str == "Red": if total_red_eur > total_black_eur: detail_str = "M" elif total_red_eur < total_black_eur: detail_str = "m" elif game_color_str == "Black": if total_black_eur > total_red_eur: detail_str = "M" elif total_black_eur < total_red_eur: detail_str = "m" if game_color_str: # Certifica-se que a cor foi determinada result = { "color": game_color_str, "detail": detail_str, "roll": roll_api, "created_at": created_at, "status": status, } self.data_queue.put(result) async def main_ws_loop(self): while not self.stop_event.is_set(): if not self.sio.connected: if self.stop_event.is_set(): break # Verifica antes de tentar conectar print("Realtime: Tentando conectar WebSocket...") try: await self.sio.connect( BLAZE_API_URL, transports=["websocket"], socketio_path=SOCKETIO_PATH, ) # on_sio_connect cuidará da autenticação e inscrição após a conexão # Aguarda um pouco para dar tempo para conectar e autenticar/inscrever for _ in range(50): # Espera até 5s por autenticação/inscrição if self.is_authenticated and self.is_subscribed: break if self.stop_event.is_set(): break await asyncio.sleep(0.1) if ( not (self.is_authenticated and self.is_subscribed) and not self.stop_event.is_set() ): print( "Realtime: Timeout esperando por autenticação/inscrição. Desconectando para tentar novamente." ) if self.sio.connected: await self.sio.disconnect() except socketio.exceptions.ConnectionError as e: print( f"Realtime: Falha na tentativa de conexão: {e}. Tentando novamente em 5s." ) if self.stop_event.is_set(): break await asyncio.sleep(5) except Exception as e: print( f"Realtime: Erro inesperado na conexão: {e}. Tentando novamente em 5s." ) if self.stop_event.is_set(): break await asyncio.sleep(5) continue # Volta ao início do loop para verificar stop_event e status da conexão if self.stop_event.is_set(): break await asyncio.sleep( 0.1 ) # Mantém o loop ativo, permitindo a verificação do stop_event if self.sio.connected: print("Realtime: Desconectando WebSocket devido ao evento de parada.") await self.sio.disconnect() print("Realtime: Loop principal do cliente WebSocket encerrado.") def start(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) try: self.loop.run_until_complete(self.main_ws_loop()) except Exception as e: print(f"Realtime: Erro ao iniciar cliente: {e}") finally: # Garante que o loop seja fechado corretamente if self.loop and self.sio.connected: # Tenta desconectar se ainda conectado self.loop.run_until_complete(self.sio.disconnect()) if self.loop and not self.loop.is_closed(): self.loop.close() print("Realtime: Loop de eventos do cliente WebSocket totalmente fechado.") # Remover a função get_current_result antiga # def get_current_result() -> Optional[Dict]: # ... (código antigo removido) ... class SequenceCounterApp: def __init__(self, master, csv_filepath): self.master = master master.title("Contador de Sequências de Cores") # Dark Theme Colors self.dark_bg = "#2E2E2E" self.text_fg = "#E0E0E0" self.text_bold_fg = "white" self.frame_bg = self.dark_bg self.button_bg_std = "#4A4A4A" self.button_fg_std = self.text_fg self.red_accent_fg = "#FF6B6B" self.black_accent_fg = "#C0C0C0" self.white_accent_fg = "#E0E0E0" # Nova função para calcular tons mais escuros def _darken_color(hex_color: str, factor: float = 0.7) -> str: hex_color = hex_color.lstrip("#") r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) r = int(r * factor) g = int(g * factor) b = int(b * factor) return f"#{r:02X}{g:02X}{b:02X}" # Tornando acessível em toda a classe self._darken_color = _darken_color # These colors are used directly in button creation, not as instance variables red_button_bg = "#B80000" red_button_fg = "white" black_button_bg = "#333333" black_button_fg = "white" white_button_bg = "#555555" white_button_fg = "white" master.configure(bg=self.dark_bg) self.csv_filepath = csv_filepath self.winning_colors_from_csv: List[ Tuple[str, Optional[str]] ] = [] # Stores (color, detail) self._load_winning_colors() # Load data at init if not self.winning_colors_from_csv: error_label = tk.Label( master, text=f"Erro ao carregar dados de {csv_filepath}\nVerifique o arquivo ou o caminho.", fg=self.red_accent_fg, bg=self.dark_bg, ) error_label.pack() self.disable_buttons = True else: self.disable_buttons = False self.current_sequence: List[Tuple[str, Optional[str]]] = [] self.full_history_sequence: List[ Tuple[str, Optional[str]] ] = [] # Stores all added elements # Elementos da UI self.info_label = tk.Label( master, text="Clique nos botões para formar uma sequência:", bg=self.dark_bg, fg=self.text_fg, ) self.info_label.pack(pady=5) current_sequence_info_frame = tk.Frame(master, bg=self.frame_bg) current_sequence_info_frame.pack(pady=2) self.sequence_display_label = tk.Label( current_sequence_info_frame, text="Sequência atual:", bg=self.frame_bg, fg=self.text_fg, ) self.sequence_display_label.pack(side=tk.LEFT, padx=(0, 5)) # Replace single label with frame for individual elements self.sequence_elements_frame = tk.Frame(current_sequence_info_frame, bg=self.frame_bg) self.sequence_elements_frame.pack(side=tk.LEFT, padx=(0, 10)) self.sequence_element_labels = [] # Store individual labels self.probable_next_color_desc_label = tk.Label( current_sequence_info_frame, text="Provável Próxima:", bg=self.frame_bg, fg=self.text_fg, ) self.probable_next_color_desc_label.pack(side=tk.LEFT, padx=(10, 5)) self.probable_next_color_text_var = tk.StringVar() self.probable_next_color_text_var.set("-") self.probable_next_color_label = tk.Label( current_sequence_info_frame, textvariable=self.probable_next_color_text_var, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg, ) self.probable_next_color_label.pack(side=tk.LEFT) self.count_display_label = tk.Label( master, text="Ocorrências na base de dados (para Sequência Atual):", bg=self.dark_bg, fg=self.text_fg, ) self.count_display_label.pack() self.count_label_text = tk.StringVar() self.count_label_text.set("0") self.count_label = tk.Label( master, textvariable=self.count_label_text, font=("Arial", 12, "bold"), bg=self.dark_bg, fg=self.text_bold_fg, ) self.count_label.pack(pady=5) self.next_color_title_label = tk.Label( master, text="Ocorrências da Próxima Cor Após Sequência:", bg=self.dark_bg, fg=self.text_fg, ) self.next_color_title_label.pack(pady=(10, 0)) # Frame for next color counts (dynamic content) self.next_color_display_frame = tk.Frame(master, bg=self.frame_bg) self.next_color_display_frame.pack(pady=5) self._setup_next_color_labels() # Initialize labels # Button Frames self.button_container_frame = tk.Frame(master, bg=self.frame_bg) self.button_container_frame.pack(pady=10) # Apenas um frame para botões detalhados self.detailed_button_frame = tk.Frame( self.button_container_frame, bg=self.frame_bg ) self.detailed_button_frame.pack(pady=5) # Sempre visível # Detailed Buttons (agora são os únicos botões) self.red_M_button = tk.Button( self.detailed_button_frame, text="Red-M", command=lambda: self._add_to_sequence("Red", "M"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.red_M_button.pack(side=tk.LEFT, padx=3) self.red_m_button = tk.Button( self.detailed_button_frame, text="Red-m", command=lambda: self._add_to_sequence("Red", "m"), bg=red_button_bg, fg=red_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.red_m_button.pack(side=tk.LEFT, padx=3) self.black_M_button = tk.Button( self.detailed_button_frame, text="Black-M", command=lambda: self._add_to_sequence("Black", "M"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.black_M_button.pack(side=tk.LEFT, padx=3) self.black_m_button = tk.Button( self.detailed_button_frame, text="Black-m", command=lambda: self._add_to_sequence("Black", "m"), bg=black_button_bg, fg=black_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.black_m_button.pack(side=tk.LEFT, padx=3) # White Button (agora só existe uma versão) self.white_button_detailed = tk.Button( self.detailed_button_frame, text="White", command=lambda: self._add_to_sequence("White", None), bg=white_button_bg, fg=white_button_fg, width=7, height=2, relief=tk.FLAT, borderwidth=0, ) self.white_button_detailed.pack(side=tk.LEFT, padx=3) self.reset_button = tk.Button( master, text="Resetar Sequência", command=self._reset_sequence, width=15, height=2, bg=self.button_bg_std, fg=self.button_fg_std, relief=tk.FLAT, borderwidth=0, ) self.reset_button.pack(pady=10) # Input para o limite máximo de sequências self.max_sequence_limit = tk.IntVar(value=4) # Default value self.max_sequence_limit_label = tk.Label( master, text="Limite Máximo de Sequências (1-10):", bg=self.dark_bg, fg=self.text_fg, ) self.max_sequence_limit_label.pack(pady=(10, 0)) self.max_sequence_limit_spinbox = tk.Spinbox( master, from_=1, to=10, textvariable=self.max_sequence_limit, width=5, bg=self.button_bg_std, fg=self.button_fg_std, command=self._on_max_sequence_limit_change, ) # Update display on change self.max_sequence_limit_spinbox.pack(pady=5) # Labels for Casa Analysis self.casa_analysis_title_label = tk.Label( master, text="Análise 'Casas' (Próximas Cores para Sub-Sequências Atuais):", font=("Arial", 10, "bold"), bg=self.dark_bg, fg=self.text_fg, ) self.casa_analysis_title_label.pack(pady=(10, 0)) self.casa_labels_frames = [] # To store frames for each casa's counts self.casa_labels_vars = [] self._on_mode_change() # Inicializações dependentes de modo # Variáveis e configuração para modo de atualização em tempo real self.stop_event = threading.Event() self.last_timestamp = "1970-01-01T00:00:00.000Z" # Initialize to a very old date self.real_time_var = tk.BooleanVar() self.real_time_var.set(False) # Adicionar para o cliente WebSocket self.auth_token = AUTH_TOKEN_FROM_TRACKER # Usar o token definido globalmente self.real_time_data_queue = queue.Queue() self.real_time_client: Optional[BlazeRealtimeClient] = None self.real_time_thread: Optional[threading.Thread] = None real_time_cb = tk.Checkbutton( master, text="Real-Time API", variable=self.real_time_var, command=self._toggle_real_time_mode, bg=self.dark_bg, fg=self.text_fg, selectcolor=self.frame_bg, ) real_time_cb.pack(pady=5) # ---------- NOVOS MÉTODOS / ALTERAÇÕES PARA LÓGICA DE COR ----------- def _get_accent_for_color(self, color: str) -> str: if color == "Red": return self.red_accent_fg elif color == "Black": return self.black_accent_fg elif color == "White": return self.white_accent_fg return self.text_bold_fg # Fallback # -------------------------------------------------------------------- def _on_max_sequence_limit_change(self): """Called when the max sequence limit spinbox value changes.""" self._setup_casa_labels() # Rebuild casa labels with the new limit new_max_limit = self.max_sequence_limit.get() # Update current_sequence to be a slice of the full history, limited by the new max_limit self.current_sequence = self.full_history_sequence[-new_max_limit:] self._update_display() # Update the display to reflect new analysis def _format_sequence_element(self, color: str, detail: Optional[str]) -> str: if detail: return f"{color}-{detail}" return color def _on_mode_change(self): # Não há mais seleção de modo, então apenas atualiza estados e labels self._update_button_states() self._setup_next_color_labels() # Rebuild next color labels for the new mode self._setup_casa_labels() # Rebuild casa labels for the new mode self._reset_sequence() # Resets current sequence and updates display def _update_button_states(self): # Sempre opera no modo detalhado is_disabled = self.disable_buttons self.detailed_button_frame.pack(pady=5) self.red_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.red_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_M_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.black_m_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.white_button_detailed.config(state=tk.DISABLED if is_disabled else tk.NORMAL) self.reset_button.config(state=tk.DISABLED if is_disabled else tk.NORMAL) def _setup_next_color_labels(self): # Clear existing labels in the frame for widget in self.next_color_display_frame.winfo_children(): widget.destroy() # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg # Sempre configura para o modo detalhado self.next_red_M_label_text = tk.StringVar(value="Red-M: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_red_M_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_red_m_label_text = tk.StringVar(value="Red-m: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_red_m_label_text, font=("Arial", 10), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_black_M_label_text = tk.StringVar(value="Black-M: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_black_M_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_black_m_label_text = tk.StringVar(value="Black-m: 0") tk.Label( self.next_color_display_frame, textvariable=self.next_black_m_label_text, font=("Arial", 10), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) self.next_white_label_text = tk.StringVar(value="White: 0") # Re-use for White tk.Label( self.next_color_display_frame, textvariable=self.next_white_label_text, font=("Arial", 10), fg=white_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=5) def _setup_casa_labels(self): # Clear existing frames and vars for frame in self.casa_labels_frames: frame.destroy() self.casa_labels_frames = [] self.casa_labels_vars = [] # Use instance variables for colors dark_bg = self.dark_bg frame_bg = self.frame_bg red_accent_fg = self.red_accent_fg black_accent_fg = self.black_accent_fg white_accent_fg = self.white_accent_fg max_limit = self.max_sequence_limit.get() for n_casa in range(1, max_limit + 1): casa_outer_frame = tk.Frame(self.master, bg=frame_bg) casa_outer_frame.pack(pady=2, fill=tk.X) self.casa_labels_frames.append(casa_outer_frame) seq_var = tk.StringVar() seq_var.set(f"Casa {n_casa} (Seq: ...):") seq_label = tk.Label( casa_outer_frame, textvariable=seq_var, font=("Arial", 9), bg=frame_bg, fg=self.text_fg, ) seq_label.pack(side=tk.TOP, anchor="w", padx=10) counts_frame = tk.Frame(casa_outer_frame, bg=frame_bg) counts_frame.pack(side=tk.TOP, anchor="w", padx=10) red_M_var = tk.StringVar(value="R-M: -") red_m_var = tk.StringVar(value="R-m: -") black_M_var = tk.StringVar(value="B-M: -") black_m_var = tk.StringVar(value="B-m: -") white_var_detailed = tk.StringVar(value="White: -") tk.Label( counts_frame, textvariable=red_M_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=red_m_var, font=("Arial", 9), fg=red_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=black_M_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=black_m_var, font=("Arial", 9), fg=black_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) tk.Label( counts_frame, textvariable=white_var_detailed, font=("Arial", 9), fg=white_accent_fg, bg=frame_bg, ).pack(side=tk.LEFT, padx=3) self.casa_labels_vars.append( { "seq": seq_var, "detailed_red_M": red_M_var, "detailed_red_m": red_m_var, "detailed_black_M": black_M_var, "detailed_black_m": black_m_var, "shared_white": white_var_detailed, } ) def _load_winning_colors(self): self.winning_colors_from_csv = [] try: with open( self.csv_filepath, mode="r", newline="", encoding="utf-8" ) as file: reader = csv.DictReader(file) if ( not reader.fieldnames or "color" not in reader.fieldnames or "total_red" not in reader.fieldnames or "total_black" not in reader.fieldnames ): print(f"Erro: Cabeçalhos necessários não encontrados em {self.csv_filepath}") return for row in reader: color_part_full = row.get("color", "") if not color_part_full: continue color_part = color_part_full.split(" ")[0] detail = None if color_part in ["Red", "Black", "White"]: if color_part != "White": # Only Red and Black can have M/m try: total_red = float(row.get("total_red", "0")) total_black = float(row.get("total_black", "0")) if color_part == "Red": if total_red > total_black: detail = "M" elif total_red < total_black: detail = "m" elif color_part == "Black": if total_black > total_red: detail = "M" elif total_black < total_red: detail = "m" except ValueError: print( f"Aviso: Valor não numérico para total_red/total_black na linha: {row}" ) self.winning_colors_from_csv.append((color_part, detail)) except FileNotFoundError: print(f"Erro: Arquivo não encontrado em {self.csv_filepath}") except Exception as e: print(f"Erro ao ler o CSV: {e}") # ------------------------- CASA ANALYSIS ---------------------------- def _get_next_colors_for_specific_sequence( self, sequence_to_find: List[Tuple[str, Optional[str]]] ) -> Dict[Tuple[str, Optional[str]], int]: next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not sequence_to_find or not self.winning_colors_from_csv: return next_counts len_sub = len(sequence_to_find) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i : i + len_sub] == sequence_to_find: next_color_tuple = self.winning_colors_from_csv[i + len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_casa_analysis_display(self): max_limit = self.max_sequence_limit.get() for n_idx in range(max_limit): N = n_idx + 1 if n_idx >= len(self.casa_labels_vars): break vars_for_casa = self.casa_labels_vars[n_idx] current_seq_len = len(self.current_sequence) seq_display_text = "N/A" if self.disable_buttons or not self.winning_colors_from_csv: pass elif not self.current_sequence: seq_display_text = "-" elif current_seq_len < N: seq_display_text = "Insuficiente" else: target_sequence_for_casa = self.current_sequence[:N] sequence_str_parts = [ self._format_sequence_element(c, d) for c, d in target_sequence_for_casa ] sequence_str = " -> ".join(sequence_str_parts) max_display_len = 25 if len(sequence_str) > max_display_len: sequence_str = sequence_str[: max_display_len - 3] + "..." seq_display_text = sequence_str vars_for_casa["seq"].set(f"Casa {N} (Sub: {seq_display_text})") if ( self.disable_buttons or not self.winning_colors_from_csv or current_seq_len < N ): vars_for_casa["detailed_red_M"].set("R-M: -") vars_for_casa["detailed_red_m"].set("R-m: -") vars_for_casa["detailed_black_M"].set("B-M: -") vars_for_casa["detailed_black_m"].set("B-m: -") vars_for_casa["shared_white"].set("White: -") continue next_color_counts_detailed = self._get_next_colors_for_specific_sequence( self.current_sequence[:N] ) vars_for_casa["detailed_red_M"].set( f"R-M: {next_color_counts_detailed.get(('Red', 'M'), 0)}" ) vars_for_casa["detailed_red_m"].set( f"R-m: {next_color_counts_detailed.get(('Red', 'm'), 0)}" ) vars_for_casa["detailed_black_M"].set( f"B-M: {next_color_counts_detailed.get(('Black', 'M'), 0)}" ) vars_for_casa["detailed_black_m"].set( f"B-m: {next_color_counts_detailed.get(('Black', 'm'), 0)}" ) vars_for_casa["shared_white"].set( f"White: {next_color_counts_detailed.get(('White', None), 0)}" ) # ------------------- PROVÁVEL PRÓXIMA COR --------------------------- def _determine_most_probable_next_color( self, ) -> Tuple[Optional[str], Optional[str]]: if not self.current_sequence or self.disable_buttons or not self.winning_colors_from_csv: return None, None detailed_next_counts = self._count_next_color_occurrences() if not detailed_next_counts: return None, None max_occurrence = max(detailed_next_counts.values()) if max_occurrence == 0: return None, None probable_detailed_colors = [ cd_tuple for cd_tuple, count in detailed_next_counts.items() if count == max_occurrence ] if probable_detailed_colors: return sorted(probable_detailed_colors, key=lambda x: (x[0], str(x[1])))[0] return None, None # ------------------- SEQUÊNCIA E ADIÇÕES ---------------------------- def _add_to_sequence(self, color: str, detail: Optional[str]): if self.disable_buttons: return # Captura a cor prevista ANTES de adicionar (desconsidera M/m) pred_color, _ = self._determine_most_probable_next_color() # Determina se foi win ou loss is_correct_prediction = pred_color and pred_color == color # Adiciona o novo resultado entry = (color, detail) self.full_history_sequence.append(entry) self.current_sequence = self.full_history_sequence[-self.max_sequence_limit.get() :] # Adiciona à história de previsões se havia uma previsão if pred_color is not None: self.prediction_history.append((pred_color, color, is_correct_prediction)) # Decide a cor de exibição para o novo elemento if pred_color and pred_color == color: # Correct prediction - use white for Red/Black, accent for White if color in ["Red", "Black"]: new_element_fg = self.white_accent_fg # Use white color for Red/Black else: new_element_fg = self._get_accent_for_color(color) else: # Incorrect prediction - use darker white for Red/Black, darker accent for White if color in ["Red", "Black"]: new_element_fg = self._darken_color(self.white_accent_fg) # Darker white for Red/Black else: new_element_fg = self._darken_color(self._get_accent_for_color(color)) # Store the color for this specific element if not hasattr(self, 'sequence_element_colors'): self.sequence_element_colors = [] self.sequence_element_colors.append(new_element_fg) # Keep only colors for current sequence length max_limit = self.max_sequence_limit.get() if len(self.sequence_element_colors) > max_limit: self.sequence_element_colors = self.sequence_element_colors[-max_limit:] # Verificar sequências consecutivas de wins ou losses if len(self.prediction_history) >= max_limit: # Pegar os últimos max_limit resultados recent_predictions = self.prediction_history[-max_limit:] # Verificar se todos são wins ou todos são losses all_wins = all(result[2] for result in recent_predictions) # result[2] é is_correct_prediction all_losses = all(not result[2] for result in recent_predictions) # Tocar notificação se todos foram wins OU todos foram losses if all_wins or all_losses: try: winsound.PlaySound( r"C:\Users\lucas\Downloads\SOURCE\models\noti.wav", winsound.SND_FILENAME | winsound.SND_ASYNC ) result_type = "wins" if all_wins else "losses" print(f"Notificação: Sequência de {max_limit} {result_type} consecutivos!") # Debug: mostrar a sequência que gerou a notificação debug_sequence = [f"{pred}->{real}{'✓' if win else '✗'}" for pred, real, win in recent_predictions] print(f"Sequência: {' | '.join(debug_sequence)}") except Exception as e: print(f"Erro ao tocar notificação sonora: {e}") self._update_display() def _reset_sequence(self): self.current_sequence = [] self.full_history_sequence = [] self.sequence_element_colors = [] # Reset colors too self.prediction_history = [] # Reset prediction history too self._update_display() def _count_sequence_occurrences(self) -> int: if not self.current_sequence or not self.winning_colors_from_csv: return 0 count = 0 len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main < len_sub: return 0 for i in range(len_main - len_sub + 1): if self.winning_colors_from_csv[i : i + len_sub] == self.current_sequence: count += 1 return count def _count_next_color_occurrences(self) -> Dict[Tuple[str, Optional[str]], int]: next_counts: Dict[Tuple[str, Optional[str]], int] = {} if not self.current_sequence or not self.winning_colors_from_csv: return next_counts len_sub = len(self.current_sequence) len_main = len(self.winning_colors_from_csv) if len_main <= len_sub: return next_counts for i in range(len_main - len_sub): if self.winning_colors_from_csv[i : i + len_sub] == self.current_sequence: next_color_tuple = self.winning_colors_from_csv[i + len_sub] next_counts[next_color_tuple] = next_counts.get(next_color_tuple, 0) + 1 return next_counts def _update_display(self): # Clear existing sequence element labels for label in self.sequence_element_labels: label.destroy() self.sequence_element_labels = [] if self.current_sequence: # Create individual labels for each sequence element for i, (color, detail) in enumerate(self.current_sequence): element_text = self._format_sequence_element(color, detail) # Get stored color for this element, or default to white for Red/Black if hasattr(self, 'sequence_element_colors') and i < len(self.sequence_element_colors): element_color = self.sequence_element_colors[i] else: # Default color - use white for Red/Black, accent for White if color in ["Red", "Black"]: element_color = self.white_accent_fg else: element_color = self._get_accent_for_color(color) # Add arrow if not the first element if i > 0: arrow_label = tk.Label( self.sequence_elements_frame, text=" -> ", font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_fg, ) arrow_label.pack(side=tk.LEFT) self.sequence_element_labels.append(arrow_label) # Create label for this element element_label = tk.Label( self.sequence_elements_frame, text=element_text, font=("Arial", 12, "bold"), bg=self.frame_bg, fg=element_color, ) element_label.pack(side=tk.LEFT) self.sequence_element_labels.append(element_label) else: # Show "Nenhuma" when no sequence none_label = tk.Label( self.sequence_elements_frame, text="Nenhuma", font=("Arial", 12, "bold"), bg=self.frame_bg, fg=self.text_bold_fg, ) none_label.pack(side=tk.LEFT) self.sequence_element_labels.append(none_label) occurrences = self._count_sequence_occurrences() self.count_label_text.set(f"{occurrences}") detailed_next_counts = self._count_next_color_occurrences() if hasattr(self, "next_red_M_label_text"): self.next_red_M_label_text.set( f"Red-M: {detailed_next_counts.get(('Red', 'M'), 0)}" ) if hasattr(self, "next_red_m_label_text"): self.next_red_m_label_text.set( f"Red-m: {detailed_next_counts.get(('Red', 'm'), 0)}" ) if hasattr(self, "next_black_M_label_text"): self.next_black_M_label_text.set( f"Black-M: {detailed_next_counts.get(('Black', 'M'), 0)}" ) if hasattr(self, "next_black_m_label_text"): self.next_black_m_label_text.set( f"Black-m: {detailed_next_counts.get(('Black', 'm'), 0)}" ) if hasattr(self, "next_white_label_text"): self.next_white_label_text.set( f"White: {detailed_next_counts.get(('White', None), 0)}" ) prob_color, prob_detail = self._determine_most_probable_next_color() if prob_color: self.probable_next_color_text_var.set( self._format_sequence_element(prob_color, prob_detail) ) self.probable_next_color_label.config( fg=self._get_accent_for_color(prob_color) ) else: self.probable_next_color_text_var.set("-") self.probable_next_color_label.config(fg=self.text_fg) self._update_casa_analysis_display() # ----------------- REAL-TIME PROCESSING ----------------------------- def _toggle_real_time_mode(self): if self.real_time_var.get(): # Se está ativando o modo real-time if not self.real_time_thread or not self.real_time_thread.is_alive(): print("Iniciando cliente Real-Time API...") if "1752950579" in self.auth_token: print( "ALERTA: O token de autenticação pode estar expirado. A API Real-Time pode não funcionar." ) self.stop_event.clear() self.real_time_client = BlazeRealtimeClient( self.auth_token, self.real_time_data_queue, self.stop_event ) self.real_time_thread = threading.Thread( target=self.real_time_client.start, daemon=True ) self.real_time_thread.start() self.master.after(100, self._process_real_time_queue) else: print("Parando cliente Real-Time API...") self.stop_event.set() if self.real_time_thread and self.real_time_thread.is_alive(): self.real_time_thread.join(timeout=5) if self.real_time_thread.is_alive(): print("Realtime: Thread do cliente não terminou no tempo esperado.") self.real_time_client = None def _process_real_time_queue(self): if not self.real_time_var.get(): return try: while not self.real_time_data_queue.empty(): new_result = self.real_time_data_queue.get_nowait() if new_result["created_at"] > self.last_timestamp: self.last_timestamp = new_result["created_at"] color_from_api = new_result["color"] detail_from_api = new_result["detail"] print( f"Realtime: Novo resultado recebido - Cor: {color_from_api}, Detalhe: {detail_from_api}, Roll: {new_result.get('roll')}" ) self._add_to_sequence(color_from_api, detail_from_api) except queue.Empty: pass except Exception as e: print(f"Erro ao processar fila de tempo real: {e}") finally: if self.real_time_var.get(): self.master.after(200, self._process_real_time_queue) # Ponto de entrada principal da aplicação if __name__ == "__main__": root = tk.Tk() csv_file_path = "data.csv" app = SequenceCounterApp(root, csv_file_path) def on_closing(): print("Fechando aplicação...") if app.real_time_thread and app.real_time_thread.is_alive(): print("Parando thread de real-time...") app.stop_event.set() app.real_time_thread.join(timeout=5) if app.real_time_thread.is_alive(): print("Timeout ao esperar a thread de real-time terminar.") if ( app.real_time_client and app.real_time_client.loop and not app.real_time_client.loop.is_closed() ): print("Fechando loop asyncio do cliente WebSocket...") if app.real_time_client.sio and app.real_time_client.sio.connected: app.real_time_client.loop.run_until_complete( app.real_time_client.sio.disconnect() ) app.real_time_client.loop.call_soon_threadsafe( app.real_time_client.loop.stop ) root.destroy() print("Aplicação fechada.") root.protocol("WM_DELETE_WINDOW", on_closing) try: root.mainloop() except KeyboardInterrupt: print("\nInterrupção pelo teclado recebida. Fechando...") on_closing()
Abaixo está uma versão lite (dois-terços menor) em um único arquivo, preservando todas as funcionalidades — GUI Tkinter em dark-theme, WebSocket Realtime (socket.io), áudio de notificação e o mesmo fluxo de análise de sequências/casas.
Salve como sequence_analyzer_lite.py
, mantenha data.csv
no mesmo diretório e garanta que o noti.wav
continue no caminho indicado.
python# sequence_analyzer_lite.py # Requer: python-3.10+, tkinter, python-socketio[client] import tkinter as tk, csv, threading, time, queue, asyncio, winsound import socketio from typing import * # --- CONSTANTES --------------------------------------------------------- AUTH_TOKEN = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." "eyJpZCI6MjY3NzMsImlzUmVmcmVzaFRva2VuIjpmYWxzZSwiYmxvY2tzIjpbXSwidXVpZCI6IjU0ODA0N2QxLWFjN2QtNGJiMS04" "MTIwLTA3OTE3ZjQzMjk4OSIsImlhdCI6MTc0Nzc2NjU3OSwiZXhwIjoxNzUyOTUwNTc5fQ." "Vmfe7JqHtx9QR-qPv4pHvCYbioh2SnQ4YEIK9uHWtx8") ROOM, WS_URL, WS_PATH = "double_room_1", "wss://api-gaming.blaze.bet.br", "/replication/" AUDIO_WAV = r"C:\Users\lucas\Downloads\SOURCE\models\noti.wav" # ------------------------------------------------------------------------ class BlazeClient: def __init__(self, tok: str, q: queue.Queue, stop: threading.Event): self.q, self.stop = q, stop self.sio = socketio.AsyncClient(logger=False, engineio_logger=False) self.auth, self.sub = False, False self.sio.on("connect", self.on_connect) self.sio.on("disconnect", lambda *_: self._reset()) self.sio.on("connect_error", lambda *_: self._reset()) self.sio.on("data", self.on_data) self.tok = tok def _reset(self): self.auth = self.sub = False async def on_connect(self): await self.sio.emit("cmd", {"id": "authenticate", "payload": {"token": self.tok}}, callback=self._ack_auth) async def _ack_auth(self, d): # noqa self.auth = d and d[0] == "ok" if isinstance(d, list) else d == "ok" if self.auth: await self.sio.emit("cmd", {"id": "subscribe", "payload": {"room": ROOM}}, callback=lambda a: setattr(self, "sub", a and a[0] == "ok" if isinstance(a, list) else a == "ok")) else: self.stop.set() async def on_data(self, msg): if msg.get("id") != "double.tick": return p = msg.get("payload", {}) if p.get("status") not in ("finished", "complete"): return col_map = {0: "White", 1: "Red", 2: "Black"} color = col_map.get(p.get("color")) if not color: return tr, tb = p.get("total_red_eur_bet", 0), p.get("total_black_eur_bet", 0) detail = None if color == "Red": detail = "M" if tr > tb else "m" if tr < tb else None if color == "Black": detail = "M" if tb > tr else "m" if tb < tr else None self.q.put({"color": color, "detail": detail, "created_at": p["created_at"], "roll": p.get("roll")}) async def loop(self): while not self.stop.is_set(): if not self.sio.connected: try: await self.sio.connect(WS_URL, transports=["websocket"], socketio_path=WS_PATH) except Exception: await asyncio.sleep(5); continue await asyncio.sleep(0.1) if self.sio.connected: await self.sio.disconnect() def start(self): asyncio.run(self.loop()) # ------------------------------------------------------------------------ class App: def __init__(self, master: tk.Tk, csv_path="data.csv"): self.master = master; master.title("Sequence Counter") self.dark, self.txt, self.white = "#2E2E2E", "#E0E0E0", "#FFFFFF" master.configure(bg=self.dark) self.hist: List[Tuple[str, Optional[str]]] = [] # full history self.seq: List[Tuple[str, Optional[str]]] = [] # limited view self.pred_hist: List[Tuple[str, str, bool]] = [] # (pred, real, win?) self.max_n = tk.IntVar(value=4) self.disable = False self.colors = [] # label fg list # ----- dados CSV ------------------------------- self.db: List[Tuple[str, Optional[str]]] = [] self._load_csv(csv_path) if not self.db: self.disable = True # ----- UI -------------------------------------- self._build_ui() # ----- realtime ------------------------------- self.real_time = tk.BooleanVar() self.q = queue.Queue(); self.stop_ev = threading.Event() self.rt_thread: Optional[threading.Thread] = None self.last_ts = "1970-01-01T00:00:00.000Z" # ------------------ UI helpers -------------------- def _build_ui(self): fg = self.txt tk.Label(self.master, text="Clique para formar sequência:", bg=self.dark, fg=fg).pack(pady=5) top = tk.Frame(self.master, bg=self.dark); top.pack() self.seq_frame = tk.Frame(top, bg=self.dark); self.seq_frame.pack(side=tk.LEFT) tk.Label(top, text=" Provável:", bg=self.dark, fg=fg).pack(side=tk.LEFT) self.next_var = tk.StringVar(value="-") self.next_lbl = tk.Label(top, textvariable=self.next_var, font=("Arial", 12, "bold"), bg=self.dark, fg=self.white); self.next_lbl.pack(side=tk.LEFT) # counts self.count_var = tk.StringVar(value="0") tk.Label(self.master, text="Ocorrências:", bg=self.dark, fg=fg).pack() tk.Label(self.master, textvariable=self.count_var, font=("Arial", 12, "bold"), bg=self.dark, fg=self.white).pack() # next color counts self.next_counts = {k: tk.StringVar(value=f"{k}: 0") for k in ("Red-M", "Red-m", "Black-M", "Black-m", "White")} nc_frame = tk.Frame(self.master, bg=self.dark); nc_frame.pack(pady=5) for v in self.next_counts.values(): tk.Label(nc_frame, textvariable=v, bg=self.dark, fg=fg).pack(side=tk.LEFT, padx=4) # casas self.casa_vars: List[Dict[str, tk.StringVar]] = [] self._refresh_casas() # buttons btn_frame = tk.Frame(self.master, bg=self.dark); btn_frame.pack(pady=8) for (c,d,bg) in [("Red","M","#B80000"),("Red","m","#B80000"), ("Black","M","#333333"),("Black","m","#333333"), ("White",None,"#555555")]: tk.Button(btn_frame, text=f"{c}{'-'+d if d else ''}", bg=bg, fg="white", width=7, height=2, relief=tk.FLAT, command=lambda C=c, D=d: self._add(C,D)).pack(side=tk.LEFT, padx=3) tk.Button(self.master, text="Resetar", command=self._reset, width=12, bg="#4A4A4A", fg=fg, relief=tk.FLAT).pack(pady=8) tk.Label(self.master, text="Limite 1-10:", bg=self.dark, fg=fg).pack() tk.Spinbox(self.master, from_=1, to=10, textvariable=self.max_n, width=4, command=self._limit_changed, bg="#4A4A4A", fg=fg).pack(pady=3) tk.Checkbutton(self.master, text="Real-Time API", variable=self.real_time, command=self._toggle_rt, bg=self.dark, fg=fg, selectcolor=self.dark).pack() def _refresh_casas(self): for child in getattr(self,"casa_frames",[]): child.destroy() self.casa_frames, self.casa_vars = [], [] for n in range(1, self.max_n.get()+1): fr = tk.Frame(self.master, bg=self.dark); fr.pack(anchor="w") seq_v = tk.StringVar(value=f"Casa {n}: -") tk.Label(fr, textvariable=seq_v, bg=self.dark, fg=self.txt).pack(side=tk.LEFT) counts = {k: tk.StringVar(value=f"{k}: -") for k in ("R-M","R-m","B-M","B-m","W")} tk.Frame(fr, bg=self.dark).pack() # spacer inner = tk.Frame(fr, bg=self.dark); inner.pack(anchor="w") for v in counts.values(): tk.Label(inner, textvariable=v, bg=self.dark, fg=self.txt).pack(side=tk.LEFT, padx=2) self.casa_frames.append(fr); self.casa_vars.append({"seq":seq_v, **counts}) # ------------------ CSV --------------------------- def _load_csv(self, path): try: with open(path, encoding="utf-8") as f: r = csv.DictReader(f) for row in r: col = row["color"].split()[0] tr,tb = float(row["total_red"]), float(row["total_black"]) det = None if col=="Red": det = "M" if tr>tb else "m" if tr<tb else None if col=="Black": det = "M" if tb>tr else "m" if tb<tr else None self.db.append((col,det)) except Exception as e: print("Erro CSV:",e) # ------------------ lógica ------------------------ def _format(self,c,d): return f"{c}-{d}" if d else c def _prob_next(self)->Tuple[Optional[str],Optional[str]]: if not self.seq or not self.db: return None,None counts=self._next_counts(); mx=max(counts.values(), default=0) if not mx: return None,None cand=[cd for cd,v in counts.items() if v==mx] return sorted(cand,key=lambda x:(x[0], str(x[1])))[0] def _next_counts(self, sub=None): sub = sub or self.seq counts,pos,lsub = {},0,len(sub) while pos+ lsub < len(self.db): if self.db[pos:pos+lsub]==sub: nxt=self.db[pos+lsub]; counts[nxt]=counts.get(nxt,0)+1 pos+=1 return counts def _occurrences(self)->int: if not self.seq: return 0 cnt,pos,lsub=0,0,len(self.seq) while pos+lsub<=len(self.db): cnt+=self.db[pos:pos+lsub]==self.seq; pos+=1 return cnt # ------------------ ações ------------------------- def _add(self,color,detail): if self.disable: return pred,_ = self._prob_next() win = pred==color if pred else None self.hist.append((color,detail)) self.seq = self.hist[-self.max_n.get():] if pred is not None: self.pred_hist.append((pred,color,win)) self._check_streak() self.colors.append(self.white if win else "#AAAAAA") self.colors = self.colors[-self.max_n.get():] self._update_ui() def _reset(self): self.seq=self.hist=self.pred_hist=self.colors=[] self._update_ui() def _limit_changed(self): self.seq = self.hist[-self.max_n.get():] self._refresh_casas(); self._update_ui() # ------------- streak + áudio --------------------- def _check_streak(self): n=self.max_n.get() if len(self.pred_hist)<n: return last=self.pred_hist[-n:]; wins=[w for _,_,w in last] if all(wins) or all(not w for w in wins): try: winsound.PlaySound(AUDIO_WAV, winsound.SND_FILENAME|winsound.SND_ASYNC) except Exception as e: print("Audio error:",e) # ------------- UI atualização --------------------- def _update_ui(self): for w in self.seq_frame.winfo_children(): w.destroy() if not self.seq: tk.Label(self.seq_frame, text="Nenhuma", bg=self.dark, fg=self.white).pack() else: for i,(c,d) in enumerate(self.seq): if i: tk.Label(self.seq_frame, text="→", bg=self.dark, fg=self.txt).pack(side=tk.LEFT) fg=self.colors[i] if i<len(self.colors) else self.white tk.Label(self.seq_frame, text=self._format(c,d), bg=self.dark, fg=fg, font=("Arial",12,"bold")).pack(side=tk.LEFT) # counts self.count_var.set(str(self._occurrences())) nx=self._next_counts() self.next_counts["Red-M"].set(f"Red-M: {nx.get(('Red','M'),0)}") self.next_counts["Red-m"].set(f"Red-m: {nx.get(('Red','m'),0)}") self.next_counts["Black-M"].set(f"Black-M: {nx.get(('Black','M'),0)}") self.next_counts["Black-m"].set(f"Black-m: {nx.get(('Black','m'),0)}") self.next_counts["White"].set(f"White: {nx.get(('White',None),0)}") # provável pc,pd=self._prob_next() if pc: self.next_var.set(self._format(pc,pd)); self.next_lbl.config(fg=self.white) else: self.next_var.set("-"); self.next_lbl.config(fg=self.txt) # casas for idx,cv in enumerate(self.casa_vars): n=idx+1 if len(self.seq)<n: cv["seq"].set(f"Casa {n}: Insuf.") for k in ("R-M","R-m","B-M","B-m","W"): cv[k].set(f"{k}: -") continue sub=self.seq[:n]; cv["seq"].set(f"Casa {n}: "+'→'.join(self._format(*x) for x in sub)) nc=self._next_counts(sub) cv["R-M"].set(f"R-M:{nc.get(('Red','M'),0)}") cv["R-m"].set(f"R-m:{nc.get(('Red','m'),0)}") cv["B-M"].set(f"B-M:{nc.get(('Black','M'),0)}") cv["B-m"].set(f"B-m:{nc.get(('Black','m'),0)}") cv["W"].set(f"W:{nc.get(('White',None),0)}") # ---------------- realtime ------------------------ def _toggle_rt(self): if self.real_time.get(): self.stop_ev.clear() self.rt_thread = threading.Thread(target=self._rt_start, daemon=True) self.rt_thread.start(); self.master.after(200,self._rt_loop) else: self.stop_ev.set(); self.rt_thread and self.rt_thread.join() def _rt_start(self): BlazeClient(AUTH_TOKEN, self.q, self.stop_ev).start() def _rt_loop(self): try: while not self.q.empty(): d=self.q.get_nowait() if d["created_at"]>self.last_ts: self.last_ts=d["created_at"]; self._add(d["color"], d["detail"]) except queue.Empty: pass finally: if self.real_time.get(): self.master.after(200,self._rt_loop) # ------------------------------------------------------------------------ if __name__ == "__main__": root=tk.Tk(); App(root); root.mainloop()
Bom uso!