윈도우 키보드 후킹
생성일: 2025년 2월 4일
답변에 사용된 모델 O3 Mini High by Chat01
생성일: 2025년 2월 4일
답변에 사용된 모델 O3 Mini High by Chat01
import ctypes
import sys
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import pandas as pd
import json
from io import StringIO
import screeninfo
import requests
from threading import Thread, Lock
from PIL import Image, ImageTk
import os
import time
import logging
from queue import Queue
import socket
import functools
import math
import win32con
from ctypes import windll, CFUNCTYPE, POINTER, c_int, c_void_p, byref, c_long, cast
from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPARAM, WPARAM, LONG
# Root logger: timestamped records at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Pillow's own logger is limited to WARNING and above.
_pil_logger = logging.getLogger('PIL')
_pil_logger.setLevel(logging.WARNING)
del _pil_logger
class KBDLLHOOKSTRUCT(ctypes.Structure):
    """ctypes layout of the record a low-level keyboard hook receives.

    The hook callback casts its ``lParam`` to a pointer to this structure and
    reads ``vkCode`` from it.
    """

    # ctypes only recognises the dunder-wrapped name ``_fields_``; a plain
    # ``fields`` attribute leaves the structure without any members.
    _fields_ = [
        ("vkCode", DWORD),
        ("scanCode", DWORD),
        ("flags", DWORD),
        ("time", DWORD),
        # Pointer-sized slot (ULONG_PTR in the Win32 declaration).
        ("dwExtraInfo", POINTER(c_void_p)),
    ]
# Prototype of the keyboard hook callback: (nCode, wParam, lParam) -> LRESULT.
# Win32 callbacks use the stdcall convention (WINFUNCTYPE) and LRESULT is
# pointer-sized, so a cdecl prototype returning a 32-bit LONG is only correct
# by accident on some builds.  WINFUNCTYPE exists only on Windows; the
# CFUNCTYPE fallback merely keeps this line importable elsewhere.
LowLevelKeyboardProc = getattr(ctypes, "WINFUNCTYPE", CFUNCTYPE)(
    ctypes.c_ssize_t, c_int, WPARAM, LPARAM
)
class KeyboardHook:
    """
    Low-level Windows keyboard hook (WH_KEYBOARD_LL).

    Key events whose virtual-key code lies in 0x70-0x87 (F1-F24) are forwarded
    to ``controller.on_press`` / ``controller.on_release`` as ``"f1"`` ...
    ``"f24"`` and are then swallowed (the callback returns 1).  Every other
    event is passed on with ``CallNextHookEx``.

    The controller must expose ``on_press(key)``, ``on_release(key)`` and a
    ``logger`` attribute.
    """

    def __init__(self, controller):
        """Prepare the hook; nothing is installed until ``start()`` is called."""
        self.controller = controller
        # Private library handles: prototypes declared on them do not leak to
        # other users of the shared ``ctypes.windll`` objects.
        self.user32 = ctypes.WinDLL("user32", use_last_error=True)
        self.kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
        self._declare_prototypes()
        self.hooked = None
        self.hook_thread = None
        self.hInstance = self.kernel32.GetModuleHandleW(None)
        self.function_key_vkcodes = set(range(0x70, 0x88))  # F1 ~ F24 virtual-key codes
        # Keep a reference for the lifetime of the hook: if the callback
        # object were garbage-collected Windows would call freed memory.
        self.pointer = LowLevelKeyboardProc(self._hook_proc)
        self.is_running = False

    def _declare_prototypes(self):
        """Declare argtypes/restype so handles and LPARAM keep their full width.

        Without prototypes ctypes passes and returns plain C ints, which
        truncates HHOOK/HMODULE/LPARAM values on 64-bit Python.
        """
        wt = ctypes.wintypes
        lresult = ctypes.c_ssize_t

        self.kernel32.GetModuleHandleW.argtypes = (wt.LPCWSTR,)
        self.kernel32.GetModuleHandleW.restype = wt.HMODULE

        self.user32.SetWindowsHookExA.argtypes = (
            c_int, LowLevelKeyboardProc, wt.HINSTANCE, DWORD
        )
        self.user32.SetWindowsHookExA.restype = HHOOK
        self.user32.CallNextHookEx.argtypes = (HHOOK, c_int, WPARAM, LPARAM)
        self.user32.CallNextHookEx.restype = lresult
        self.user32.UnhookWindowsHookEx.argtypes = (HHOOK,)
        self.user32.UnhookWindowsHookEx.restype = BOOL

        self.user32.PeekMessageA.argtypes = (
            POINTER(MSG), wt.HWND, wt.UINT, wt.UINT, wt.UINT
        )
        self.user32.PeekMessageA.restype = BOOL
        self.user32.TranslateMessage.argtypes = (POINTER(MSG),)
        self.user32.TranslateMessage.restype = BOOL
        self.user32.DispatchMessageA.argtypes = (POINTER(MSG),)
        self.user32.DispatchMessageA.restype = lresult

    def _hook_proc(self, nCode, wParam, lParam):
        """Hook callback: dispatch F-key events and suppress their delivery.

        Returns 1 for handled function keys (blocks the event); otherwise the
        result of ``CallNextHookEx``.  Exceptions raised by the controller are
        logged and the event is passed on.
        """
        try:
            if nCode == win32con.HC_ACTION:
                kb = cast(lParam, POINTER(KBDLLHOOKSTRUCT)).contents
                vk_code = kb.vkCode
                if vk_code in self.function_key_vkcodes:
                    # 0x70 is F1, so subtracting 0x6F yields the key number.
                    key_str = f"f{vk_code - 0x6F}"
                    if wParam in (win32con.WM_KEYDOWN, win32con.WM_SYSKEYDOWN):
                        self.controller.on_press(key_str)
                    elif wParam in (win32con.WM_KEYUP, win32con.WM_SYSKEYUP):
                        self.controller.on_release(key_str)
                    return 1  # block further delivery of the event
        except Exception as e:
            self.controller.logger.error(f"키보드 후킹 오류: {e}")
        return self.user32.CallNextHookEx(self.hooked, nCode, wParam, lParam)

    def start(self):
        """Install the hook.

        Returns:
            True when the hook was installed, False when it is already
            running or ``SetWindowsHookExA`` failed.
        """
        if self.is_running:
            return False
        # NOTE(review): per the Win32 documentation a low-level hook is called
        # on the thread that installed it, which therefore needs a message
        # loop.  The hook is installed on the *calling* thread here; the
        # helper thread started below only drains its own queue.  Confirm the
        # caller runs a message loop (e.g. the Tk main loop).
        self.hooked = self.user32.SetWindowsHookExA(
            win32con.WH_KEYBOARD_LL,
            self.pointer,
            self.hInstance,
            0,
        )
        if not self.hooked:
            self.controller.logger.error("키보드 후킹 설정 실패")
            return False
        self.is_running = True
        self.controller.logger.info("키보드 후킹 시작됨")
        self.hook_thread = Thread(target=self._pump_messages, daemon=True)
        self.hook_thread.start()
        return True

    def _pump_messages(self):
        """Drain this thread's message queue until ``is_running`` is cleared."""
        msg = MSG()
        while self.is_running:
            while self.user32.PeekMessageA(byref(msg), None, 0, 0, win32con.PM_REMOVE):
                self.user32.TranslateMessage(byref(msg))
                self.user32.DispatchMessageA(byref(msg))
            time.sleep(0.01)  # avoid spinning at 100% CPU between polls

    def stop(self):
        """Remove the hook if it is installed; otherwise do nothing."""
        if self.is_running and self.hooked:
            self.user32.UnhookWindowsHookEx(self.hooked)
            self.is_running = False
            self.hooked = None
            self.controller.logger.info("키보드 후킹 중지됨")
class SyncRequestHandler:
    """Queue-backed sender of vMix API commands.

    Commands are queued by ``send_request`` and issued as HTTP GET requests
    by a pool of daemon worker threads sharing one ``requests.Session``.
    An identical command arriving within ``throttle_interval`` seconds of
    the previous one is dropped.
    """

    def __init__(self, base_url, max_concurrent_requests=5, request_timeout=5):
        """Create the session and start the worker threads.

        Args:
            base_url: Scheme and host[:port] the ``/api/`` path is appended to.
            max_concurrent_requests: Number of worker threads.
            request_timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url
        self.queue = Queue()
        self.max_concurrent_requests = max_concurrent_requests
        self.workers = []
        self.session = requests.Session()
        self.request_timeout = request_timeout
        self.lock = Lock()
        self.last_request_time = {}
        self.throttle_interval = 0.05  # 50 ms throttle window
        self._start_workers()

    def _start_workers(self):
        """Start ``max_concurrent_requests`` daemon worker threads."""
        for _ in range(self.max_concurrent_requests):
            worker = Thread(target=self._worker_thread, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_thread(self):
        """Worker loop: send queued commands until a ``None`` sentinel arrives."""
        while True:
            command = self.queue.get()
            if command is None:
                # Shutdown signal; acknowledge it so Queue.join() cannot hang.
                self.queue.task_done()
                break
            try:
                self._send_request(command)
            except Exception as e:
                logging.error(f"워커 쓰레드 에러: {e}")
            finally:
                self.queue.task_done()

    def _send_request(self, command):
        """Issue one GET for *command* unless it is throttled.

        Request failures are logged, never raised.
        """
        url = f"{self.base_url}/api/?Function={command}"
        current_time = time.time()
        with self.lock:
            last_sent = self.last_request_time.get(command)
            if last_sent is not None and current_time - last_sent < self.throttle_interval:
                logging.debug(f"쓰로틀링: {command}")
                return
            self.last_request_time[command] = current_time
        try:
            response = self.session.get(url, timeout=self.request_timeout)
            response.raise_for_status()
            logging.info(f"요청 성공: {url} - 상태 코드: {response.status_code}")
        except requests.exceptions.Timeout:
            logging.error(f"요청 타임아웃: {url}")
        except requests.exceptions.ConnectionError:
            logging.error(f"연결 에러: {url}")
        except requests.exceptions.RequestException as e:
            logging.error(f"요청 에러: {url} - {e}")

    def send_request(self, command):
        """Queue *command* for asynchronous delivery."""
        self.queue.put(command)

    def close(self):
        """Stop every worker (one sentinel each), wait for them, close the session."""
        for _ in self.workers:
            self.queue.put(None)
        for worker in self.workers:
            worker.join()
        self.session.close()
class ReplayController:
    """Tk front end that turns hooked F-key presses into vMix replay API calls.

    The window holds four settings panels, a three-sheet 9x9 TAG notebook and
    save/load buttons.  Function keys arrive through ``on_press`` /
    ``on_release`` and are mapped to commands sent via ``send_request``.
    """

    def __init__(self):
        """Build the window, wire up the helpers and restore saved settings."""
        self.logger = logging.getLogger(__name__)
        self.root = tk.Tk()
        self.root.title("최반장 리플레이 컨트롤러 간소판 v1.22 - 안정화")
        self._setup_icon()
        self._initialize_variables()
        self._setup_dark_theme()
        self._create_frames()
        # vMix status indicator; it is packed later by _create_widgets().
        self.vmix_status_indicator = tk.Canvas(
            self.frames["기본 설정"],
            width=14,
            height=14,
            bg=self.frame_bg_color,
            highlightthickness=0,
            borderwidth=0
        )
        self._create_widgets()
        self._create_notebook()
        self._setup_keyboard_hook()
        # NOTE(review): the request handler is created from the entry's
        # default address before _load_settings() restores a saved one, and
        # it is not rebuilt when the address changes later - confirm intent.
        self._setup_request_handler()
        self._start_vmix_check_loop()
        self._load_settings()
        self.root.geometry("870x800")
        self.root.minsize(870, 800)
        self.root.resizable(True, True)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(4, weight=1)

    # ----- Icon setup -----
    def _setup_icon(self):
        """Use chobanjangTV_icon.ico next to this script as the window icon."""
        script_dir = os.path.dirname(os.path.abspath(__file__))
        icon_path = os.path.join(script_dir, "chobanjangTV_icon.ico")
        if os.path.exists(icon_path):
            try:
                self.root.iconbitmap(icon_path)
                self.icon_path = icon_path
                self.logger.info("아이콘 설정 성공")
            except Exception as e:
                self.logger.warning(f"아이콘 설정 실패: {e}")
        else:
            self.logger.warning(f"아이콘 파일({icon_path})을 찾을 수 없습니다.")

    # ----- Variable initialisation -----
    def _initialize_variables(self):
        """Create every piece of instance state used by the other methods."""
        self.jog_sensitivity_var = tk.DoubleVar(value=5)
        self.jog_rotation_count = 0
        self.last_jog_time = 0
        self.jog_direction = None
        self.jog_speed_levels = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
        self.request_handler = None
        self.vmix_connected = False
        self.last_vmix_check = 0
        self.vmix_check_interval = 5.0
        self.overlay_window = None
        self.overlay_window_open = False
        self.key_press_buffer = []
        self.selected_tags = []
        self.current_sheet = 0
        self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요"
        self.overlay_toggle = False
        self.last_press_time = {}
        self.double_tap_commands = {}
        self.replay_commands = {}
        self.pressed_keys = set()
        self.key_press_time = {}
        self.key_release_time = {}
        self.first_key = None
        self.first_key_time = None
        self.overlay_key_timer = None
        self.notebook_clone = None
        self.overlay_label = None
        self.overlay_close_time = 0
        self.end_time_vars = [tk.StringVar() for _ in range(5)]
        self.camera_count_var = tk.StringVar()
        self.overlay_on_off_var = tk.BooleanVar(value=True)
        self.overlay_monitor_var = tk.StringVar()
        self.tag_sheet_count_var = tk.StringVar()
        self.tag_sheet_count_var.trace_add("write", self.update_sheet_activation)
        self.sheets = []
        self.entries = []

    # ----- Dark theme -----
    def _setup_dark_theme(self):
        """Define the colour palette and register/activate the ttk dark theme."""
        self.bg_color = '#1e1e1e'
        self.fg_color = '#ffffff'
        self.accent_color = '#007acc'
        self.entry_bg_color = '#1a1a1a'
        self.button_bg_color = '#3c3c3c'
        self.frame_bg_color = '#252526'
        self.root.configure(bg=self.bg_color)
        self.style = ttk.Style()
        self.style.theme_create("darktheme", parent="alt", settings={
            "TNotebook": {"configure": {"background": self.bg_color, "tabmargins": [2, 5, 2, 0]}},
            "TNotebook.Tab": {
                "configure": {"padding": [20, 10], "background": self.bg_color, "foreground": self.fg_color},
                "map": {"background": [("selected", self.accent_color)], "expand": [("selected", [1, 1, 1, 0])]},
            },
            "TFrame": {"configure": {"background": self.frame_bg_color}},
            "TLabel": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}},
            "TEntry": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color, "insertcolor": self.fg_color}},
            "TButton": {"configure": {"background": self.button_bg_color, "foreground": self.fg_color}},
            "TCombobox": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color}},
            "TCheckbutton": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}},
        })
        self.style.theme_use("darktheme")
        self.style.configure('TNotebook.Tab', padding=[20, 10], font=('Arial', 12))
        self.style.map('TNotebook.Tab',
                       expand=[('selected', [1, 1, 1, 0])],
                       font=[('selected', ('Arial', 12, 'bold'))])

    # ----- Lock / unlock icons -----
    def _load_icons(self):
        """Load unlocked.png / locked.png (16x16); a missing icon becomes None."""
        script_dir = os.path.dirname(os.path.abspath(__file__))
        unlocked_path = os.path.join(script_dir, "unlocked.png")
        locked_path = os.path.join(script_dir, "locked.png")
        try:
            if os.path.exists(unlocked_path):
                self.unlocked_image = ImageTk.PhotoImage(Image.open(unlocked_path).resize((16, 16)))
            else:
                self.unlocked_image = None
                self.logger.warning(f"unlocked.png 파일을 찾을 수 없습니다: {unlocked_path}")
            if os.path.exists(locked_path):
                self.locked_image = ImageTk.PhotoImage(Image.open(locked_path).resize((16, 16)))
            else:
                self.locked_image = None
                self.logger.warning(f"locked.png 파일을 찾을 수 없습니다: {locked_path}")
        except Exception as e:
            self.logger.error(f"아이콘 로드 실패: {e}")
            self.unlocked_image = None
            self.locked_image = None

    # ----- Section frames -----
    def _create_frames(self):
        """Create the four titled section frames, stored in ``self.frames`` by title."""
        self.frames = {}
        frame_texts = ["기본 설정", "조그 설정", "오버레이 설정", "TAG 시스템"]
        for i, text in enumerate(frame_texts):
            frame = ttk.Frame(self.root)
            frame.grid(row=i, column=0, sticky="nsew", padx=5, pady=5)
            frame.configure(style='TFrame')
            label = tk.Label(frame, text=text, font=('Arial', 12, 'bold'),
                             bg=self.accent_color, fg=self.fg_color)
            label.pack(fill='x', pady=(0, 5))
            self.frames[text] = frame

    # ----- Widget creation and layout -----
    def _create_widgets(self):
        """Populate the section frames and hook up the variable traces."""
        # Basic settings frame
        basic_frame = self.frames["기본 설정"]
        ip_frame = ttk.Frame(basic_frame)
        ip_frame.pack(fill='x', padx=10, pady=5)
        self.ip_entry = self._create_label_entry(
            ip_frame,
            "아이피",
            "vmix 리플레이가 실행되는 아이피와 포트 주소",
            "127.0.0.1:8088",
            self._check_ip
        )
        # Place the status indicator next to the address row
        self.vmix_status_indicator.pack(side='left', padx=(5, 0), in_=ip_frame)
        self._update_vmix_status_indicator(False)
        # Out-point duration comboboxes
        end_time_defaults = [5, 10, 20, 30, 40]
        for i in range(5):
            self._create_label_combobox(
                basic_frame,
                f"끝 점 시간 설정 {i+1} (초)",
                f"끝 점(outpoint) 마크시 시간 설정 {i+1}",
                [str(j) for j in range(1, 61)],
                str(end_time_defaults[i]),
                self.end_time_vars[i]
            )
        # Camera count combobox
        self._create_label_combobox(
            basic_frame,
            "카메라 수",
            "리플레이에 쓸 카메라 수 설정. 사용하지 않는 카메라는 실수 방지를 위해 비활성화됩니다.",
            [str(i) for i in range(1, 5)],
            "4",
            self.camera_count_var
        )
        # Sensitivity slider in the jog settings frame
        self._create_jog_sensitivity_slider(self.frames["조그 설정"])
        # Overlay settings
        overlay_frame = self.frames["오버레이 설정"]
        self._create_label_checkbox(
            overlay_frame,
            "오버레이 on/off",
            "리플레이 정보 오버레이를 활성화/비활성화 합니다.",
            self.overlay_on_off_var
        )
        self.overlay_monitor_combobox = self._create_label_combobox(
            overlay_frame,
            "오버레이 모니터 번호",
            f"오버레이가 표시될 모니터 번호 (현재 모니터 갯수: {self._get_monitor_count()})",
            [str(i) for i in range(1, self._get_monitor_count()+1)],
            "1",
            self.overlay_monitor_var
        )
        # TAG system settings
        tag_frame = self.frames["TAG 시스템"]
        self.tag_sheet_count_combobox = self._create_label_combobox(
            tag_frame,
            "태그 시스템 사용 시트수",
            "사용할 태그 시스템 시트수를 조정합니다.",
            ["1", "2", "3"],
            "3",
            self.tag_sheet_count_var
        )
        # Bottom button row
        self._create_button_frame()
        # Rebuild the replay command table whenever one of these changes
        for var in self.end_time_vars:
            var.trace_add("write", self._update_replay_commands)
        self.camera_count_var.trace_add("write", self._update_replay_commands)
        self.jog_sensitivity_var.trace_add("write", self._update_replay_commands)

    # ----- TAG notebook -----
    def _create_notebook(self):
        """Create three TAG sheets, each a 9x9 entry grid with header labels."""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
        self._load_icons()
        for sheet_num in range(1, 4):
            frame = ttk.Frame(self.notebook)
            icon = self.unlocked_image if self.unlocked_image else None
            if icon:
                self.notebook.add(frame, text=f"시트 {sheet_num}", image=icon, compound="left")
            else:
                self.notebook.add(frame, text=f"시트 {sheet_num}")
            data = pd.DataFrame("", index=range(1, 10), columns=range(1, 10))
            self.sheets.append((frame, data))
        for frame, data in self.sheets:
            for i in range(10):
                frame.grid_columnconfigure(i, weight=1)
                frame.grid_rowconfigure(i, weight=1)
            for i in range(1, 10):
                self._create_cell(frame, 0, i, value=str(i), readonly=True, bold=True)
                self._create_cell(frame, i, 0, value=str(i), readonly=True, bold=True)
            for i in range(1, 10):
                for j in range(1, 10):
                    self._create_cell(frame, i, j)
        self.notebook.bind("<Double-1>", self._rename_tab)
        self.update_sheet_activation()

    # ----- Bottom button row -----
    def _create_button_frame(self):
        """Create the credits labels and the save / load / clear buttons."""
        button_frame = ttk.Frame(self.root)
        button_frame.grid(row=5, column=0, sticky="ew", padx=5, pady=5)
        dev_label = tk.Label(button_frame, text="제작 ", font=('Arial', 10),
                             bg=self.bg_color, fg=self.fg_color)
        dev_label.grid(row=0, column=0, sticky="w", padx=(5, 0))
        pdb_label = tk.Label(button_frame, text="PDB", font=('Arial', 10),
                             bg=self.bg_color, fg="#a80000")
        pdb_label.grid(row=0, column=1, sticky="w")
        omo_label = tk.Label(button_frame, text="오모 (240809 update)", font=('Arial', 10),
                             bg=self.bg_color, fg=self.fg_color)
        omo_label.grid(row=0, column=2, sticky="w")
        save_button = tk.Button(button_frame, text="저장", command=self._save_to_file,
                                width=10, bg=self.button_bg_color, fg=self.fg_color)
        save_button.grid(row=0, column=3, sticky="e", padx=5)
        load_button = tk.Button(button_frame, text="불러오기", command=self._load_from_file,
                                width=10, bg=self.button_bg_color, fg=self.fg_color)
        load_button.grid(row=0, column=4, sticky="e", padx=5)
        clear_button = tk.Button(button_frame, text="값 초기화", command=self._clear_all_data,
                                 width=10, bg=self.button_bg_color, fg=self.fg_color)
        clear_button.grid(row=0, column=5, sticky="e", padx=5)

    # ----- Label + entry row -----
    def _create_label_entry(self, frame, text, help_text, default, ip_check_func=None):
        """Add a "label | entry | help" row and return the entry.

        When *ip_check_func* is given, a status dot is appended and the
        function is called as ``ip_check_func(entry_text, status_label)``
        on focus-out and once shortly after creation.
        """
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        entry = tk.Entry(container, width=18, bg=self.entry_bg_color,
                         fg=self.fg_color, insertbackground=self.fg_color)
        entry.pack(side='left')
        entry.insert(0, default)
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        if ip_check_func:
            status_icon = tk.Label(container, text="●", fg="red", bg=self.frame_bg_color)
            status_icon.pack(side='left', padx=(5, 0))
            entry.bind("<FocusOut>", lambda event: ip_check_func(entry.get(), status_icon))
            self.root.after(100, lambda: ip_check_func(entry.get(), status_icon))
        return entry

    # ----- Label + combobox row -----
    def _create_label_combobox(self, frame, text, help_text, values, default, var):
        """Add a "label | combobox | help" row bound to *var*; return the combobox."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        combobox = ttk.Combobox(container, values=values, textvariable=var,
                                width=18, style="TCombobox")
        combobox.pack(side='left')
        combobox.set(default)
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        return combobox

    # ----- Label + checkbox row -----
    def _create_label_checkbox(self, frame, text, help_text, var):
        """Add a "label | checkbox | help" row bound to *var*; return the checkbox."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        checkbox = tk.Checkbutton(container, variable=var, bg=self.frame_bg_color,
                                  activebackground=self.frame_bg_color,
                                  selectcolor=self.accent_color, fg=self.fg_color)
        checkbox.pack(side='left')
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        return checkbox

    # ----- Jog sensitivity slider -----
    def _create_jog_sensitivity_slider(self, frame):
        """Add the 1.0-10.0 jog sensitivity slider with a live value label."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text="조그 가감속 민감도", width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))

        def update_slider_value(value):
            # Snap to one decimal place and mirror the value in the label.
            rounded_value = round(float(value), 1)
            self.jog_sensitivity_var.set(rounded_value)
            value_label.config(text=f"{rounded_value:.1f}")

        slider = ttk.Scale(container, from_=1.0, to=10.0, orient="horizontal",
                           variable=self.jog_sensitivity_var, length=200,
                           command=update_slider_value)
        slider.pack(side='left')
        value_label = tk.Label(container, text=f"{self.jog_sensitivity_var.get():.1f}",
                               width=5, bg=self.frame_bg_color, fg=self.fg_color)
        value_label.pack(side='left', padx=(5, 0))
        help_label = tk.Label(container, text="조그 가감속 민감도를 조절합니다.",
                              fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))

    # ----- TAG sheet cell -----
    def _create_cell(self, parent, row, col, value="", readonly=False, bold=False):
        """Create one grid cell: a header label in row/column 0, else an entry.

        Entries are also appended to ``self.entries``.
        """
        if row == 0 or col == 0:
            label = tk.Label(parent, text=value, width=10, relief="flat", borderwidth=1,
                             bg=self.accent_color, fg=self.fg_color,
                             font=('Arial', 10, 'bold'))
            label.grid(row=row, column=col, sticky="nsew")
            return label
        entry = tk.Entry(parent, width=10, justify='center', bg=self.entry_bg_color,
                         fg=self.fg_color, insertbackground=self.fg_color)
        entry.grid(row=row, column=col, sticky="nsew")
        entry.insert(0, value)
        if readonly:
            entry.config(state='readonly')
        if bold:
            entry.config(font=('Arial', 10, 'bold'))
        self.entries.append(entry)
        return entry

    # ----- Keyboard hook -----
    def _setup_keyboard_hook(self):
        """Create and start the keyboard hook, logging the outcome."""
        self.keyboard_hook = KeyboardHook(self)
        if not self.keyboard_hook.start():
            self.logger.error("키보드 후킹 설정 실패")
        else:
            self.logger.info("키보드 후킹 설정 성공")

    # ----- Key event handling (on_press / on_release) -----
    def on_press(self, key_str):
        """Handle a key-down for *key_str* (``"f1"`` ... ``"f24"``).

        Repeats of a key that is still held are ignored.  F7/F8/F9 are handled
        directly; while the overlay is open F12 closes it and F16-F24 select
        tags; otherwise the key is looked up in ``replay_commands`` and
        ``double_tap_commands``.
        """
        current_time = time.time()
        self.logger.debug(f"키 눌림: {key_str}")
        self.key_press_buffer.append((key_str, current_time))
        if len(self.key_press_buffer) > 10:
            self.key_press_buffer.pop(0)
        if key_str in self.pressed_keys:
            return
        self.pressed_keys.add(key_str)
        self.key_press_time[key_str] = current_time

        handled_directly = True
        if key_str == 'f7':
            self._handle_f7_press()
        elif key_str in ('f8', 'f9'):
            self._handle_jog_movement(key_str, current_time)
        else:
            handled_directly = False

        if self.overlay_window_open:
            if key_str == 'f12':
                self._on_overlay_close()
                self._send_tag_to_vmix()
            elif key_str in ('f16', 'f17', 'f18', 'f19', 'f20', 'f21', 'f22', 'f23', 'f24'):
                self._handle_overlay_key(key_str)
            return

        # F7/F8/F9 are also present in replay_commands; dispatching them again
        # here would send the jump twice and advance the jog speed twice.
        if not handled_directly and key_str in self.replay_commands:
            self.replay_commands[key_str]()
        if key_str in self.double_tap_commands:
            if current_time - self.last_press_time.get(key_str, 0) < 0.3:
                self.send_request(self.double_tap_commands[key_str])
            self.last_press_time[key_str] = current_time

    def on_release(self, key_str):
        """Handle a key-up: forget the held state and record the release time."""
        current_time = time.time()
        self.logger.debug(f"키 떼짐: {key_str}")
        self.pressed_keys.discard(key_str)
        self.key_release_time[key_str] = current_time

    # ----- F7 -----
    def _handle_f7_press(self):
        """Send ReplayJumpToNow for channel 1."""
        self.send_request("ReplayJumpToNow&Channel=1")
        self.logger.info("ReplayJumpToNow 명령 전송")

    # ----- Jog movement (F8 / F9) -----
    def _handle_jog_movement(self, key_name, current_time):
        """Send a ReplayJumpFrames command whose size grows with repeated presses.

        The speed level resets after 0.5 s without jogging or on a direction
        change; F8 jogs backwards, any other key forwards.
        """
        if current_time - self.last_jog_time > 0.5:
            self.jog_rotation_count = 0
            self.jog_direction = None
        if self.jog_direction != key_name:
            self.jog_rotation_count = 0
            self.jog_direction = key_name
        sensitivity = self.jog_sensitivity_var.get()
        self.jog_rotation_count = min(self.jog_rotation_count + 1,
                                      len(self.jog_speed_levels) - 1)
        self.last_jog_time = current_time
        jog_speed = self.jog_speed_levels[self.jog_rotation_count]
        direction = -1 if key_name == 'f8' else 1
        jog_speed *= sensitivity
        command = f"ReplayJumpFrames&Value={direction * jog_speed}&Channel=1"
        self.send_request(command)
        self.logger.debug(f"조그 이동: 방향={direction}, 속도={jog_speed}, 회전횟수={self.jog_rotation_count}, 민감도={sensitivity}")

    # ----- Overlay keys -----
    def _handle_overlay_key(self, key_name):
        """Resolve F16-F24 presses into a (row, col) pick on the current sheet.

        A lone key selects column ``n - 15`` of row 1 after 300 ms; two keys
        within 0.3 s select (first - 15, second - 15).
        """
        current_time = time.time()
        if not self.first_key:
            self.first_key = key_name
            self.first_key_time = current_time
            self.overlay_key_timer = self.root.after(300, self._process_single_key, key_name)
            return
        # NOTE(review): first_key is not cleared when the 300 ms timer fires,
        # so the next key after a single-key pick takes the immediate
        # single-key branch below - confirm this is intended.
        if current_time - self.first_key_time <= 0.3:
            if self.overlay_key_timer:
                self.root.after_cancel(self.overlay_key_timer)
            try:
                row = int(self.first_key[1:]) - 15
                col = int(key_name[1:]) - 15
            except ValueError:
                row, col = 1, 1
            self._process_overlay_key(row, col)
        else:
            self._process_single_key(key_name)
        self.first_key = None
        self.first_key_time = None

    def _process_single_key(self, key_name):
        """Pick row 1, column ``n - 15`` for a single overlay key."""
        row = 1
        try:
            col = int(key_name[1:]) - 15
        except ValueError:
            col = 1
        self._process_overlay_key(row, col)

    def _process_overlay_key(self, row, col):
        """Append the tag at (row, col) of the current sheet and advance."""
        try:
            value = self.sheets[self.current_sheet][1].at[row, col]
            self._update_overlay_label(value)
            self._move_to_next_sheet()
        except Exception as e:
            self.logger.error(f"오버레이 처리 오류: {e}")

    def _update_overlay_label(self, value):
        """Record *value* as a selected tag and refresh the overlay caption."""
        if not self.selected_tags:
            self.overlay_label_text = value
        else:
            self.overlay_label_text += f" {value}"
        self.selected_tags.append(value)
        if self.overlay_label:
            self.overlay_label.config(text=self.overlay_label_text)

    def _move_to_next_sheet(self):
        """Advance to the next TAG sheet, closing the overlay after the last one."""
        self.current_sheet += 1
        try:
            sheet_count = int(self.tag_sheet_count_var.get())
            if self.current_sheet >= sheet_count:
                self._on_overlay_close()
            elif self.notebook_clone:
                self.notebook_clone.select(self.current_sheet)
        except ValueError:
            self.logger.error("유효하지 않은 TAG 시트 수.")

    def _send_tag_to_vmix(self):
        """Send the selected tags as the selected event's text, then clear them."""
        # Local import: only this method needs it.
        from urllib.parse import quote

        if self.selected_tags:
            tag_text = ' '.join(self.selected_tags)
            # The text is user-entered; percent-encode it so characters such
            # as '&', '#' or '=' cannot split or truncate the query string.
            command = f"ReplaySetSelectedEventText&Value={quote(tag_text, safe='')}"
            self.send_request(command)
            self.logger.info(f"vMix에 태그 전송: {tag_text}")
            self.selected_tags = []
        else:
            self.logger.info("선택된 태그 없음.")

    # ----- Replay command table -----
    def _update_replay_commands(self, *args):
        """Rebuild the key -> action and key -> double-tap command tables.

        ``*args`` absorbs the arguments Tk passes to variable trace callbacks.
        """
        # NOTE(review): the camera-count setting is not applied here although
        # the UI help text says unused cameras are disabled - confirm intent.
        self.replay_commands = {
            "f1": lambda: self.send_request("ReplayPlaySelectedEventToOutput&Channel=1"),
            "f2": lambda: self.send_request("ReplaySelectNextEvent&Channel=1"),
            "f3": lambda: self.send_request("ReplaySelectPreviousEvent&Channel=1"),
            "f4": lambda: self.send_request("ReplaySetSpeed&Value=0.5&Channel=1"),
            "f5": lambda: self.send_request("ReplayChangeSpeed&Value=-0.1&Channel=1"),
            "f6": lambda: self.send_request("ReplayChangeSpeed&Value=0.1&Channel=1"),
            "f7": self._handle_f7_press,
            "f8": lambda: self._handle_jog_movement("f8", time.time()),
            "f9": lambda: self._handle_jog_movement("f9", time.time()),
            "f10": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[3].get()}"),
            "f11": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[4].get()}"),
            "f12": self.toggle_overlay,
            "f13": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[0].get()}"),
            "f14": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[1].get()}"),
            "f15": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[2].get()}"),
            "f16": lambda: self.send_request("ReplayToggleSelectedEventCamera4"),
            "f17": lambda: self.send_request("ReplayMarkIn&Channel=1"),
            "f18": lambda: self.send_request("ReplayMarkOut&Channel=1"),
            "f19": lambda: self.send_request("ReplayToggleSelectedEventCamera1"),
            "f20": lambda: self.send_request("ReplayToggleSelectedEventCamera2"),
            "f21": lambda: self.send_request("ReplayToggleSelectedEventCamera3"),
            "f22": lambda: self.send_request("ReplayPlayNext&Channel=1"),
            "f23": lambda: self.send_request("ReplayStopEvents&Channel=1"),
            "f24": lambda: self.send_request("ReplayLiveToggle"),
        }
        self.double_tap_commands = {
            "f19": "ReplaySelectedEventSingleCameraOn&Value=1",
            "f20": "ReplaySelectedEventSingleCameraOn&Value=2",
            "f21": "ReplaySelectedEventSingleCameraOn&Value=3",
            "f16": "ReplaySelectedEventSingleCameraOn&Value=4",
        }
        self.logger.info("리플레이 명령어 업데이트 완료.")

    # ----- Overlay toggle -----
    def toggle_overlay(self):
        """Open the overlay window if it is closed, close it if it is open."""
        if self.overlay_window_open:
            self._on_overlay_close()
        else:
            self._create_new_overlay_window()
        # Derive the flag from the real window state: _on_overlay_close()
        # already resets it, so blindly inverting it afterwards would leave it
        # True with no window open.
        self.overlay_toggle = self.overlay_window_open

    # ----- Request handler -----
    def _setup_request_handler(self):
        """Create the SyncRequestHandler for the address in the IP entry."""
        try:
            ip_address = self.ip_entry.get().strip()
            if not ip_address:
                ip_address = "127.0.0.1:8088"
                self.ip_entry.insert(0, ip_address)
            base_url = f"http://{ip_address}"
            self.request_handler = SyncRequestHandler(base_url)
            self.logger.info(f"Request handler 초기화: {base_url}")
        except Exception as e:
            self.logger.error(f"Request handler 설정 실패: {e}")
            self._show_error(f"vMix 연결 설정 실패: {e}")

    # ----- Send a vMix request -----
    def send_request(self, command):
        """Queue *command* for vMix, re-checking the connection first if needed.

        Shows an error dialog and drops the command when vMix is unreachable.
        """
        try:
            if not self.request_handler:
                self._setup_request_handler()
            if not self.vmix_connected:
                self.logger.warning("vMix 연결되지 않음. 재연결 시도 중...")
                # Use the check's own result: self.vmix_connected is only
                # refreshed later by a callback scheduled on the Tk loop.
                if not self._check_vmix_connection():
                    self._show_error("vMix가 연결되어 있지 않습니다.")
                    return
            self.request_handler.send_request(command)
        except Exception as e:
            self.logger.error(f"요청 전송 에러: {e}")
            self._show_error(f"명령 전송 실패: {e}")

    # ----- Error dialog -----
    def _show_error(self, message):
        """Schedule an error message box on the Tk event loop."""
        self.root.after(0, lambda: messagebox.showerror("오류", message))

    # ----- IP address check -----
    def _check_ip(self, ip, label=None):
        """Probe ``host:port`` in a background thread and colour *label*.

        The label turns green when a TCP connection succeeds within 2 s and
        red otherwise (including a malformed address).
        """
        def ping_host():
            try:
                host, port = ip.split(':')
                with socket.create_connection((host, int(port)), timeout=2):
                    reachable = True
            except (OSError, ValueError, OverflowError):
                # Malformed "host:port", invalid port number or network error.
                reachable = False
            if label:
                color = "green" if reachable else "red"
                self.root.after(0, lambda: label.config(fg=color))

        Thread(target=ping_host, daemon=True).start()

    # ----- TAG sheet activation -----
    def update_sheet_activation(self, *args):
        """Enable the first N sheets (N = sheet-count setting), disable the rest.

        Does nothing until the notebook exists; an unparsable count means 1.
        """
        if not hasattr(self, 'notebook'):
            return
        try:
            count = int(self.tag_sheet_count_var.get())
        except ValueError:
            count = 1
        for i in range(3):
            active = i < count
            state = "normal" if active else "disabled"
            icon = self.unlocked_image if active else self.locked_image
            if icon:
                self.notebook.tab(i, state=state, image=icon)
            else:
                self.notebook.tab(i, state=state)
            for widget in self.sheets[i][0].winfo_children():
                if isinstance(widget, tk.Entry):
                    widget.config(state=state)

    # ----- Tab rename (double-click) -----
    def _rename_tab(self, event):
        """Show an inline entry over the double-clicked tab to rename it.

        The name is committed on Return, on focus loss, or on a click
        anywhere else in the window.
        """
        try:
            index = self.notebook.index("@%d,%d" % (event.x, event.y))
        except tk.TclError:
            # The double-click did not land on a tab label.
            return
        tab_text = self.notebook.tab(index, "text")
        entry = tk.Entry(self.notebook, width=20, bg=self.entry_bg_color,
                         fg=self.fg_color, insertbackground=self.fg_color)
        click_binding = None
        committed = False

        def set_name(event=None):
            # Return, FocusOut and the root click handler can all fire for one
            # edit; only the first may touch the (soon destroyed) entry.
            nonlocal committed
            if committed:
                return
            committed = True
            self.notebook.tab(index, text=entry.get())
            entry.destroy()
            if click_binding is not None:
                self.root.unbind("<Button-1>", click_binding)

        def check_focus(e):
            if e.widget != entry:
                set_name()

        entry.bind("<Return>", set_name)
        entry.bind("<FocusOut>", set_name)
        entry.bind("<Button-1>", lambda e: e.widget.focus_set())
        entry.place(x=event.x, y=event.y, anchor="center", width=200, height=30)
        entry.insert(0, tab_text)
        entry.select_range(0, tk.END)
        entry.focus_set()
        click_binding = self.root.bind("<Button-1>", check_focus, add="+")

    # ----- Overlay window -----
    def _create_new_overlay_window(self):
        """Open the always-on-top TAG selection window on the chosen monitor."""
        self._update_data_from_entries()
        self.selected_tags = []
        width = 1100
        height = 300
        self.overlay_window = tk.Toplevel(self.root)
        self.overlay_window.title("이벤트 태그 (TAG) 선택")
        self.overlay_window.attributes("-topmost", True)
        self.overlay_window.configure(bg=self.bg_color)
        if hasattr(self, 'icon_path') and os.path.exists(self.icon_path):
            self.overlay_window.iconbitmap(self.icon_path)
        monitors = screeninfo.get_monitors()
        try:
            monitor_index = max(0, min(int(self.overlay_monitor_var.get()) - 1, len(monitors) - 1))
        except ValueError:
            monitor_index = 0
        if monitor_index < 0 or monitor_index >= len(monitors):
            # Only reachable when no monitor was reported at all.
            self.logger.error(f"잘못된 모니터 인덱스: {monitor_index}")
            self.overlay_window.destroy()
            self.overlay_window = None
            return
        monitor = monitors[monitor_index]
        overlay_x = monitor.x + (monitor.width - width) // 2
        overlay_y = monitor.y + monitor.height - height - 50
        self.logger.info(f"오버레이 창 - 모니터 {monitor_index + 1}: (x={overlay_x}, y={overlay_y}), 크기=({width}x{height})")
        self.overlay_window.geometry(f"{width}x{height}+{overlay_x}+{overlay_y}")
        self.overlay_window.resizable(False, False)
        self.notebook_clone = ttk.Notebook(self.overlay_window)
        self.notebook_clone.pack(expand=True, fill="both", padx=10, pady=10)
        try:
            count = int(self.tag_sheet_count_var.get())
        except ValueError:
            count = 1
        for i, (frame, data) in enumerate(self.sheets[:count]):
            frame_clone = ttk.Frame(self.notebook_clone)
            icon = self.unlocked_image if self.unlocked_image else None
            if icon:
                self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"),
                                        image=icon, compound="left")
            else:
                self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"))
            for j in range(10):
                frame_clone.grid_columnconfigure(j, weight=1)
                frame_clone.grid_rowconfigure(j, weight=1)
            for row in range(1, 10):
                label_row = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'),
                                     borderwidth=1, relief="flat",
                                     bg=self.accent_color, fg=self.fg_color)
                label_row.grid(row=row, column=0, sticky="nsew")
                label_col = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'),
                                     borderwidth=1, relief="flat",
                                     bg=self.accent_color, fg=self.fg_color)
                label_col.grid(row=0, column=row, sticky="nsew")
            for row in range(1, 10):
                for col in range(1, 10):
                    value = data.at[row, col]
                    # Checkerboard shading
                    bg_color = self.entry_bg_color if (row + col) % 2 == 0 else self.bg_color
                    tk.Label(frame_clone, text=value, font=('Arial', 10), bg=bg_color,
                             fg=self.fg_color, borderwidth=1, relief="solid").grid(
                        row=row, column=col, sticky="nsew", padx=1, pady=1)
        self.overlay_label = tk.Label(self.overlay_window, text=self.overlay_label_text,
                                      bg=self.bg_color, fg=self.fg_color, font=('Arial', 12))
        self.overlay_label.pack(side="bottom", padx=10, pady=10, fill="x")
        self.overlay_window.protocol("WM_DELETE_WINDOW", self._on_overlay_close)
        self.overlay_window_open = True
        self.current_sheet = 0

    # ----- Overlay close -----
    def _on_overlay_close(self):
        """Destroy the overlay, send any selected tags and reset overlay state."""
        self.overlay_window_open = False
        self.overlay_close_time = time.time()
        if self.overlay_window:
            self.overlay_window.destroy()
            self.overlay_window = None
        if self.selected_tags:
            self._send_tag_to_vmix()
        self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요"
        self.current_sheet = 0
        self.root.after(100, self._process_buffered_keys)
        self.overlay_toggle = False

    def _process_buffered_keys(self):
        """Replay keys buffered during the last 0.5 s, then empty the buffer."""
        current_time = time.time()
        # Snapshot first: replaying goes through on_press(), which appends to
        # key_press_buffer; iterating the live list would never terminate.
        recent_keys = [key_name for key_name, press_time in self.key_press_buffer
                       if current_time - press_time < 0.5]
        for key_name in recent_keys:
            self._simulate_key_press(key_name)
        self.key_press_buffer.clear()

    def _simulate_key_press(self, key_name):
        """Feed *key_name* through on_press now and on_release 50 ms later."""
        self.on_press(key_name)
        self.root.after(50, lambda: self.on_release(key_name))

    # ----- Copy entry text into the sheet data -----
    def _update_data_from_entries(self):
        """Copy every entry's text into its sheet's DataFrame cell."""
        for entry in self.entries:
            try:
                row = int(entry.grid_info().get('row', 0))
                col = int(entry.grid_info().get('column', 0))
                for frame, data in self.sheets:
                    if entry.master == frame:
                        data.at[row, col] = entry.get()
            except Exception as e:
                self.logger.warning(f"데이터 업데이트 오류: {e}")

    # ----- Collect data for saving -----
    def _save_data(self):
        """Return ``{"sheets": ..., "names": ...}`` describing all sheets."""
        self._update_data_from_entries()
        sheet_data = {f"시트 {i+1}": data.to_dict(orient='split')
                      for i, (frame, data) in enumerate(self.sheets)}
        sheet_names = {f"시트 {i+1}": self.notebook.tab(i, "text")
                       for i in range(self.notebook.index("end"))}
        return {"sheets": sheet_data, "names": sheet_names}

    # ----- Save to file -----
    def _save_to_file(self):
        """Ask for a .PDB path and write the sheet data there as JSON."""
        data_dict = self._save_data()
        file_path = filedialog.asksaveasfilename(defaultextension=".PDB",
                                                 filetypes=[("PDB files", "*.PDB")])
        if file_path:
            if not file_path.lower().endswith('.pdb'):
                file_path += '.PDB'
            try:
                with open(file_path, 'w', encoding='utf-8') as file:
                    json.dump(data_dict, file, ensure_ascii=False, indent=4)
                self.logger.info(f"데이터가 {file_path}에 저장됨.")
            except Exception as e:
                self.logger.error(f"파일 저장 에러: {e}")
                self._show_error(str(e))

    # ----- Load from file -----
    def _load_from_file(self):
        """Ask for a .PDB file and load its sheet names and cell text."""
        file_path = filedialog.askopenfilename(filetypes=[("PDB files", "*.PDB")])
        if not file_path:
            self.logger.info("파일 로드 취소됨.")
            return
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                data_dict = json.load(file)
            sheet_data = data_dict.get("sheets", {})
            sheet_names = data_dict.get("names", {})
            for i, (frame, data) in enumerate(self.sheets):
                sheet_name = f"시트 {i+1}"
                if sheet_name not in sheet_data:
                    continue
                self.notebook.tab(i, text=sheet_names.get(sheet_name, sheet_name))
                saved = sheet_data[sheet_name]
                # Build the frame straight from the saved "split" dict so the
                # cell text is kept verbatim (no dtype inference, e.g. "007"
                # stays "007").
                loaded_data = pd.DataFrame(saved["data"], index=saved["index"],
                                           columns=saved["columns"])
                for row in range(1, 10):
                    for col in range(1, 10):
                        value = loaded_data.at[row, col]
                        entry = self._get_entry(frame, row, col)
                        if entry:
                            # A disabled entry ignores insert/delete.
                            entry.config(state="normal")
                            entry.delete(0, tk.END)
                            entry.insert(0, "" if value is None else str(value))
            # Re-lock the sheets that were temporarily enabled above.
            self.update_sheet_activation()
            self.logger.info(f"파일에서 데이터 로드 완료: {file_path}")
        except Exception as e:
            self.logger.error(f"파일 로드 에러: {e}")
            self._show_error(str(e))

    # ----- Clear all data -----
    def _clear_all_data(self):
        """After confirmation, blank every entry and every sheet DataFrame."""
        if messagebox.askokcancel("초기화 확인", "정말 모든 데이터를 초기화 하시겠습니까?"):
            for entry in self.entries:
                entry.config(state="normal")
                entry.delete(0, tk.END)
            for frame, data in self.sheets:
                data.loc[:] = ""
            # Re-lock the sheets that were temporarily enabled above.
            self.update_sheet_activation()
            self.logger.info("모든 데이터 초기화 완료.")

    # ----- Entry lookup by grid position -----
    def _get_entry(self, frame, row, col):
        """Return the tk.Entry gridded at (row, col) in *frame*, or None."""
        for widget in frame.winfo_children():
            if isinstance(widget, tk.Entry):
                info = widget.grid_info()
                if int(info.get('row', -1)) == row and int(info.get('column', -1)) == col:
                    return widget
        return None

    # ----- vMix connection check -----
    def _check_vmix_connection(self, ip_address=None):
        """Probe the vMix API and schedule a status indicator update.

        Args:
            ip_address: ``host:port`` to probe.  Pass it explicitly when
                calling from a worker thread so no Tk widget is read there;
                when omitted the IP entry is read.

        Returns:
            True when ``GET http://<ip>/api/`` succeeded, otherwise False.
        """
        if ip_address is None:
            ip_address = self.ip_entry.get().strip()
        try:
            response = requests.get(f"http://{ip_address}/api/", timeout=0.5)
            response.raise_for_status()
            is_connected = True
            self.logger.info("vMix 연결됨")
        except requests.exceptions.RequestException as e:
            is_connected = False
            self.logger.error(f"vMix 연결 에러: {e}")
        if not self.request_handler:
            self._setup_request_handler()
        self.root.after(0, lambda: self._update_vmix_status_indicator(is_connected))
        return is_connected

    # ----- Periodic vMix check (every 5 s) -----
    def _start_vmix_check_loop(self):
        """Run the connection check in a thread and reschedule in 5 s."""
        current_time = time.time()
        if current_time - self.last_vmix_check >= self.vmix_check_interval:
            self.last_vmix_check = current_time
            # Read the widget here, on the Tk thread, and hand the value over.
            ip_address = self.ip_entry.get().strip()
            Thread(target=self._check_vmix_connection, args=(ip_address,),
                   daemon=True).start()
        self.root.after(5000, self._start_vmix_check_loop)

    # ----- vMix status indicator -----
    def _update_vmix_status_indicator(self, connected):
        """Redraw the status dot (green/red) and store the connection flag."""
        color = "green" if connected else "red"
        if hasattr(self, 'vmix_status_indicator'):
            self.vmix_status_indicator.delete("status_oval")
            self.vmix_status_indicator.create_oval(2, 2, 13, 13, fill=color, tags="status_oval")
        self.vmix_connected = connected

    # ----- Save settings -----
    def _save_settings(self):
        """Write the UI settings and all sheet cell text to settings.json."""
        settings = {
            'ip': self.ip_entry.get(),
            'camera_count': self.camera_count_var.get(),
            'overlay_on_off': self.overlay_on_off_var.get(),
            'overlay_monitor': self.overlay_monitor_var.get(),
            'tag_sheet_count': self.tag_sheet_count_var.get(),
            'jog_sensitivity': self.jog_sensitivity_var.get(),
        }
        for i, var in enumerate(self.end_time_vars):
            settings[f'end_time_{i+1}'] = var.get()
        sheet_data = {}
        for i, (frame, data) in enumerate(self.sheets):
            sheet_name = f"시트 {i+1}"
            sheet_data[sheet_name] = {}
            for row in range(1, 10):
                for col in range(1, 10):
                    entry = self._get_entry(frame, row, col)
                    if entry:
                        sheet_data[sheet_name][f"{row},{col}"] = entry.get()
        settings['sheet_data'] = sheet_data
        try:
            with open('settings.json', 'w', encoding='utf-8') as f:
                json.dump(settings, f, ensure_ascii=False, indent=4)
            self.logger.info("설정 저장 완료.")
        except Exception as e:
            self.logger.error(f"설정 저장 실패: {e}")
            self._show_error(f"설정 저장 실패: {e}")

    # NOTE(review): _load_settings continues beyond the reviewed excerpt, so
    # its visible text is kept exactly as found (unformatted) on the next line.
    # ----- 설정 로드 ----- def _load_settings(self): try: with open('settings.json', 'r', encoding='utf-8') as f: settings = json.load(f) self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, settings.get('ip', '127.0.0.1:8088')) self._check_ip(self.ip_entry.get()) for i, var in enumerate(self.end_time_vars): var.set(settings.get(f'end_time_{i+1}', str((i+1)*10))) self.camera_count_var.set(settings.get('camera_count', '4')) self.overlay_on_off_var.set(settings.get('overlay_on_off', True)) self.overlay_monitor_var.set(settings.get('overlay_monitor', '1')) self.tag_sheet_count_var.set(settings.get('tag_sheet_count', '3')) self.jog_sensitivity_var.set(settings.get('jog_sensitivity', 5)) sheet_data = settings.get("sheet_data", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: for row in range(1, 10): for col
in range(1, 10): value = sheet_data[sheet_name].get(f"{row},{col}", "") entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info("설정 로드 완료.") except FileNotFoundError: self.logger.warning("설정 파일 없음. 기본값 사용.") except json.JSONDecodeError as e: self.logger.error(f"설정 파일 JSON 파싱 에러: {e}") self._show_error(f"설정 파일 파싱 오류: {e}") # ----- 프로그램 종료 처리 ----- def _on_closing(self): if hasattr(self, 'keyboard_hook') and self.keyboard_hook: self.keyboard_hook.stop() self._save_settings() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.request_handler: self.request_handler.close() self.logger.info("프로그램 종료.") self.root.destroy() # ----- 메인 루프 실행 ----- def run(self): self.root.protocol("WM_DELETE_WINDOW", self._on_closing) self.root.mainloop() # ------------------------------------------------------------------------- # 누락된 _get_monitor_count 메서드 추가 (모니터 갯수를 반환) @functools.lru_cache(maxsize=None) def _get_monitor_count(self): """모니터 갯수 반환 (캐싱 사용)""" return len(screeninfo.get_monitors())
def main():
    """Create the ReplayController and run its main loop."""
    controller = ReplayController()
    controller.run()


# `name == "main"` was `__name__ == "__main__"` with the double underscores
# stripped; as written it raised NameError as soon as the module was executed.
if __name__ == "__main__":
    main()
코드 전체를 분석해서 기능 누락 없이 전체적으로 수정·개선해줘. 문제점들을 해결하고, 추가 제안 사항이 있으면 반영해줘.
전체 코드를 복사·붙여넣기만 하면 되도록 전부 적어줘.
아래는 기존 기능을 모두 유지하면서 몇 가지 문제점을 수정·개선한 최종 전체 코드입니다. 주요 수정 및 개선 사항은 다음과 같습니다:
아래의 코드를 그대로 복사하여 사용하시면 됩니다.
import ctypes
import sys
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import pandas as pd
import json
from io import StringIO
import screeninfo
import requests
from threading import Thread, Lock
from PIL import Image, ImageTk
import os
import time
import logging
from queue import Queue
import socket
import functools
import math
import win32con
from ctypes import windll, CFUNCTYPE, POINTER, c_int, c_void_p, byref, c_long, cast
from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPARAM, WPARAM, LONG

# =============================================================================
# Logging configuration
# =============================================================================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('PIL').setLevel(logging.WARNING)


# =============================================================================
# Windows low-level keyboard hooking structures and callback definition using ctypes
# =============================================================================
class KBDLLHOOKSTRUCT(ctypes.Structure):
    """ctypes structure the hook callback reads through its lParam pointer."""
    _fields_ = [
        ("vkCode", DWORD),
        ("scanCode", DWORD),
        ("flags", DWORD),
        ("time", DWORD),
        ("dwExtraInfo", POINTER(c_void_p))
    ]


LowLevelKeyboardProc = CFUNCTYPE(LONG, c_int, WPARAM, LPARAM)


class KeyboardHook:
    """
    Windows API low-level keyboard hooking class.
    Hooks function keys F1~F24 and calls controller's on_press/on_release events.
    """

    def __init__(self, controller):
        """Store the controller and prepare the callback; the hook is installed by start()."""
        self.controller = controller
        self.user32 = windll.user32
        self.kernel32 = windll.kernel32
        self.hooked = None
        # NOTE(review): no argtypes/restype are declared for the Win32 calls used
        # in this class, so ctypes applies its default int conversions - confirm
        # this is safe for handles and LPARAM values on 64-bit Python.
        self.hInstance = self.kernel32.GetModuleHandleW(None)
        self.function_key_vkcodes = set(range(0x70, 0x88))  # F1 ~ F24 (vkCode range)
        # The ctypes callback object is stored on the instance so that it stays
        # referenced for as long as the hook may call it.
        self.pointer = LowLevelKeyboardProc(self._hook_proc)
        self.is_running = False

    def _hook_proc(self, nCode, wParam, lParam):
        """
        Hook callback function – processes key events and suppresses system propagation if needed.

        Returns 1 for every F1~F24 event (after notifying the controller);
        everything else, and any event that raised here, goes to CallNextHookEx.
        """
        try:
            if nCode == win32con.HC_ACTION:
                kb = cast(lParam, POINTER(KBDLLHOOKSTRUCT)).contents
                vk_code = kb.vkCode
                if vk_code in self.function_key_vkcodes:
                    key_str = f"f{vk_code - 0x6F}"  # 0x70 -> "f1" ... 0x87 -> "f24"
                    if wParam in (win32con.WM_KEYDOWN, win32con.WM_SYSKEYDOWN):
                        self.controller.on_press(key_str)
                    elif wParam in (win32con.WM_KEYUP, win32con.WM_SYSKEYUP):
                        self.controller.on_release(key_str)
                    return 1  # Block further propagation
        except Exception as e:
            self.controller.logger.error(f"Keyboard hook error: {e}")
        return self.user32.CallNextHookEx(self.hooked, nCode, wParam, lParam)

    def start(self):
        """Starts the keyboard hook. Returns True on success, False otherwise."""
        if self.is_running:
            return False
        self.hooked = self.user32.SetWindowsHookExA(
            win32con.WH_KEYBOARD_LL,
            self.pointer,
            self.hInstance,
            0
        )
        if not self.hooked:
            self.controller.logger.error("Failed to set keyboard hook")
            return False
        self.is_running = True
        self.controller.logger.info("Keyboard hook started")
        # NOTE(review): the hook is installed on the thread that calls start(),
        # while this pump runs on a separate thread - confirm the pump thread is
        # actually needed for the callback to be delivered.
        self.hook_thread = Thread(target=self._pump_messages, daemon=True)
        self.hook_thread.start()
        return True

    def _pump_messages(self):
        """Message pump to continuously process keyboard events."""
        msg = MSG()
        while self.is_running:
            while self.user32.PeekMessageA(byref(msg), None, 0, 0, win32con.PM_REMOVE):
                self.user32.TranslateMessage(byref(msg))
                self.user32.DispatchMessageA(byref(msg))
            time.sleep(0.01)

    def stop(self):
        """Stops the keyboard hook."""
        if self.is_running and self.hooked:
            self.user32.UnhookWindowsHookEx(self.hooked)
            self.is_running = False
            self.hooked = None
            self.controller.logger.info("Keyboard hook stopped")


# =============================================================================
# vMix API synchronous request handler (thread pool, throttling, error handling)
# =============================================================================
class SyncRequestHandler:
    """Queue vMix API commands and send them from a pool of worker threads."""

    def __init__(self, base_url, max_concurrent_requests=5, request_timeout=5):
        """Create the session and queue, then start max_concurrent_requests workers."""
        self.base_url = base_url
        self.queue = Queue()
        self.max_concurrent_requests = max_concurrent_requests
        self.workers = []
        self.session = requests.Session()
        self.request_timeout = request_timeout
        self.lock = Lock()
        self.last_request_time = {}
        self.throttle_interval = 0.05  # 50ms throttling
        self._start_workers()

    def _start_workers(self):
        """Start the daemon worker threads."""
        for _ in range(self.max_concurrent_requests):
            worker = Thread(target=self._worker_thread, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_thread(self):
        """Send queued commands until a None termination signal is received."""
        while True:
            command = self.queue.get()
            if command is None:
                break  # Termination signal
            try:
                self._send_request(command)
            except Exception as e:
                logging.error(f"Worker thread error: {e}")
            finally:
                self.queue.task_done()

    def _send_request(self, command):
        """GET the API URL for command, dropping repeats of the same command within 50 ms."""
        url = f"{self.base_url}/api/?Function={command}"
        current_time = time.time()
        with self.lock:
            if command in self.last_request_time:
                if current_time - self.last_request_time[command] < self.throttle_interval:
                    logging.debug(f"Throttling: {command}")
                    return
            self.last_request_time[command] = current_time
        try:
            response = self.session.get(url, timeout=self.request_timeout)
            response.raise_for_status()
            logging.info(f"Request success: {url} - Status code: {response.status_code}")
        except requests.exceptions.Timeout:
            logging.error(f"Request timeout: {url}")
        except requests.exceptions.ConnectionError:
            logging.error(f"Connection error: {url}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Request error: {url} - {e}")

    def send_request(self, command):
        """Queue command for sending by a worker thread."""
        self.queue.put(command)

    def close(self):
        """Signal every worker to stop, wait for them, and close the session."""
        for _ in self.workers:
            self.queue.put(None)
        for worker in self.workers:
            worker.join()
        self.session.close()


# =============================================================================
# ReplayController class (integrates GUI, keyboard hook, and vMix communication)
# =============================================================================
class ReplayController:
    """Tk application that maps hooked F1~F24 key events to vMix replay commands."""

    def __init__(self):
        """Build the window, widgets, keyboard hook and request handler, then load settings."""
        self.logger = logging.getLogger(__name__)
        self.root = tk.Tk()
        self.root.title("최반장 리플레이 컨트롤러 간소판 v1.22 - 개선됨")
        self._setup_icon()
        self._initialize_variables()
        self._setup_dark_theme()
        self._create_frames()
        # Create vMix status indicator in the 기본 설정 frame
        self.vmix_status_indicator = tk.Canvas(
            self.frames["기본 설정"],
            width=14,
            height=14,
            bg=self.frame_bg_color,
            highlightthickness=0,
            borderwidth=0
        )
        self._create_widgets()
        self._create_notebook()
        self._setup_keyboard_hook()
        self._setup_request_handler()
        self._start_vmix_check_loop()
        self._load_settings()
        # The command tables are otherwise only rebuilt by variable traces. Build
        # them explicitly so the keys also work when nothing was written to the
        # traced variables during start-up (e.g. no settings file to load).
        self._update_replay_commands()
        self.root.geometry("870x800")
        self.root.minsize(870, 800)
        self.root.resizable(True, True)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(4, weight=1)

    # ----- Icon setup -----
    def _setup_icon(self):
        """Use chobanjangTV_icon.ico next to this script as the window icon, if present."""
        script_dir = os.path.dirname(os.path.abspath(__file__))
        icon_path = os.path.join(script_dir, "chobanjangTV_icon.ico")
        if os.path.exists(icon_path):
            try:
                self.root.iconbitmap(icon_path)
                self.icon_path = icon_path
                self.logger.info("Icon set successfully")
            except Exception as e:
                self.logger.warning(f"Icon setup failed: {e}")
        else:
            self.logger.warning(f"Icon file not found: {icon_path}")

    # ----- Variable initialization -----
    def _initialize_variables(self):
        """Create the Tk variables and plain state attributes used by the controller."""
        # Jog state
        self.jog_sensitivity_var = tk.DoubleVar(value=5)
        self.jog_rotation_count = 0
        self.last_jog_time = 0
        self.jog_direction = None
        self.jog_speed_levels = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
        # vMix connection state
        self.request_handler = None
        self.vmix_connected = False
        self.last_vmix_check = 0
        self.vmix_check_interval = 5.0
        # Overlay / TAG selection state
        self.overlay_window = None
        self.overlay_window_open = False
        self.key_press_buffer = []
        self.selected_tags = []
        self.current_sheet = 0
        self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요"
        self.overlay_toggle = False
        # Key tracking and command tables
        self.last_press_time = {}
        self.double_tap_commands = {}
        self.replay_commands = {}
        self.pressed_keys = set()
        self.key_press_time = {}
        self.key_release_time = {}
        self.first_key = None
        self.first_key_time = None
        self.overlay_key_timer = None
        self.notebook_clone = None
        self.overlay_label = None
        self.overlay_close_time = 0
        # Variables bound to the settings widgets
        self.end_time_vars = [tk.StringVar() for _ in range(5)]
        self.camera_count_var = tk.StringVar()
        self.overlay_on_off_var = tk.BooleanVar(value=True)
        self.overlay_monitor_var = tk.StringVar()
        self.tag_sheet_count_var = tk.StringVar()
        self.tag_sheet_count_var.trace_add("write", self.update_sheet_activation)
        # (frame, DataFrame) per TAG sheet, and every editable cell entry
        self.sheets = []
        self.entries = []

    # ----- Dark theme setup -----
    def _setup_dark_theme(self):
        """Define the colour palette and activate the "darktheme" ttk theme."""
        self.bg_color = '#1e1e1e'
        self.fg_color = '#ffffff'
        self.accent_color = '#007acc'
        self.entry_bg_color = '#1a1a1a'
        self.button_bg_color = '#3c3c3c'
        self.frame_bg_color = '#252526'
        self.root.configure(bg=self.bg_color)
        self.style = ttk.Style()
        # Only create the theme when it is not registered yet, so an existing
        # "darktheme" is reused instead of being created a second time.
        if "darktheme" not in self.style.theme_names():
            self.style.theme_create("darktheme", parent="alt", settings={
                "TNotebook": {"configure": {"background": self.bg_color,
                                            "tabmargins": [2, 5, 2, 0]}},
                "TNotebook.Tab": {"configure": {"padding": [20, 10],
                                                "background": self.bg_color,
                                                "foreground": self.fg_color},
                                  "map": {"background": [("selected", self.accent_color)],
                                          "expand": [("selected", [1, 1, 1, 0])]}},
                "TFrame": {"configure": {"background": self.frame_bg_color}},
                "TLabel": {"configure": {"background": self.frame_bg_color,
                                         "foreground": self.fg_color}},
                "TEntry": {"configure": {"fieldbackground": self.entry_bg_color,
                                         "foreground": self.fg_color,
                                         "insertcolor": self.fg_color}},
                "TButton": {"configure": {"background": self.button_bg_color,
                                          "foreground": self.fg_color}},
                "TCombobox": {"configure": {"fieldbackground": self.entry_bg_color,
                                            "foreground": self.fg_color}},
                "TCheckbutton": {"configure": {"background": self.frame_bg_color,
                                               "foreground": self.fg_color}},
            })
        self.style.theme_use("darktheme")
        self.style.configure('TNotebook.Tab', padding=[20, 10], font=('Arial', 12))
        self.style.map('TNotebook.Tab',
                       expand=[('selected', [1, 1, 1, 0])],
                       font=[('selected', ('Arial', 12, 'bold'))])

    # ----- Icon loading (unlocked/locked) -----
    def _load_icons(self):
        """Load unlocked.png / locked.png (16x16) from the script folder; None when unavailable."""
        script_dir = os.path.dirname(os.path.abspath(__file__))
        unlocked_path = os.path.join(script_dir, "unlocked.png")
        locked_path = os.path.join(script_dir, "locked.png")
        try:
            if os.path.exists(unlocked_path):
                # The with-block closes the image file once the resized copy exists.
                with Image.open(unlocked_path) as img:
                    self.unlocked_image = ImageTk.PhotoImage(img.resize((16, 16)))
            else:
                self.unlocked_image = None
                self.logger.warning(f"unlocked.png not found: {unlocked_path}")
            if os.path.exists(locked_path):
                with Image.open(locked_path) as img:
                    self.locked_image = ImageTk.PhotoImage(img.resize((16, 16)))
            else:
                self.locked_image = None
                self.logger.warning(f"locked.png not found: {locked_path}")
        except Exception as e:
            self.logger.error(f"Failed to load icons: {e}")
            self.unlocked_image = None
            self.locked_image = None

    # ----- Frame creation -----
    def _create_frames(self):
        """Create the four titled section frames and store them in self.frames by title."""
        self.frames = {}
        frame_texts = ["기본 설정", "조그 설정", "오버레이 설정", "TAG 시스템"]
        for i, text in enumerate(frame_texts):
            frame = ttk.Frame(self.root)
            frame.grid(row=i, column=0, sticky="nsew", padx=5, pady=5)
            frame.configure(style='TFrame')
            label = tk.Label(frame, text=text, font=('Arial', 12, 'bold'),
                             bg=self.accent_color, fg=self.fg_color)
            label.pack(fill='x', pady=(0, 5))
            self.frames[text] = frame

    # ----- Widget creation and placement -----
    def _create_widgets(self):
        """Create the settings widgets in each section frame and register the variable traces."""
        # 기본 설정 frame
        basic_frame = self.frames["기본 설정"]
        ip_frame = ttk.Frame(basic_frame)
        ip_frame.pack(fill='x', padx=10, pady=5)
        self.ip_entry = self._create_label_entry(
            ip_frame,
            "아이피",
            "vmix 리플레이가 실행되는 아이피와 포트 주소",
            "127.0.0.1:8088",
            self._check_ip
        )
        # Place status indicator
        self.vmix_status_indicator.pack(side='left', padx=(5, 0), in_=ip_frame)
        self._update_vmix_status_indicator(False)
        # End time comboboxes
        end_time_defaults = [5, 10, 20, 30, 40]
        for i in range(5):
            self._create_label_combobox(
                basic_frame,
                f"끝 점 시간 설정 {i+1} (초)",
                f"끝 점(outpoint) 마크시 시간 설정 {i+1}",
                [str(j) for j in range(1, 61)],
                str(end_time_defaults[i]),
                self.end_time_vars[i]
            )
        # Camera count combobox
        self._create_label_combobox(
            basic_frame,
            "카메라 수",
            "리플레이에 쓸 카메라 수 설정. 사용하지 않는 카메라는 실수 방지를 위해 비활성화됩니다.",
            [str(i) for i in range(1, 5)],
            "4",
            self.camera_count_var
        )
        # 조그 설정 frame: jog sensitivity slider
        self._create_jog_sensitivity_slider(self.frames["조그 설정"])
        # 오버레이 설정
        overlay_frame = self.frames["오버레이 설정"]
        self._create_label_checkbox(
            overlay_frame,
            "오버레이 on/off",
            "리플레이 정보 오버레이를 활성화/비활성화 합니다.",
            self.overlay_on_off_var
        )
        self.overlay_monitor_combobox = self._create_label_combobox(
            overlay_frame,
            "오버레이 모니터 번호",
            f"오버레이가 표시될 모니터 번호 (현재 모니터 갯수: {self._get_monitor_count()})",
            [str(i) for i in range(1, self._get_monitor_count()+1)],
            "1",
            self.overlay_monitor_var
        )
        # TAG 시스템 설정
        tag_frame = self.frames["TAG 시스템"]
        self.tag_sheet_count_combobox = self._create_label_combobox(
            tag_frame,
            "태그 시스템 사용 시트수",
            "사용할 태그 시스템 시트수를 조정합니다.",
            ["1", "2", "3"],
            "3",
            self.tag_sheet_count_var
        )
        # Bottom button frame
        self._create_button_frame()
        # Update replay commands when variables change
        for var in self.end_time_vars:
            var.trace_add("write", self._update_replay_commands)
        self.camera_count_var.trace_add("write", self._update_replay_commands)
        self.jog_sensitivity_var.trace_add("write", self._update_replay_commands)

    # ----- TAG system notebook creation -----
    def _create_notebook(self):
        """Create the three TAG sheets: a 9x9 entry grid with numbered headers per tab."""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
        self._load_icons()
        for sheet_num in range(1, 4):
            frame = ttk.Frame(self.notebook)
            icon = self.unlocked_image if self.unlocked_image else None
            if icon:
                self.notebook.add(frame, text=f"시트 {sheet_num}", image=icon, compound="left")
            else:
                self.notebook.add(frame, text=f"시트 {sheet_num}")
            data = pd.DataFrame("", index=range(1, 10), columns=range(1, 10))
            self.sheets.append((frame, data))
        for frame, data in self.sheets:
            for i in range(10):
                frame.grid_columnconfigure(i, weight=1)
                frame.grid_rowconfigure(i, weight=1)
            for i in range(1, 10):
                self._create_cell(frame, 0, i, value=str(i), readonly=True, bold=True)
                self._create_cell(frame, i, 0, value=str(i), readonly=True, bold=True)
            for i in range(1, 10):
                for j in range(1, 10):
                    self._create_cell(frame, i, j)
        self.notebook.bind("<Double-1>", self._rename_tab)
        self.update_sheet_activation()

    # ----- Bottom button frame creation -----
    def _create_button_frame(self):
        """Create the bottom row with the credit labels and the save/load/reset buttons."""
        button_frame = ttk.Frame(self.root)
        button_frame.grid(row=5, column=0, sticky="ew", padx=5, pady=5)
        dev_label = tk.Label(button_frame, text="제작 ", font=('Arial', 10),
                             bg=self.bg_color, fg=self.fg_color)
        dev_label.grid(row=0, column=0, sticky="w", padx=(5, 0))
        pdb_label = tk.Label(button_frame, text="PDB", font=('Arial', 10),
                             bg=self.bg_color, fg="#a80000")
        pdb_label.grid(row=0, column=1, sticky="w")
        omo_label = tk.Label(button_frame, text="오모 (240809 update)", font=('Arial', 10),
                             bg=self.bg_color, fg=self.fg_color)
        omo_label.grid(row=0, column=2, sticky="w")
        save_button = tk.Button(button_frame, text="저장", command=self._save_to_file,
                                width=10, bg=self.button_bg_color, fg=self.fg_color)
        save_button.grid(row=0, column=3, sticky="e", padx=5)
        load_button = tk.Button(button_frame, text="불러오기", command=self._load_from_file,
                                width=10, bg=self.button_bg_color, fg=self.fg_color)
        load_button.grid(row=0, column=4, sticky="e", padx=5)
        clear_button = tk.Button(button_frame, text="값 초기화", command=self._clear_all_data,
                                 width=10, bg=self.button_bg_color, fg=self.fg_color)
        clear_button.grid(row=0, column=5, sticky="e", padx=5)

    # ----- Create label-entry widget -----
    def _create_label_entry(self, frame, text, help_text, default, ip_check_func=None):
        """Create a label + entry + help row and return the entry.

        When ip_check_func is given, a status dot is added and the function is
        called as ip_check_func(entry_text, status_dot) on focus-out and once
        100 ms after creation.
        """
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        entry = tk.Entry(container, width=18, bg=self.entry_bg_color, fg=self.fg_color,
                         insertbackground=self.fg_color)
        entry.pack(side='left')
        entry.insert(0, default)
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        if ip_check_func:
            status_icon = tk.Label(container, text="●", fg="red", bg=self.frame_bg_color)
            status_icon.pack(side='left', padx=(5, 0))
            entry.bind("<FocusOut>", lambda event: ip_check_func(entry.get(), status_icon))
            self.root.after(100, lambda: ip_check_func(entry.get(), status_icon))
        return entry

    # ----- Create label-combobox widget -----
    def _create_label_combobox(self, frame, text, help_text, values, default, var):
        """Create a label + combobox + help row bound to var and return the combobox."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        combobox = ttk.Combobox(container, values=values, textvariable=var, width=18,
                                style="TCombobox")
        combobox.pack(side='left')
        combobox.set(default)
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        return combobox

    # ----- Create label-checkbox widget -----
    def _create_label_checkbox(self, frame, text, help_text, var):
        """Create a label + checkbutton + help row bound to var and return the checkbutton."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text=text, width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))
        checkbox = tk.Checkbutton(container, variable=var, bg=self.frame_bg_color,
                                  activebackground=self.frame_bg_color,
                                  selectcolor=self.accent_color, fg=self.fg_color)
        checkbox.pack(side='left')
        help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))
        return checkbox

    # ----- Create jog sensitivity slider -----
    def _create_jog_sensitivity_slider(self, frame):
        """Create the 1.0-10.0 jog sensitivity slider with a label showing the rounded value."""
        container = ttk.Frame(frame)
        container.pack(fill='x', padx=10, pady=5)
        label = tk.Label(container, text="조그 가감속 민감도", width=25, anchor='e',
                         bg=self.frame_bg_color, fg=self.fg_color)
        label.pack(side='left', padx=(0, 5))

        def update_slider_value(value):
            # Round to one decimal, store it back and show it next to the slider.
            rounded_value = round(float(value), 1)
            self.jog_sensitivity_var.set(rounded_value)
            value_label.config(text=f"{rounded_value:.1f}")

        slider = ttk.Scale(container, from_=1.0, to=10.0, orient="horizontal",
                           variable=self.jog_sensitivity_var, length=200,
                           command=update_slider_value)
        slider.pack(side='left')
        value_label = tk.Label(container, text=f"{self.jog_sensitivity_var.get():.1f}",
                               width=5, bg=self.frame_bg_color, fg=self.fg_color)
        value_label.pack(side='left', padx=(5, 0))
        help_label = tk.Label(container, text="조그 가감속 민감도를 조절합니다.",
                              fg="gray70", bg=self.frame_bg_color)
        help_label.pack(side='left', padx=(5, 0))

    # ----- Create TAG system cell -----
    def _create_cell(self, parent, row, col, value="", readonly=False, bold=False):
        """Create one grid cell and return the widget.

        Row 0 and column 0 hold header labels (readonly/bold are not used for
        them); every other position gets an Entry that is also appended to
        self.entries.
        """
        if row == 0 or col == 0:
            label = tk.Label(parent, text=value, width=10, relief="flat", borderwidth=1,
                             bg=self.accent_color, fg=self.fg_color,
                             font=('Arial', 10, 'bold'))
            label.grid(row=row, column=col, sticky="nsew")
            return label
        entry = tk.Entry(parent, width=10, justify='center', bg=self.entry_bg_color,
                         fg=self.fg_color, insertbackground=self.fg_color)
        entry.grid(row=row, column=col, sticky="nsew")
        entry.insert(0, value)
        if readonly:
            entry.config(state='readonly')
        if bold:
            entry.config(font=('Arial', 10, 'bold'))
        self.entries.append(entry)
        return entry

    # ----- Keyboard hook setup -----
    def _setup_keyboard_hook(self):
        """Create the KeyboardHook for this controller and start it."""
        self.keyboard_hook = KeyboardHook(self)
        if not self.keyboard_hook.start():
            self.logger.error("Failed to set keyboard hook")
        else:
            self.logger.info("Keyboard hook set successfully")

    # ----- Key event handling -----
    def on_press(self, key_str):
        """Handle a key-down for key_str ("f1".."f24").

        The press is always recorded in key_press_buffer (last 10 kept). Repeats
        of a key that is still held are ignored. While the overlay is open, F12
        closes it and F16~F24 select tags; otherwise the replay and double-tap
        command tables are used.
        """
        current_time = time.time()
        self.logger.debug(f"Key pressed: {key_str}")
        self.key_press_buffer.append((key_str, current_time))
        if len(self.key_press_buffer) > 10:
            self.key_press_buffer.pop(0)
        if key_str in self.pressed_keys:
            return
        self.pressed_keys.add(key_str)
        self.key_press_time[key_str] = current_time
        if self.overlay_window_open:
            if key_str == 'f12':
                self._on_overlay_close()
                self._send_tag_to_vmix()
            elif key_str in ['f16', 'f17', 'f18', 'f19', 'f20', 'f21', 'f22', 'f23', 'f24']:
                self._handle_overlay_key(key_str)
        else:
            if key_str in self.replay_commands:
                self.replay_commands[key_str]()
            if key_str in self.double_tap_commands:
                # A second press within 0.3 s also sends the double-tap command.
                if current_time - self.last_press_time.get(key_str, 0) < 0.3:
                    self.send_request(self.double_tap_commands[key_str])
                self.last_press_time[key_str] = current_time

    def on_release(self, key_str):
        """Handle a key-up: clear the held state of key_str and record the release time."""
        current_time = time.time()
        self.logger.debug(f"Key released: {key_str}")
        self.pressed_keys.discard(key_str)
        self.key_release_time[key_str] = current_time

    # ----- Overlay key handling -----
    def _handle_overlay_key(self, key_name):
        """Turn F16~F24 presses into a (row, col) lookup on the current sheet.

        The first key starts a 300 ms window. A second key inside that window
        gives (row=first, col=second); if none arrives, the first key alone
        selects column `key` of row 1.
        """
        current_time = time.time()
        if not self.first_key:
            self.first_key = key_name
            self.first_key_time = current_time
            self.overlay_key_timer = self.root.after(
                300, self._on_overlay_key_timeout, key_name)
            return
        if current_time - self.first_key_time <= 0.3:
            if self.overlay_key_timer:
                self.root.after_cancel(self.overlay_key_timer)
                self.overlay_key_timer = None
            try:
                row = int(self.first_key[1:]) - 15  # "f16" -> 1 ... "f24" -> 9
                col = int(key_name[1:]) - 15
            except ValueError:
                row, col = 1, 1
            self._process_overlay_key(row, col)
        else:
            self._process_single_key(key_name)
        self.first_key = None
        self.first_key_time = None

    def _on_overlay_key_timeout(self, key_name):
        """Timer callback: no second key arrived, so treat key_name as a single key.

        The pending-first-key state is cleared before processing; otherwise the
        next key press would still see a stale first key and would not start a
        new two-key window.
        """
        self.first_key = None
        self.first_key_time = None
        self.overlay_key_timer = None
        self._process_single_key(key_name)

    def _process_single_key(self, key_name):
        """Select row 1, column derived from key_name (column 1 if it cannot be parsed)."""
        row = 1
        try:
            col = int(key_name[1:]) - 15
        except ValueError:
            col = 1
        self._process_overlay_key(row, col)

    def _process_overlay_key(self, row, col):
        """Add the cell (row, col) of the current sheet to the selection and advance."""
        try:
            value = self.sheets[self.current_sheet][1].at[row, col]
            self._update_overlay_label(value)
            self._move_to_next_sheet()
        except Exception as e:
            self.logger.error(f"Overlay processing error: {e}")

    def _update_overlay_label(self, value):
        """Append value to the selected tags and to the overlay label text."""
        if not self.selected_tags:
            self.overlay_label_text = value
        else:
            self.overlay_label_text += f" {value}"
        self.selected_tags.append(value)
        if self.overlay_label:
            self.overlay_label.config(text=self.overlay_label_text)

    def _move_to_next_sheet(self):
        """Advance to the next sheet, or close the overlay after the last active sheet."""
        self.current_sheet += 1
        try:
            sheet_count = int(self.tag_sheet_count_var.get())
        except ValueError:
            self.logger.error("Invalid TAG sheet count.")
            return
        if self.current_sheet >= sheet_count:
            self._on_overlay_close()
        elif self.notebook_clone:
            self.notebook_clone.select(self.current_sheet)

    def _send_tag_to_vmix(self):
        """Send the selected tags, joined by spaces, as the selected event's text."""
        # Function-scope import keeps this fix self-contained.
        from urllib.parse import quote

        if self.selected_tags:
            tag_text = ' '.join(self.selected_tags)
            # Percent-encode the text: a raw '&', '#' or '+' in a tag would
            # otherwise be parsed as URL syntax instead of being part of the value.
            command = f"ReplaySetSelectedEventText&Value={quote(tag_text, safe='')}"
            self.send_request(command)
            self.logger.info(f"Sent tag to vMix: {tag_text}")
            self.selected_tags = []
        else:
            self.logger.info("No tags selected.")

    # ----- Replay commands update -----
    def _update_replay_commands(self, *args):
        """Rebuild the key -> action table and the double-tap table.

        Accepts and ignores the arguments Tk passes to variable trace callbacks.
        """
        # NOTE(review): the camera count setting is not applied to the table
        # below, although the help text says unused cameras are disabled -
        # confirm whether that is intended.
        self.replay_commands = {
            "f1": lambda: self.send_request("ReplayPlaySelectedEventToOutput&Channel=1"),
            "f2": lambda: self.send_request("ReplaySelectNextEvent&Channel=1"),
            "f3": lambda: self.send_request("ReplaySelectPreviousEvent&Channel=1"),
            "f4": lambda: self.send_request("ReplaySetSpeed&Value=0.5&Channel=1"),
            "f5": lambda: self.send_request("ReplayChangeSpeed&Value=-0.1&Channel=1"),
            "f6": lambda: self.send_request("ReplayChangeSpeed&Value=0.1&Channel=1"),
            "f7": lambda: self.send_request("ReplayJumpToNow&Channel=1"),
            "f8": lambda: self._handle_jog_movement("f8", time.time()),
            "f9": lambda: self._handle_jog_movement("f9", time.time()),
            "f10": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[3].get()}"),
            "f11": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[4].get()}"),
            "f12": self.toggle_overlay,
            "f13": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[0].get()}"),
            "f14": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[1].get()}"),
            "f15": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[2].get()}"),
            "f16": lambda: self.send_request("ReplayToggleSelectedEventCamera4"),
            "f17": lambda: self.send_request("ReplayMarkIn&Channel=1"),
            "f18": lambda: self.send_request("ReplayMarkOut&Channel=1"),
            "f19": lambda: self.send_request("ReplayToggleSelectedEventCamera1"),
            "f20": lambda: self.send_request("ReplayToggleSelectedEventCamera2"),
            "f21": lambda: self.send_request("ReplayToggleSelectedEventCamera3"),
            "f22": lambda: self.send_request("ReplayPlayNext&Channel=1"),
            "f23": lambda: self.send_request("ReplayStopEvents&Channel=1"),
            "f24": lambda: self.send_request("ReplayLiveToggle"),
        }
        self.double_tap_commands = {
            "f19": "ReplaySelectedEventSingleCameraOn&Value=1",
            "f20": "ReplaySelectedEventSingleCameraOn&Value=2",
            "f21": "ReplaySelectedEventSingleCameraOn&Value=3",
            "f16": "ReplaySelectedEventSingleCameraOn&Value=4",
        }
        self.logger.info("Replay commands updated.")

    def _handle_jog_movement(self, key_name, current_time):
        """Send a ReplayJumpFrames command whose step grows while the jog keeps turning.

        The step index restarts after a pause longer than 0.5 s or a change of
        direction; 'f8' jumps backwards, any other key forwards.
        """
        if current_time - self.last_jog_time > 0.5:
            self.jog_rotation_count = 0
            self.jog_direction = None
        if self.jog_direction != key_name:
            self.jog_rotation_count = 0
            self.jog_direction = key_name
        sensitivity = self.jog_sensitivity_var.get()
        self.jog_rotation_count = min(self.jog_rotation_count + 1,
                                      len(self.jog_speed_levels) - 1)
        self.last_jog_time = current_time
        jog_speed = self.jog_speed_levels[self.jog_rotation_count]
        direction = -1 if key_name == 'f8' else 1
        # NOTE(review): sensitivity is a float, so the value sent is a float such
        # as -10.0 - confirm vMix accepts non-integer frame counts.
        jog_speed *= sensitivity
        command = f"ReplayJumpFrames&Value={direction * jog_speed}&Channel=1"
        self.send_request(command)
        self.logger.debug(f"Jog movement: direction={direction}, speed={jog_speed}, rotations={self.jog_rotation_count}, sensitivity={sensitivity}")

    # ----- Overlay toggle -----
    def toggle_overlay(self):
        """Open the TAG overlay when it is closed, close it when it is open.

        The decision follows overlay_window_open (the actual window state), and
        overlay_toggle is then set to match it, so a failed open or a close made
        elsewhere cannot leave the two out of sync.
        """
        if self.overlay_window_open:
            self._on_overlay_close()
            self.overlay_toggle = False
        else:
            self._create_new_overlay_window()
            self.overlay_toggle = self.overlay_window_open

    # ----- Request handler setup -----
    def _setup_request_handler(self):
        """Create the SyncRequestHandler for the address in the IP entry."""
        try:
            ip_address = self.ip_entry.get().strip()
            if not ip_address:
                ip_address = "127.0.0.1:8088"
                self.ip_entry.insert(0, ip_address)
            base_url = f"http://{ip_address}"
            self.request_handler = SyncRequestHandler(base_url)
            self.logger.info(f"Request handler initialized: {base_url}")
        except Exception as e:
            self.logger.error(f"Request handler setup failed: {e}")
            self._show_error(f"vMix 연결 설정 실패: {e}")

    # ----- Send vMix request -----
    def send_request(self, command):
        """Queue command for vMix, or show an error when vMix is not connected."""
        try:
            if not self.request_handler:
                self._setup_request_handler()
            if not self.vmix_connected:
                self.logger.warning("vMix not connected. Attempting reconnection...")
                # NOTE(review): _check_vmix_connection is defined outside this
                # chunk; if it updates vmix_connected asynchronously, the check
                # on the next line cannot see the new result yet - confirm.
                self._check_vmix_connection()
                if not self.vmix_connected:
                    self._show_error("vMix가 연결되어 있지 않습니다.")
                    return
            self.request_handler.send_request(command)
        except Exception as e:
            self.logger.error(f"Error sending request: {e}")
            self._show_error(f"명령 전송 실패: {e}")

    # ----- Display error message -----
    def _show_error(self, message):
        """Show message in an error dialog, scheduled through the Tk event loop."""
        self.root.after(0, lambda: messagebox.showerror("오류", message))

    # ----- Check IP address -----
    def _check_ip(self, ip, label=None):
        """Try a TCP connection to "host:port" on a background thread.

        When label is given it is coloured green on success and red on failure.
        """
        def ping_host():
            try:
                host, port = ip.split(':')
                with socket.create_connection((host, int(port)), timeout=2):
                    reachable = True
            except (ValueError, OSError):
                # ValueError: text is not "host:port"; OSError: connection failed.
                reachable = False
            if not label:
                return
            color = "green" if reachable else "red"
            try:
                self.root.after(0, lambda: label.config(fg=color))
            except (RuntimeError, tk.TclError) as e:
                # Best effort only: scheduling can fail, e.g. once the window is gone.
                self.logger.debug(f"IP status update skipped: {e}")

        Thread(target=ping_host, daemon=True).start()

    # ----- Update TAG sheet activation -----
    def update_sheet_activation(self, *args):
        """Enable the first N sheets (N = TAG sheet count) and disable the rest.

        Accepts and ignores the arguments Tk passes to variable trace callbacks.
        Does nothing until the notebook exists.
        """
        if not hasattr(self, 'notebook'):
            return
        try:
            count = int(self.tag_sheet_count_var.get())
        except ValueError:
            count = 1
        for i in range(3):
            enabled = i < count
            state = "normal" if enabled else "disabled"
            icon = self.unlocked_image if enabled else self.locked_image
            if icon:
                self.notebook.tab(i, state=state, image=icon)
            else:
                self.notebook.tab(i, state=state)
            for widget in self.sheets[i][0].winfo_children():
                if isinstance(widget, tk.Entry):
                    widget.config(state=state)

    # ----- Rename tab on double-click -----
    def _rename_tab(self, event):
        """Show an inline entry over the double-clicked tab to edit its title."""
        try:
            index = self.notebook.index("@%d,%d" % (event.x, event.y))
        except tk.TclError:
            # The double-click did not land on a tab label.
            return
        tab_text = self.notebook.tab(index, "text")
        entry = tk.Entry(self.notebook, width=20, bg=self.entry_bg_color, fg=self.fg_color,
                         insertbackground=self.fg_color)
        finished = False

        def set_name(event=None):
            # Several events (Return, FocusOut, a click elsewhere) can end the
            # edit; only the first one may touch the entry before it is destroyed.
            nonlocal finished
            if finished:
                return
            finished = True
            new_name = entry.get()
            self.notebook.tab(index, text=new_name)
            entry.destroy()

        entry.bind("<Return>", set_name)
        entry.bind("<FocusOut>", set_name)
        entry.bind("<Button-1>", lambda e: e.widget.focus_set())
        entry.place(x=event.x, y=event.y, anchor="center", width=200, height=30)
        entry.insert(0, tab_text)
        entry.select_range(0, tk.END)
        entry.focus_set()

        def check_focus(e):
            if e.widget != entry:
                set_name()

        # Commit the name when the user clicks anywhere else in the window. The
        # binding is left in place afterwards (set_name is then a no-op); it is
        # not removed with unbind() because that call could also drop other
        # <Button-1> handlers bound on the root window.
        self.root.bind("<Button-1>", check_focus, add="+")

    # ----- Create new overlay window -----
    def _create_new_overlay_window(self):
        """Open the topmost TAG selection window on the configured monitor.

        Shows a read-only copy of every active sheet plus the selection label.
        If no valid monitor is found, nothing stays open.
        """
        self._update_data_from_entries()
        self.selected_tags = []
        width = 1100
        height = 300
        self.overlay_window = tk.Toplevel(self.root)
        self.overlay_window.title("이벤트 태그 (TAG) 선택")
        self.overlay_window.attributes("-topmost", True)
        self.overlay_window.configure(bg=self.bg_color)
        if hasattr(self, 'icon_path') and os.path.exists(self.icon_path):
            self.overlay_window.iconbitmap(self.icon_path)
        monitors = screeninfo.get_monitors()
        try:
            monitor_index = max(0, min(int(self.overlay_monitor_var.get()) - 1,
                                       len(monitors) - 1))
        except ValueError:
            monitor_index = 0
        if monitor_index < 0 or monitor_index >= len(monitors):
            self.logger.error(f"Invalid monitor index: {monitor_index}")
            self.overlay_window.destroy()
            # Do not keep a reference to the destroyed window.
            self.overlay_window = None
            return
        monitor = monitors[monitor_index]
        overlay_x = monitor.x + (monitor.width - width) // 2
        overlay_y = monitor.y + monitor.height - height - 50
        self.logger.info(f"Overlay window - Monitor {monitor_index + 1}: (x={overlay_x}, y={overlay_y}), size=({width}x{height})")
        self.overlay_window.geometry(f"{width}x{height}+{overlay_x}+{overlay_y}")
        self.overlay_window.resizable(False, False)
        self.notebook_clone = ttk.Notebook(self.overlay_window)
        self.notebook_clone.pack(expand=True, fill="both", padx=10, pady=10)
        try:
            count = int(self.tag_sheet_count_var.get())
        except ValueError:
            count = 1
        for i, (frame, data) in enumerate(self.sheets[:count]):
            frame_clone = ttk.Frame(self.notebook_clone)
            icon = self.unlocked_image if self.unlocked_image else None
            if icon:
                self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"),
                                        image=icon, compound="left")
            else:
                self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"))
            for j in range(10):
                frame_clone.grid_columnconfigure(j, weight=1)
                frame_clone.grid_rowconfigure(j, weight=1)
            for row in range(1, 10):
                label_row = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'),
                                     borderwidth=1, relief="flat",
                                     bg=self.accent_color, fg=self.fg_color)
                label_row.grid(row=row, column=0, sticky="nsew")
                label_col = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'),
                                     borderwidth=1, relief="flat",
                                     bg=self.accent_color, fg=self.fg_color)
                label_col.grid(row=0, column=row, sticky="nsew")
            for row in range(1, 10):
                for col in range(1, 10):
                    value = data.at[row, col]
                    bg_color = self.entry_bg_color if (row + col) % 2 == 0 else self.bg_color
                    tk.Label(frame_clone, text=value, font=('Arial', 10), bg=bg_color,
                             fg=self.fg_color, borderwidth=1, relief="solid").grid(
                        row=row, column=col, sticky="nsew", padx=1, pady=1)
        self.overlay_label = tk.Label(self.overlay_window, text=self.overlay_label_text,
                                      bg=self.bg_color, fg=self.fg_color, font=('Arial', 12))
        self.overlay_label.pack(side="bottom", padx=10, pady=10, fill="x")
        self.overlay_window.protocol("WM_DELETE_WINDOW", self._on_overlay_close)
        self.overlay_window_open = True
        self.current_sheet = 0

    # ----- Overlay window close handling -----
    def _on_overlay_close(self):
        self.overlay_window_open = False
        self.overlay_close_time = time.time()
        if self.overlay_window:
            self.overlay_window.destroy()
            self.overlay_window = None
        if self.selected_tags:
            self._send_tag_to_vmix()
        self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요"
        self.current_sheet = 0
        # NOTE(review): the visible chunk ends here; the statements above are only
        # re-indented. Confirm whether more of this method follows in the file.
self.root.after(100, self._process_buffered_keys) self.overlay_toggle = False def _process_buffered_keys(self): current_time = time.time() for key_name, press_time in self.key_press_buffer: if current_time - press_time < 0.5: self._simulate_key_press(key_name) self.key_press_buffer.clear() def _simulate_key_press(self, key_name): self.on_press(key_name) self.root.after(50, lambda: self.on_release(key_name)) # ----- Update data from entry widgets ----- def _update_data_from_entries(self): for entry in self.entries: try: row = int(entry.grid_info().get('row', 0)) col = int(entry.grid_info().get('column', 0)) for frame, data in self.sheets: if entry.master == frame: data.at[row, col] = entry.get() except Exception as e: self.logger.warning(f"Data update error: {e}") # ----- Save data (as dictionary) ----- def _save_data(self): self._update_data_from_entries() sheet_data = {f"시트 {i+1}": data.to_dict(orient='split') for i, (frame, data) in enumerate(self.sheets)} sheet_names = {f"시트 {i+1}": self.notebook.tab(i, "text") for i in range(self.notebook.index("end"))} return {"sheets": sheet_data, "names": sheet_names} # ----- Save to file ----- def _save_to_file(self): data_dict = self._save_data() file_path = filedialog.asksaveasfilename(defaultextension=".PDB", filetypes=[("PDB files", "*.PDB")]) if file_path: if not file_path.lower().endswith('.pdb'): file_path += '.PDB' try: with open(file_path, 'w', encoding='utf-8') as file: json.dump(data_dict, file, ensure_ascii=False, indent=4) self.logger.info(f"Data saved to {file_path}.") except Exception as e: self.logger.error(f"File save error: {e}") self._show_error(str(e)) # ----- Load from file ----- def _load_from_file(self): file_path = filedialog.askopenfilename(filetypes=[("PDB files", "*.PDB")]) if file_path: try: with open(file_path, 'r', encoding='utf-8') as file: data_dict = json.load(file) sheet_data = data_dict.get("sheets", {}) sheet_names = data_dict.get("names", {}) for i, (frame, data) in 
enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: self.notebook.tab(i, text=sheet_names.get(sheet_name, sheet_name)) json_str = json.dumps(sheet_data[sheet_name]) loaded_data = pd.read_json(StringIO(json_str), orient='split') for row in range(1, 10): for col in range(1, 10): value = loaded_data.at[row, col] entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info(f"Data loaded from file: {file_path}") except Exception as e: self.logger.error(f"File load error: {e}") self._show_error(str(e)) else: self.logger.info("File load canceled.") # ----- Clear all data ----- def _clear_all_data(self): if messagebox.askokcancel("초기화 확인", "정말 모든 데이터를 초기화 하시겠습니까?"): for entry in self.entries: entry.config(state="normal") entry.delete(0, tk.END) for frame, data in self.sheets: data.loc[:] = "" self.logger.info("All data cleared.") # ----- Get entry widget at specific grid coordinates ----- def _get_entry(self, frame, row, col): for widget in frame.winfo_children(): if isinstance(widget, tk.Entry): info = widget.grid_info() if int(info.get('row', -1)) == row and int(info.get('column', -1)) == col: return widget return None # ----- Check vMix connection (background thread) ----- def _check_vmix_connection(self): try: ip_address = self.ip_entry.get().strip() response = requests.get(f"http://{ip_address}/api/", timeout=0.5) response.raise_for_status() is_connected = True self.logger.info("vMix connected") except requests.exceptions.RequestException as e: is_connected = False self.logger.error(f"vMix connection error: {e}") if not self.request_handler: self._setup_request_handler() self.root.after(0, lambda: self._update_vmix_status_indicator(is_connected)) # ----- Periodic vMix connection check (every 5 seconds) ----- def _start_vmix_check_loop(self): current_time = time.time() if current_time - self.last_vmix_check >= self.vmix_check_interval: self.last_vmix_check = 
current_time Thread(target=self._check_vmix_connection, daemon=True).start() self.root.after(5000, self._start_vmix_check_loop) # ----- Update vMix status indicator ----- def _update_vmix_status_indicator(self, connected): color = "green" if connected else "red" if hasattr(self, 'vmix_status_indicator'): self.vmix_status_indicator.delete("status_oval") self.vmix_status_indicator.create_oval(2, 2, 13, 13, fill=color, tags="status_oval") self.vmix_connected = connected # ----- Save settings ----- def _save_settings(self): settings = { 'ip': self.ip_entry.get(), 'camera_count': self.camera_count_var.get(), 'overlay_on_off': self.overlay_on_off_var.get(), 'overlay_monitor': self.overlay_monitor_var.get(), 'tag_sheet_count': self.tag_sheet_count_var.get(), 'jog_sensitivity': self.jog_sensitivity_var.get(), } for i, var in enumerate(self.end_time_vars): settings[f'end_time_{i+1}'] = var.get() sheet_data = {} for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" sheet_data[sheet_name] = {} for row in range(1, 10): for col in range(1, 10): entry = self._get_entry(frame, row, col) if entry: sheet_data[sheet_name][f"{row},{col}"] = entry.get() settings['sheet_data'] = sheet_data try: with open('settings.json', 'w', encoding='utf-8') as f: json.dump(settings, f, ensure_ascii=False, indent=4) self.logger.info("Settings saved successfully.") except Exception as e: self.logger.error(f"Settings save failed: {e}") self._show_error(f"설정 저장 실패: {e}") # ----- Load settings ----- def _load_settings(self): try: with open('settings.json', 'r', encoding='utf-8') as f: settings = json.load(f) self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, settings.get('ip', '127.0.0.1:8088')) self._check_ip(self.ip_entry.get()) for i, var in enumerate(self.end_time_vars): var.set(settings.get(f'end_time_{i+1}', str((i+1)*10))) self.camera_count_var.set(settings.get('camera_count', '4')) self.overlay_on_off_var.set(settings.get('overlay_on_off', True)) 
self.overlay_monitor_var.set(settings.get('overlay_monitor', '1')) self.tag_sheet_count_var.set(settings.get('tag_sheet_count', '3')) self.jog_sensitivity_var.set(settings.get('jog_sensitivity', 5)) sheet_data = settings.get("sheet_data", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: for row in range(1, 10): for col in range(1, 10): value = sheet_data[sheet_name].get(f"{row},{col}", "") entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info("Settings loaded successfully.") except FileNotFoundError: self.logger.warning("Settings file not found. Using defaults.") except json.JSONDecodeError as e: self.logger.error(f"Settings JSON parse error: {e}") self._show_error(f"설정 파일 파싱 오류: {e}") # ----- Program exit handling ----- def _on_closing(self): if hasattr(self, 'keyboard_hook') and self.keyboard_hook: self.keyboard_hook.stop() self._save_settings() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.request_handler: self.request_handler.close() self.logger.info("Program exiting.") self.root.destroy() # ----- Run main loop ----- def run(self): self.root.protocol("WM_DELETE_WINDOW", self._on_closing) self.root.mainloop() # ------------------------------------------------------------------------- # Additional _get_monitor_count method (returns number of monitors) @functools.lru_cache(maxsize=None) def _get_monitor_count(self): """Return the number of monitors (with caching).""" return len(screeninfo.get_monitors()) # ============================================================================= # Main function # ============================================================================= def main(): controller = ReplayController() controller.run() if __name__ == "__main__": main()
이 코드에서는 기존 기능(키보드 후킹, vMix API 요청, GUI 구성, 오버레이 창, TAG 시스템 등)을 모두 유지하면서 중복 호출 문제와 일부 예외 처리 문제를 개선했습니다. 추가로 설정 저장 위치나 코드 모듈화를 고려해 보시면 더욱 좋을 것입니다.