윈도우 키보드 후킹

생성일: 2025년 2월 4일

답변에 사용된 모델 O3 Mini High by Chat01

질문

import ctypes
import sys
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import pandas as pd
import json
from io import StringIO
import screeninfo
import requests
from threading import Thread, Lock
from PIL import Image, ImageTk
import os
import time
import logging
from queue import Queue
import socket
import functools
import math
import win32con
from ctypes import windll, CFUNCTYPE, POINTER, c_int, c_void_p, byref, c_long, cast
from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPARAM, WPARAM, LONG

=============================================================================

로깅 설정

=============================================================================

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('PIL').setLevel(logging.WARNING)

=============================================================================

ctypes를 이용한 Windows 저수준 키보드 후킹 구조체/콜백 함수 정의

=============================================================================

class KBDLLHOOKSTRUCT(ctypes.Structure):
fields = [
("vkCode", DWORD),
("scanCode", DWORD),
("flags", DWORD),
("time", DWORD),
("dwExtraInfo", POINTER(c_void_p))
]

LowLevelKeyboardProc = CFUNCTYPE(LONG, c_int, WPARAM, LPARAM)

class KeyboardHook:
"""
Windows API 저수준 키보드 후킹 클래스
- F1~F24 키에 대해 후킹하여 controller의 on_press/on_release 이벤트 호출
"""
def init(self, controller):
self.controller = controller
self.user32 = windll.user32
self.kernel32 = windll.kernel32
self.hooked = None
self.hInstance = self.kernel32.GetModuleHandleW(None)
self.function_key_vkcodes = set(range(0x70, 0x88)) # F1 ~ F24 (vkCode 범위)
self.pointer = LowLevelKeyboardProc(self._hook_proc)
self.is_running = False

text
def _hook_proc(self, nCode, wParam, lParam): """ 후킹 콜백 함수 – 키 이벤트 처리 및 시스템 전달 억제 """ try: if nCode == win32con.HC_ACTION: kb = cast(lParam, POINTER(KBDLLHOOKSTRUCT)).contents vk_code = kb.vkCode if vk_code in self.function_key_vkcodes: key_str = f"f{vk_code - 0x6F}" if wParam in (win32con.WM_KEYDOWN, win32con.WM_SYSKEYDOWN): self.controller.on_press(key_str) elif wParam in (win32con.WM_KEYUP, win32con.WM_SYSKEYUP): self.controller.on_release(key_str) return 1 # 이벤트 전달 차단 except Exception as e: self.controller.logger.error(f"키보드 후킹 오류: {e}") return self.user32.CallNextHookEx(self.hooked, nCode, wParam, lParam) def start(self): """후킹 시작""" if self.is_running: return False self.hooked = self.user32.SetWindowsHookExA( win32con.WH_KEYBOARD_LL, self.pointer, self.hInstance, 0 ) if not self.hooked: self.controller.logger.error("키보드 후킹 설정 실패") return False self.is_running = True self.controller.logger.info("키보드 후킹 시작됨") self.hook_thread = Thread(target=self._pump_messages, daemon=True) self.hook_thread.start() return True def _pump_messages(self): """메시지 펌프로 키보드 이벤트 지속 처리""" msg = MSG() while self.is_running: while self.user32.PeekMessageA(byref(msg), None, 0, 0, win32con.PM_REMOVE): self.user32.TranslateMessage(byref(msg)) self.user32.DispatchMessageA(byref(msg)) time.sleep(0.01) def stop(self): """후킹 중지""" if self.is_running and self.hooked: self.user32.UnhookWindowsHookEx(self.hooked) self.is_running = False self.hooked = None self.controller.logger.info("키보드 후킹 중지됨")

=============================================================================

vMix API 동기 요청 핸들러 (쓰레드 풀, 쓰로틀링, 오류 처리)

=============================================================================

class SyncRequestHandler:
def init(self, base_url, max_concurrent_requests=5, request_timeout=5):
self.base_url = base_url
self.queue = Queue()
self.max_concurrent_requests = max_concurrent_requests
self.workers = []
self.session = requests.Session()
self.request_timeout = request_timeout
self.lock = Lock()
self.last_request_time = {}
self.throttle_interval = 0.05 # 50ms 쓰로틀링
self._start_workers()

text
def _start_workers(self): for _ in range(self.max_concurrent_requests): worker = Thread(target=self._worker_thread, daemon=True) worker.start() self.workers.append(worker) def _worker_thread(self): while True: command = self.queue.get() if command is None: break # 종료 신호 try: self._send_request(command) except Exception as e: logging.error(f"워커 쓰레드 에러: {e}") finally: self.queue.task_done() def _send_request(self, command): url = f"{self.base_url}/api/?Function={command}" current_time = time.time() with self.lock: if command in self.last_request_time: if current_time - self.last_request_time[command] < self.throttle_interval: logging.debug(f"쓰로틀링: {command}") return self.last_request_time[command] = current_time try: response = self.session.get(url, timeout=self.request_timeout) response.raise_for_status() logging.info(f"요청 성공: {url} - 상태 코드: {response.status_code}") except requests.exceptions.Timeout: logging.error(f"요청 타임아웃: {url}") except requests.exceptions.ConnectionError: logging.error(f"연결 에러: {url}") except requests.exceptions.RequestException as e: logging.error(f"요청 에러: {url} - {e}") def send_request(self, command): self.queue.put(command) def close(self): for _ in self.workers: self.queue.put(None) for worker in self.workers: worker.join() self.session.close()

=============================================================================

ReplayController 클래스 (GUI, 키보드 후킹, vMix 통신 통합)

=============================================================================

class ReplayController:
def init(self):
self.logger = logging.getLogger(name)
self.root = tk.Tk()
self.root.title("최반장 리플레이 컨트롤러 간소판 v1.22 - 안정화")
self._setup_icon()
self._initialize_variables()
self._setup_dark_theme()
self._create_frames()
# 기본 설정 프레임에 vMix 상태 표시기 생성
self.vmix_status_indicator = tk.Canvas(
self.frames["기본 설정"],
width=14,
height=14,
bg=self.frame_bg_color,
highlightthickness=0,
borderwidth=0
)
self._create_widgets()
self._create_notebook()
self._setup_keyboard_hook()
self._setup_request_handler()
self._start_vmix_check_loop()
self._load_settings()
self.root.geometry("870x800")
self.root.minsize(870, 800)
self.root.resizable(True, True)
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(4, weight=1)

text
# ----- 아이콘 설정 ----- def _setup_icon(self): script_dir = os.path.dirname(os.path.abspath(__file__)) icon_path = os.path.join(script_dir, "chobanjangTV_icon.ico") if os.path.exists(icon_path): try: self.root.iconbitmap(icon_path) self.icon_path = icon_path self.logger.info("아이콘 설정 성공") except Exception as e: self.logger.warning(f"아이콘 설정 실패: {e}") else: self.logger.warning(f"아이콘 파일({icon_path})을 찾을 수 없습니다.") # ----- 변수 초기화 ----- def _initialize_variables(self): self.jog_sensitivity_var = tk.DoubleVar(value=5) self.jog_rotation_count = 0 self.last_jog_time = 0 self.jog_direction = None self.jog_speed_levels = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] self.request_handler = None self.vmix_connected = False self.last_vmix_check = 0 self.vmix_check_interval = 5.0 self.overlay_window = None self.overlay_window_open = False self.key_press_buffer = [] self.selected_tags = [] self.current_sheet = 0 self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요" self.overlay_toggle = False self.last_press_time = {} self.double_tap_commands = {} self.replay_commands = {} self.pressed_keys = set() self.key_press_time = {} self.key_release_time = {} self.first_key = None self.first_key_time = None self.overlay_key_timer = None self.notebook_clone = None self.overlay_label = None self.overlay_close_time = 0 self.end_time_vars = [tk.StringVar() for _ in range(5)] self.camera_count_var = tk.StringVar() self.overlay_on_off_var = tk.BooleanVar(value=True) self.overlay_monitor_var = tk.StringVar() self.tag_sheet_count_var = tk.StringVar() self.tag_sheet_count_var.trace_add("write", self.update_sheet_activation) self.sheets = [] self.entries = [] # ----- 다크 테마 설정 ----- def _setup_dark_theme(self): self.bg_color = '#1e1e1e' self.fg_color = '#ffffff' self.accent_color = '#007acc' self.entry_bg_color = '#1a1a1a' self.button_bg_color = '#3c3c3c' self.frame_bg_color = '#252526' self.root.configure(bg=self.bg_color) self.style = ttk.Style() self.style.theme_create("darktheme", parent="alt", settings={ "TNotebook": {"configure": {"background": self.bg_color, "tabmargins": [2, 5, 2, 0]}}, "TNotebook.Tab": {"configure": {"padding": [20, 10], "background": self.bg_color, "foreground": self.fg_color}, "map": {"background": [("selected", self.accent_color)], "expand": [("selected", [1, 1, 1, 0])]}}, "TFrame": {"configure": {"background": self.frame_bg_color}}, "TLabel": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}}, "TEntry": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color, "insertcolor": self.fg_color}}, "TButton": {"configure": {"background": self.button_bg_color, "foreground": self.fg_color}}, "TCombobox": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color}}, "TCheckbutton": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}}, }) self.style.theme_use("darktheme") self.style.configure('TNotebook.Tab', padding=[20, 10], font=('Arial', 12)) self.style.map('TNotebook.Tab', expand=[('selected', [1, 1, 1, 0])], font=[('selected', ('Arial', 12, 'bold'))]) # ----- 아이콘 로드 (잠금/해제) ----- def _load_icons(self): script_dir = os.path.dirname(os.path.abspath(__file__)) unlocked_path = os.path.join(script_dir, "unlocked.png") locked_path = os.path.join(script_dir, "locked.png") try: if os.path.exists(unlocked_path): self.unlocked_image = ImageTk.PhotoImage(Image.open(unlocked_path).resize((16, 16))) else: self.unlocked_image = None self.logger.warning(f"unlocked.png 파일을 찾을 수 없습니다: {unlocked_path}") if os.path.exists(locked_path): self.locked_image = ImageTk.PhotoImage(Image.open(locked_path).resize((16, 16))) else: self.locked_image = None self.logger.warning(f"locked.png 파일을 찾을 수 없습니다: {locked_path}") except Exception as e: self.logger.error(f"아이콘 로드 실패: {e}") self.unlocked_image = None self.locked_image = None # ----- 프레임 생성 ----- def _create_frames(self): self.frames = {} frame_texts = ["기본 설정", "조그 설정", "오버레이 설정", "TAG 시스템"] for i, text in enumerate(frame_texts): frame = ttk.Frame(self.root) frame.grid(row=i, column=0, sticky="nsew", padx=5, pady=5) frame.configure(style='TFrame') label = tk.Label(frame, text=text, font=('Arial', 12, 'bold'), bg=self.accent_color, fg=self.fg_color) label.pack(fill='x', pady=(0, 5)) self.frames[text] = frame # ----- 위젯 생성 및 배치 ----- def _create_widgets(self): # 기본 설정 프레임 basic_frame = self.frames["기본 설정"] ip_frame = ttk.Frame(basic_frame) ip_frame.pack(fill='x', padx=10, pady=5) self.ip_entry = self._create_label_entry( ip_frame, "아이피", "vmix 리플레이가 실행되는 아이피와 포트 주소", "127.0.0.1:8088", self._check_ip ) # 상태 표시기 배치 self.vmix_status_indicator.pack(side='left', padx=(5, 0), in_=ip_frame) self._update_vmix_status_indicator(False) # 끝 점 시간 설정 콤보박스 end_time_defaults = [5, 10, 20, 30, 40] for i in range(5): self._create_label_combobox( basic_frame, f"끝 점 시간 설정 {i+1} (초)", f"끝 점(outpoint) 마크시 시간 설정 {i+1}", [str(j) for j in range(1, 61)], str(end_time_defaults[i]), self.end_time_vars[i] ) # 카메라 수 설정 콤보박스 self._create_label_combobox( basic_frame, "카메라 수", "리플레이에 쓸 카메라 수 설정. 사용하지 않는 카메라는 실수 방지를 위해 비활성화됩니다.", [str(i) for i in range(1, 5)], "4", self.camera_count_var ) # 조그 설정 프레임에 민감도 슬라이더 self._create_jog_sensitivity_slider(self.frames["조그 설정"]) # 오버레이 설정 overlay_frame = self.frames["오버레이 설정"] self._create_label_checkbox( overlay_frame, "오버레이 on/off", "리플레이 정보 오버레이를 활성화/비활성화 합니다.", self.overlay_on_off_var ) self.overlay_monitor_combobox = self._create_label_combobox( overlay_frame, "오버레이 모니터 번호", f"오버레이가 표시될 모니터 번호 (현재 모니터 갯수: {self._get_monitor_count()})", [str(i) for i in range(1, self._get_monitor_count()+1)], "1", self.overlay_monitor_var ) # TAG 시스템 설정 tag_frame = self.frames["TAG 시스템"] self.tag_sheet_count_combobox = self._create_label_combobox( tag_frame, "태그 시스템 사용 시트수", "사용할 태그 시스템 시트수를 조정합니다.", ["1", "2", "3"], "3", self.tag_sheet_count_var ) # 하단 버튼 프레임 생성 self._create_button_frame() # 변수 변경 시 리플레이 명령어 업데이트 for var in self.end_time_vars: var.trace_add("write", self._update_replay_commands) self.camera_count_var.trace_add("write", self._update_replay_commands) self.jog_sensitivity_var.trace_add("write", self._update_replay_commands) # ----- 태그 시스템 노트북 생성 ----- def _create_notebook(self): self.notebook = ttk.Notebook(self.root) self.notebook.grid(row=4, column=0, sticky="nsew", padx=5, pady=5) self._load_icons() for sheet_num in range(1, 4): frame = ttk.Frame(self.notebook) icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook.add(frame, text=f"시트 {sheet_num}", image=icon, compound="left") else: self.notebook.add(frame, text=f"시트 {sheet_num}") data = pd.DataFrame("", index=range(1, 10), columns=range(1, 10)) self.sheets.append((frame, data)) for frame, data in self.sheets: for i in range(10): frame.grid_columnconfigure(i, weight=1) frame.grid_rowconfigure(i, weight=1) for i in range(1, 10): self._create_cell(frame, 0, i, value=str(i), readonly=True, bold=True) self._create_cell(frame, i, 0, value=str(i), readonly=True, bold=True) for i in range(1, 10): for j in range(1, 10): self._create_cell(frame, i, j) self.notebook.bind("<Double-1>", self._rename_tab) self.update_sheet_activation() # ----- 하단 버튼 프레임 생성 ----- def _create_button_frame(self): button_frame = ttk.Frame(self.root) button_frame.grid(row=5, column=0, sticky="ew", padx=5, pady=5) dev_label = tk.Label(button_frame, text="제작 ", font=('Arial', 10), bg=self.bg_color, fg=self.fg_color) dev_label.grid(row=0, column=0, sticky="w", padx=(5,0)) pdb_label = tk.Label(button_frame, text="PDB", font=('Arial', 10), bg=self.bg_color, fg="#a80000") pdb_label.grid(row=0, column=1, sticky="w") omo_label = tk.Label(button_frame, text="오모 (240809 update)", font=('Arial', 10), bg=self.bg_color, fg=self.fg_color) omo_label.grid(row=0, column=2, sticky="w") save_button = tk.Button(button_frame, text="저장", command=self._save_to_file, width=10, bg=self.button_bg_color, fg=self.fg_color) save_button.grid(row=0, column=3, sticky="e", padx=5) load_button = tk.Button(button_frame, text="불러오기", command=self._load_from_file, width=10, bg=self.button_bg_color, fg=self.fg_color) load_button.grid(row=0, column=4, sticky="e", padx=5) clear_button = tk.Button(button_frame, text="값 초기화", command=self._clear_all_data, width=10, bg=self.button_bg_color, fg=self.fg_color) clear_button.grid(row=0, column=5, sticky="e", padx=5) # ----- 레이블-엔트리 위젯 생성 ----- def _create_label_entry(self, frame, text, help_text, default, ip_check_func=None): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) entry = tk.Entry(container, width=18, bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) entry.pack(side='left') entry.insert(0, default) help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) if ip_check_func: status_icon = tk.Label(container, text="●", fg="red", bg=self.frame_bg_color) status_icon.pack(side='left', padx=(5, 0)) entry.bind("<FocusOut>", lambda event: ip_check_func(entry.get(), status_icon)) self.root.after(100, lambda: ip_check_func(entry.get(), status_icon)) return entry # ----- 레이블-콤보박스 위젯 생성 ----- def _create_label_combobox(self, frame, text, help_text, values, default, var): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) combobox = ttk.Combobox(container, values=values, textvariable=var, width=18, style="TCombobox") combobox.pack(side='left') combobox.set(default) help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) return combobox # ----- 레이블-체크박스 위젯 생성 ----- def _create_label_checkbox(self, frame, text, help_text, var): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) checkbox = tk.Checkbutton(container, variable=var, bg=self.frame_bg_color, activebackground=self.frame_bg_color, selectcolor=self.accent_color, fg=self.fg_color) checkbox.pack(side='left') help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) return checkbox # ----- 조그 민감도 슬라이더 생성 ----- def _create_jog_sensitivity_slider(self, frame): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text="조그 가감속 민감도", width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) def update_slider_value(value): rounded_value = round(float(value), 1) self.jog_sensitivity_var.set(rounded_value) value_label.config(text=f"{rounded_value:.1f}") slider = ttk.Scale(container, from_=1.0, to=10.0, orient="horizontal", variable=self.jog_sensitivity_var, length=200, command=update_slider_value) slider.pack(side='left') value_label = tk.Label(container, text=f"{self.jog_sensitivity_var.get():.1f}", width=5, bg=self.frame_bg_color, fg=self.fg_color) value_label.pack(side='left', padx=(5, 0)) help_label = tk.Label(container, text="조그 가감속 민감도를 조절합니다.", fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) # ----- 태그 시트 셀 생성 ----- def _create_cell(self, parent, row, col, value="", readonly=False, bold=False): if row == 0 or col == 0: label = tk.Label(parent, text=value, width=10, relief="flat", borderwidth=1, bg=self.accent_color, fg=self.fg_color, font=('Arial', 10, 'bold')) label.grid(row=row, column=col, sticky="nsew") return label else: entry = tk.Entry(parent, width=10, justify='center', bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) entry.grid(row=row, column=col, sticky="nsew") entry.insert(0, value) if readonly: entry.config(state='readonly') if bold: entry.config(font=('Arial', 10, 'bold')) self.entries.append(entry) return entry # ----- 키보드 후킹 설정 ----- def _setup_keyboard_hook(self): self.keyboard_hook = KeyboardHook(self) if not self.keyboard_hook.start(): self.logger.error("키보드 후킹 설정 실패") else: self.logger.info("키보드 후킹 설정 성공") # ----- 키 이벤트 처리 (on_press / on_release) ----- def on_press(self, key_str): current_time = time.time() self.logger.debug(f"키 눌림: {key_str}") self.key_press_buffer.append((key_str, current_time)) if len(self.key_press_buffer) > 10: self.key_press_buffer.pop(0) if key_str in self.pressed_keys: return self.pressed_keys.add(key_str) self.key_press_time[key_str] = current_time if key_str == 'f7': self._handle_f7_press() elif key_str in ['f8', 'f9']: self._handle_jog_movement(key_str, current_time) if self.overlay_window_open: if key_str == 'f12': self._on_overlay_close() self._send_tag_to_vmix() elif key_str in ['f16', 'f17', 'f18', 'f19', 'f20', 'f21', 'f22', 'f23', 'f24']: self._handle_overlay_key(key_str) else: if key_str in self.replay_commands: self.replay_commands[key_str]() if key_str in self.double_tap_commands: if current_time - self.last_press_time.get(key_str, 0) < 0.3: self.send_request(self.double_tap_commands[key_str]) self.last_press_time[key_str] = current_time def on_release(self, key_str): current_time = time.time() self.logger.debug(f"키 떼짐: {key_str}") if key_str in self.pressed_keys: self.pressed_keys.remove(key_str) self.key_release_time[key_str] = current_time # ----- F7 키 처리 ----- def _handle_f7_press(self): self.send_request("ReplayJumpToNow&Channel=1") self.logger.info("ReplayJumpToNow 명령 전송") # ----- 조그 이동 처리 (F8, F9) ----- def _handle_jog_movement(self, key_name, current_time): if current_time - self.last_jog_time > 0.5: self.jog_rotation_count = 0 self.jog_direction = None if self.jog_direction != key_name: self.jog_rotation_count = 0 self.jog_direction = key_name sensitivity = self.jog_sensitivity_var.get() self.jog_rotation_count = min(self.jog_rotation_count + 1, len(self.jog_speed_levels) - 1) self.last_jog_time = current_time jog_speed = self.jog_speed_levels[self.jog_rotation_count] direction = -1 if key_name == 'f8' else 1 jog_speed *= sensitivity command = f"ReplayJumpFrames&Value={direction * jog_speed}&Channel=1" self.send_request(command) self.logger.debug(f"조그 이동: 방향={direction}, 속도={jog_speed}, 회전횟수={self.jog_rotation_count}, 민감도={sensitivity}") # ----- 오버레이 키 처리 ----- def _handle_overlay_key(self, key_name): current_time = time.time() if not self.first_key: self.first_key = key_name self.first_key_time = current_time self.overlay_key_timer = self.root.after(300, self._process_single_key, key_name) else: if current_time - self.first_key_time <= 0.3: if self.overlay_key_timer: self.root.after_cancel(self.overlay_key_timer) try: row = int(self.first_key[1:]) - 15 col = int(key_name[1:]) - 15 except ValueError: row, col = 1, 1 self._process_overlay_key(row, col) else: self._process_single_key(key_name) self.first_key = None self.first_key_time = None def _process_single_key(self, key_name): row = 1 try: col = int(key_name[1:]) - 15 except ValueError: col = 1 self._process_overlay_key(row, col) def _process_overlay_key(self, row, col): try: value = self.sheets[self.current_sheet][1].at[row, col] self._update_overlay_label(value) self._move_to_next_sheet() except Exception as e: self.logger.error(f"오버레이 처리 오류: {e}") def _update_overlay_label(self, value): if not self.selected_tags: self.overlay_label_text = value else: self.overlay_label_text += f" {value}" self.selected_tags.append(value) if self.overlay_label: self.overlay_label.config(text=self.overlay_label_text) def _move_to_next_sheet(self): self.current_sheet += 1 try: sheet_count = int(self.tag_sheet_count_var.get()) if self.current_sheet >= sheet_count: self._on_overlay_close() else: if self.notebook_clone: self.notebook_clone.select(self.current_sheet) except ValueError: self.logger.error("유효하지 않은 TAG 시트 수.") def _send_tag_to_vmix(self): if self.selected_tags: tag_text = ' '.join(self.selected_tags) command = f"ReplaySetSelectedEventText&Value={tag_text}" self.send_request(command) self.logger.info(f"vMix에 태그 전송: {tag_text}") self.selected_tags = [] else: self.logger.info("선택된 태그 없음.") # ----- 리플레이 명령어 업데이트 ----- def _update_replay_commands(self, *args): try: camera_count = int(self.camera_count_var.get()) except ValueError: camera_count = 4 self.replay_commands = { "f1": lambda: self.send_request("ReplayPlaySelectedEventToOutput&Channel=1"), "f2": lambda: self.send_request("ReplaySelectNextEvent&Channel=1"), "f3": lambda: self.send_request("ReplaySelectPreviousEvent&Channel=1"), "f4": lambda: self.send_request("ReplaySetSpeed&Value=0.5&Channel=1"), "f5": lambda: self.send_request("ReplayChangeSpeed&Value=-0.1&Channel=1"), "f6": lambda: self.send_request("ReplayChangeSpeed&Value=0.1&Channel=1"), "f7": self._handle_f7_press, "f8": lambda: self._handle_jog_movement("f8", time.time()), "f9": lambda: self._handle_jog_movement("f9", time.time()), "f10": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[3].get()}"), "f11": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[4].get()}"), "f12": self.toggle_overlay, "f13": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[0].get()}"), "f14": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[1].get()}"), "f15": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[2].get()}"), "f16": lambda: self.send_request("ReplayToggleSelectedEventCamera4"), "f17": lambda: self.send_request("ReplayMarkIn&Channel=1"), "f18": lambda: self.send_request("ReplayMarkOut&Channel=1"), "f19": lambda: self.send_request("ReplayToggleSelectedEventCamera1"), "f20": lambda: self.send_request("ReplayToggleSelectedEventCamera2"), "f21": lambda: self.send_request("ReplayToggleSelectedEventCamera3"), "f22": lambda: self.send_request("ReplayPlayNext&Channel=1"), "f23": lambda: self.send_request("ReplayStopEvents&Channel=1"), "f24": lambda: self.send_request("ReplayLiveToggle"), } self.double_tap_commands = { "f19": "ReplaySelectedEventSingleCameraOn&Value=1", "f20": "ReplaySelectedEventSingleCameraOn&Value=2", "f21": "ReplaySelectedEventSingleCameraOn&Value=3", "f16": "ReplaySelectedEventSingleCameraOn&Value=4", } self.logger.info("리플레이 명령어 업데이트 완료.") # ----- 오버레이 토글 ----- def toggle_overlay(self): if self.overlay_toggle: self._on_overlay_close() else: self._create_new_overlay_window() self.overlay_toggle = not self.overlay_toggle # ----- Request Handler 설정 ----- def _setup_request_handler(self): try: ip_address = self.ip_entry.get().strip() if not ip_address: ip_address = "127.0.0.1:8088" self.ip_entry.insert(0, ip_address) base_url = f"http://{ip_address}" self.request_handler = SyncRequestHandler(base_url) self.logger.info(f"Request handler 초기화: {base_url}") except Exception as e: self.logger.error(f"Request handler 설정 실패: {e}") self._show_error(f"vMix 연결 설정 실패: {e}") # ----- vMix 요청 전송 ----- def send_request(self, command): try: if not self.request_handler: self._setup_request_handler() if not self.vmix_connected: self.logger.warning("vMix 연결되지 않음. 재연결 시도 중...") self._check_vmix_connection() if not self.vmix_connected: self._show_error("vMix가 연결되어 있지 않습니다.") return self.request_handler.send_request(command) except Exception as e: self.logger.error(f"요청 전송 에러: {e}") self._show_error(f"명령 전송 실패: {e}") # ----- 에러 메시지 표시 ----- def _show_error(self, message): self.root.after(0, lambda: messagebox.showerror("오류", message)) # ----- IP 주소 체크 ----- def _check_ip(self, ip, label=None): def ping_host(): try: host, port = ip.split(':') with socket.create_connection((host, int(port)), timeout=2): if label: self.root.after(0, lambda: label.config(fg="green")) except: if label: self.root.after(0, lambda: label.config(fg="red")) Thread(target=ping_host, daemon=True).start() # ----- 태그 시트 활성화/비활성화 업데이트 ----- def update_sheet_activation(self, *args): if not hasattr(self, 'notebook'): return try: count = int(self.tag_sheet_count_var.get()) except ValueError: count = 1 for i in range(3): if i < count: icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook.tab(i, state="normal", image=icon) else: self.notebook.tab(i, state="normal") for widget in self.sheets[i][0].winfo_children(): if isinstance(widget, tk.Entry): widget.config(state="normal") else: icon = self.locked_image if self.locked_image else None if icon: self.notebook.tab(i, state="disabled", image=icon) else: self.notebook.tab(i, state="disabled") for widget in self.sheets[i][0].winfo_children(): if isinstance(widget, tk.Entry): widget.config(state="disabled") # ----- 탭 이름 변경 (더블클릭) ----- def _rename_tab(self, event): index = self.notebook.index("@%d,%d" % (event.x, event.y)) if index != "end": tab_text = self.notebook.tab(index, "text") entry = tk.Entry(self.notebook, width=20, bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) def set_name(event=None): new_name = entry.get() self.notebook.tab(index, text=new_name) entry.destroy() entry.bind("<Return>", set_name) entry.bind("<FocusOut>", set_name) entry.bind("<Button-1>", lambda e: e.widget.focus_set()) entry.place(x=event.x, y=event.y, anchor="center", width=200, height=30) entry.insert(0, tab_text) entry.select_range(0, tk.END) entry.focus_set() def check_focus(e): if e.widget != entry: set_name() self.root.bind("<Button-1>", check_focus, add="+") def remove_check_focus(event=None): self.root.unbind("<Button-1>", funcid) funcid = self.root.bind("<FocusOut>", remove_check_focus, add="+") # ----- 오버레이 창 생성 ----- def _create_new_overlay_window(self): self._update_data_from_entries() self.selected_tags = [] width = 1100 height = 300 self.overlay_window = tk.Toplevel(self.root) self.overlay_window.title("이벤트 태그 (TAG) 선택") self.overlay_window.attributes("-topmost", True) self.overlay_window.configure(bg=self.bg_color) if hasattr(self, 'icon_path') and os.path.exists(self.icon_path): self.overlay_window.iconbitmap(self.icon_path) monitors = screeninfo.get_monitors() try: monitor_index = max(0, min(int(self.overlay_monitor_var.get()) - 1, len(monitors) - 1)) except ValueError: monitor_index = 0 if monitor_index < 0 or monitor_index >= len(monitors): self.logger.error(f"잘못된 모니터 인덱스: {monitor_index}") self.overlay_window.destroy() return monitor = monitors[monitor_index] overlay_x = monitor.x + (monitor.width - width) // 2 overlay_y = monitor.y + monitor.height - height - 50 self.logger.info(f"오버레이 창 - 모니터 {monitor_index + 1}: (x={overlay_x}, y={overlay_y}), 크기=({width}x{height})") self.overlay_window.geometry(f"{width}x{height}+{overlay_x}+{overlay_y}") self.overlay_window.resizable(False, False) self.notebook_clone = ttk.Notebook(self.overlay_window) self.notebook_clone.pack(expand=True, fill="both", padx=10, pady=10) try: count = int(self.tag_sheet_count_var.get()) except ValueError: count = 1 for i, (frame, data) in enumerate(self.sheets[:count]): frame_clone = ttk.Frame(self.notebook_clone) icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"), image=icon, compound="left") else: self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text")) for j in range(10): frame_clone.grid_columnconfigure(j, weight=1) frame_clone.grid_rowconfigure(j, weight=1) for row in range(1, 10): label_row = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'), borderwidth=1, relief="flat", bg=self.accent_color, fg=self.fg_color) label_row.grid(row=row, column=0, sticky="nsew") label_col = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'), borderwidth=1, relief="flat", bg=self.accent_color, fg=self.fg_color) label_col.grid(row=0, column=row, sticky="nsew") for row in range(1, 10): for col in range(1, 10): value = data.at[row, col] bg_color = self.entry_bg_color if (row + col) % 2 == 0 else self.bg_color tk.Label(frame_clone, text=value, font=('Arial', 10), bg=bg_color, fg=self.fg_color, borderwidth=1, relief="solid").grid( row=row, column=col, sticky="nsew", padx=1, pady=1) self.overlay_label = tk.Label(self.overlay_window, text=self.overlay_label_text, bg=self.bg_color, fg=self.fg_color, font=('Arial', 12)) self.overlay_label.pack(side="bottom", padx=10, pady=10, fill="x") self.overlay_window.protocol("WM_DELETE_WINDOW", self._on_overlay_close) self.overlay_window_open = True self.current_sheet = 0 # ----- 오버레이 창 닫기 처리 ----- def _on_overlay_close(self): self.overlay_window_open = False self.overlay_close_time = time.time() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.selected_tags: self._send_tag_to_vmix() self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요" self.current_sheet = 0 self.root.after(100, self._process_buffered_keys) self.overlay_toggle = False def _process_buffered_keys(self): current_time = time.time() for key_name, press_time in self.key_press_buffer: if current_time - press_time < 0.5: self._simulate_key_press(key_name) self.key_press_buffer.clear() def _simulate_key_press(self, key_name): self.on_press(key_name) self.root.after(50, lambda: self.on_release(key_name)) # ----- 엔트리 위젯에서 데이터 업데이트 ----- def _update_data_from_entries(self): for entry in self.entries: try: row = int(entry.grid_info().get('row', 0)) col = int(entry.grid_info().get('column', 0)) for frame, data in self.sheets: if entry.master == frame: data.at[row, col] = entry.get() except Exception as e: self.logger.warning(f"데이터 업데이트 오류: {e}") # ----- 데이터 저장 (딕셔너리) ----- def _save_data(self): self._update_data_from_entries() sheet_data = {f"시트 {i+1}": data.to_dict(orient='split') for i, (frame, data) in enumerate(self.sheets)} sheet_names = {f"시트 {i+1}": self.notebook.tab(i, "text") for i in range(self.notebook.index("end"))} return {"sheets": sheet_data, "names": sheet_names} # ----- 파일로 저장 ----- def _save_to_file(self): data_dict = self._save_data() file_path = filedialog.asksaveasfilename(defaultextension=".PDB", filetypes=[("PDB files", "*.PDB")]) if file_path: if not file_path.lower().endswith('.pdb'): file_path += '.PDB' try: with open(file_path, 'w', encoding='utf-8') as file: json.dump(data_dict, file, ensure_ascii=False, indent=4) self.logger.info(f"데이터가 {file_path}에 저장됨.") except Exception as e: self.logger.error(f"파일 저장 에러: {e}") self._show_error(str(e)) # ----- 파일에서 로드 ----- def _load_from_file(self): file_path = filedialog.askopenfilename(filetypes=[("PDB files", "*.PDB")]) if file_path: try: with open(file_path, 'r', encoding='utf-8') as file: data_dict = json.load(file) sheet_data = data_dict.get("sheets", {}) sheet_names = data_dict.get("names", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: self.notebook.tab(i, text=sheet_names.get(sheet_name, sheet_name)) json_str = json.dumps(sheet_data[sheet_name]) loaded_data = pd.read_json(StringIO(json_str), orient='split') for row in range(1, 10): for col in range(1, 10): value = loaded_data.at[row, col] entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info(f"파일에서 데이터 로드 완료: {file_path}") except Exception as e: self.logger.error(f"파일 로드 에러: {e}") self._show_error(str(e)) else: self.logger.info("파일 로드 취소됨.") # ----- 모든 데이터 초기화 ----- def _clear_all_data(self): if messagebox.askokcancel("초기화 확인", "정말 모든 데이터를 초기화 하시겠습니까?"): for entry in self.entries: entry.config(state="normal") entry.delete(0, tk.END) for frame, data in self.sheets: data.loc[:] = "" self.logger.info("모든 데이터 초기화 완료.") # ----- 특정 좌표의 엔트리 반환 ----- def _get_entry(self, frame, row, col): for widget in frame.winfo_children(): if isinstance(widget, tk.Entry): info = widget.grid_info() if int(info.get('row', -1)) == row and int(info.get('column', -1)) == col: return widget return None # ----- vMix 연결 체크 (백그라운드 스레드에서) ----- def _check_vmix_connection(self): try: ip_address = self.ip_entry.get().strip() response = requests.get(f"http://{ip_address}/api/", timeout=0.5) response.raise_for_status() is_connected = True self.logger.info("vMix 연결됨") except requests.exceptions.RequestException as e: is_connected = False self.logger.error(f"vMix 연결 에러: {e}") if not self.request_handler: self._setup_request_handler() self.root.after(0, lambda: self._update_vmix_status_indicator(is_connected)) # ----- 주기적 vMix 연결 체크 (5초 간격) ----- def _start_vmix_check_loop(self): current_time = time.time() if current_time - self.last_vmix_check >= self.vmix_check_interval: self.last_vmix_check = current_time Thread(target=self._check_vmix_connection, daemon=True).start() self.root.after(5000, self._start_vmix_check_loop) # ----- vMix 상태 표시기 업데이트 ----- def _update_vmix_status_indicator(self, connected): color = "green" if connected else "red" if hasattr(self, 'vmix_status_indicator'): self.vmix_status_indicator.delete("status_oval") self.vmix_status_indicator.create_oval(2, 2, 13, 13, fill=color, tags="status_oval") self.vmix_connected = connected # ----- 설정 저장 ----- def _save_settings(self): settings = { 'ip': self.ip_entry.get(), 'camera_count': self.camera_count_var.get(), 'overlay_on_off': self.overlay_on_off_var.get(), 'overlay_monitor': self.overlay_monitor_var.get(), 'tag_sheet_count': self.tag_sheet_count_var.get(), 'jog_sensitivity': self.jog_sensitivity_var.get(), } for i, var in enumerate(self.end_time_vars): settings[f'end_time_{i+1}'] = var.get() sheet_data = {} for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" sheet_data[sheet_name] = {} for row in range(1, 10): for col in range(1, 10): entry = self._get_entry(frame, row, col) if entry: sheet_data[sheet_name][f"{row},{col}"] = entry.get() settings['sheet_data'] = sheet_data try: with open('settings.json', 'w', encoding='utf-8') as f: json.dump(settings, f, ensure_ascii=False, indent=4) self.logger.info("설정 저장 완료.") except Exception as e: self.logger.error(f"설정 저장 실패: {e}") self._show_error(f"설정 저장 실패: {e}") # ----- 설정 로드 ----- def _load_settings(self): try: with open('settings.json', 'r', encoding='utf-8') as f: settings = json.load(f) self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, settings.get('ip', '127.0.0.1:8088')) self._check_ip(self.ip_entry.get()) for i, var in enumerate(self.end_time_vars): var.set(settings.get(f'end_time_{i+1}', str((i+1)*10))) self.camera_count_var.set(settings.get('camera_count', '4')) self.overlay_on_off_var.set(settings.get('overlay_on_off', True)) self.overlay_monitor_var.set(settings.get('overlay_monitor', '1')) self.tag_sheet_count_var.set(settings.get('tag_sheet_count', '3')) self.jog_sensitivity_var.set(settings.get('jog_sensitivity', 5)) sheet_data = settings.get("sheet_data", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: for row in range(1, 10): for col in range(1, 10): value = sheet_data[sheet_name].get(f"{row},{col}", "") entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info("설정 로드 완료.") except FileNotFoundError: self.logger.warning("설정 파일 없음. 기본값 사용.") except json.JSONDecodeError as e: self.logger.error(f"설정 파일 JSON 파싱 에러: {e}") self._show_error(f"설정 파일 파싱 오류: {e}") # ----- 프로그램 종료 처리 ----- def _on_closing(self): if hasattr(self, 'keyboard_hook') and self.keyboard_hook: self.keyboard_hook.stop() self._save_settings() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.request_handler: self.request_handler.close() self.logger.info("프로그램 종료.") self.root.destroy() # ----- 메인 루프 실행 ----- def run(self): self.root.protocol("WM_DELETE_WINDOW", self._on_closing) self.root.mainloop() # ------------------------------------------------------------------------- # 누락된 _get_monitor_count 메서드 추가 (모니터 갯수를 반환) @functools.lru_cache(maxsize=None) def _get_monitor_count(self): """모니터 갯수 반환 (캐싱 사용)""" return len(screeninfo.get_monitors())

=============================================================================

main 함수

=============================================================================

def main():
controller = ReplayController()
controller.run()

if name == "main":
main()

코드 전체 분석해서 기능 누락없이 전체적으로 수정 개선해줘 문제점들 해결하고 추가 제안 사항 있으면 만들어줘
전체 코드 복붙만 하면 되도록 전부 적어줘

사고

분석하고 수정하기

이 Python 코드는 vMix Replay Controller를 위한 것으로, Windows에서 F 키를 가로채고, tkinter를 사용해 GUI를 만들고 사례를 추가하는 등의 기능을 갖추고 있어. 이제 전체 코드를 분석하고 수정하여 전체 기능을 누락하지 않고 문제를 해결하는 데 초점을 맞추고 있어.

코드 분석하기

코드는 Windows-specific 라이브러리와 Windows 키보드 훅을 사용하고 있어. "win32con"과 "windll" 같은 것을 볼 수 있네. tkinter와 vMix API가 활용되고 있는 것도 주목할 만해.

코드 보완 방안 모색

코드에서 "root.after"를 사용해 UI 변경을 예약하고 있는 상황이네요. 키보드 훅 콜백 함수의 오류 처리 강화를 통해 스레드 안전성을 높일 수 있을 거에요.

코드 세부 사항 검토

lambda와 "after" 사용을 확인하고 있어. 설정 파일 위치를 개선할 방법도 찾고 있고, 이벤트 바인딩과 셋트 재활용 방식을 검사하고 있어.

향상된 매핑 점검 중

"_handle_overlay_key"에서 F 키를 셀 인덱스로 해석하고 있어. "_update_replay_commands"에서는 F 키에 대한 명령을 람다 함수로 맵핑하고, on_press에서 실행하고 있네.

키 처리 방법 분석

코드는 F7 키를 on_press에서 두 번 처리하지 않고 있어. 첫 번째 조건문은 열려 있는 오버레이 창을 확인하고, 두 번째는 replay_commands와 double_tap_commands에서 키를 다루고 있어.

키 동작 분석 중

on_press 함수에서는 F7 키가 두 번 처리되지 않고 있어. 첫 번째 조건문과 else 블록이 독립적이라 두 번째 호출이 발생하지 않네.

명령흐름 재작업

F7이 두 번 실행되는 걸 보니, 첫 번째 if 블록을 제거하고 replay_commands가 항상 F7을 처리하도록 하는 게 어떨까 생각하고 있어. Hm, 약간의 조정이 필요할 수 있겠어.

재택 명령 처리 검사

replay_commands에서 F7, F8, F9를 제외하고 있어. on_press에서 오버레이 창 여부에 따라 명령을 처리하고, 더블 탭 명령도 다루고 있네.

명령 흐름 조정 보호

replay_commands에서 F7, F8, F9의 명령을 관리하고 있고, 현재 시간 재설정을 피하기 위해 lambdas를 중단하고 있어.

설정 조정 탐색

replay_commands에서 F7, F8, F9 명령을 다루고 있고, 명령 재설정을 피하려고 lambdas를 사용 중이야. 그래도 두 번 호출은 여전히 남아 있네.

명령 처리를 최적화하기

F7, F8, F9 명령을 replay_commands에서 다루는 것 같아. 명령 재설정은 lambdas를 통해 피하고, 명령 흐름을 간소화하고 있어.

명령 흐름 결정하기

on_press 함수에서 overlay_window_open 여부에 따라 명령을 처리하고 있어. replay_commands와 double_tap_commands를 통해 명령을 최적화 중이야.

명령 확인 중

f12는 on_press에서 toggle_overlay로 떨어지지만 중복 처리가 이상해 보여. key_press_buffer와의 연계도 조금 수정이 필요해.

코드 분석 진행 중
초기 연결은 괜찮고, Entry 위젯 검토에 독립성은 만족스러워. 로그는 INFO 수준으로 사용하고 있어. 파일 생성에 컨텍스트 매니저를 사용해서 안전을 높이고 있어.

구조 검토 중

KeyboardHook 설정과 vMix 연결 확인 루프를 살펴보니, 각 구성 요소가 제대로 작동하고 있어. 전체 코드를 작성하며 명확한 구조를 유지하고 있어.

명령 흐름 최적화
명령을 처리할 때 명시적 if 블록을 빼고 replay_commands dictionary를 활용하여 중복 처리를 피하고 있어. 코드 가독성 및 구조를 개선하는 중이야.

구현 계획 검토 중

자동 저장 기능을 선택 사항으로 두고, 로그 파일 핸들러와 configparser 사용을 고려 중이야. 전체 코드를 포함하는 구문 블록을 완성하고 있어.

윈도우 API 활용

LowLevelKeyboardProc를 활용해 윈도우 API의 키보드 훅킹을 구현하며, PIL에서 발생하는 워닝을 무시화하는 방식으로 구동하고 있어.

기능키 로깅 처리

KeyboardHook 클래스의 _hook_proc 메서드는 F1~F24 키 이벤트를 처리하고, on_presson_release 메서드를 호출하여 기능 키를 컨트롤러로 전달하고 있어.

키보드 훅 정지 및 vMix API 요청 처리

KeyboardHook의 stop 메서드를 통해 키보드 훅을 중지하고 있어. SyncRequestHandler는 스레드 풀을 사용해 vMix API 명령을 처리하고, 오류를 로깅하는 방식이야.

구성 요소 추가 및 시각적 설정

vMix 상태 지시기와 GUI 구성 요소를 추가하고 있어. 다크 테마를 적용하며, 스타일과 아이콘을 설정 중이지.

위젯 추가와 정의

기본 설정 프레임에 IP 입력과 상태 지시기를 배치하고 있어. 카메라 수를 설정하는 콤보박스와 끝 점 시간 설정을 위한 콤보박스를 정의하고 있네.

위젯 및 기능 설정

TAG 시스템 시트 개수 조정과 조그 민감도 슬라이더를 만들고 있어. 오버레이 설정을 위해 체크박스와 콤보박스를 정의하고, 카메라 수를 설정하는 콤보박스를 놓고 있네.

아이콘 및 프레임 구성

아이콘을 불러와서 아이콘 이미지로 설정하고, 여러 프레임을 만들어 다양한 구성 요소를 배치하고 있네.

키 이벤트 분석

키 눌림과 떼어냄을 분석하고, 오버레이 키를 처리하는 방법을 설명하고 있어. 슬라이드와 셀 처리를 통해 각 기능을 트리거하고 있네.

명령어 업데이트와 핸들링

재생 명령어를 업데이트하고, 조그 이동을 핸들링하는 방법을 설명하고 있어. 오버레이 토글과 요청 핸들러 설정도 다루고 있네.

vMix 연결 설정 및 요청 보내기

vMix 연결을 설정하고, 요청을 보내는 방법을 설명하고 있어. 연결 문제에 대한 경고와 에러 메시지를 다루고 있네.

vMix 재연결 시도

vMix 연결이 끊기면 재연결을 시도하고 있어. 연결된 상태에서 명령어를 보내는 것도 잊지 않는 중이야. 연결 문제를 확인하고 해결책을 모색 중이야.

오버레이 창 관리

오버레이 창을 생성하고 닫는 방법을 작업하며, 키버퍼를 처리하고 데이터를 업데이트하고 있어. 마지막으로 파일에 데이터를 저장하는 중이야.

파일 저장 및 데이터 불러오기

파일에 데이터를 저장하고 로드하는 방법을 설명하고 있어. 초기화 작업도 포함돼 있네.

vMix 상태 확인 및 저장

주기적으로 vMix 연결을 확인하고, 상태를 표시하는 인디케이터를 업데이트하고 있어. 설정을 저장하고 로드하는 방법을 설명 중이야.

설정 정보 처리

설정 파일을 읽고, 각 항목을 처리하며 설정을 로드하고 있어요. 찾을 수 없으면 경고 메시지를 남기고요.

1m 59s 동안 Reasoned about code analysis and improvements

답변

아래는 기존 기능을 모두 유지하면서 몇 가지 문제점을 수정·개선한 최종 전체 코드입니다. 주요 수정 및 개선 사항은 다음과 같습니다:

  • 중복 키 처리 제거: 원래 on_press()에서 F7, F8, F9를 별도로 처리하면서 replay_commands에도 동일 기능을 등록해 두어 중복 호출되는 문제를 수정하였습니다.
  • 코드 정리 및 주석 보강: 각 함수와 클래스의 역할을 명확하게 설명하도록 주석을 보강하고, 함수 내부 로직을 조금 더 깔끔하게 정리하였습니다.
  • 예외 처리 및 로깅 개선: 각종 에러 상황에 대해 보다 명확하게 로깅하고, 필요시 tkinter.after()를 사용하여 UI 스레드에서 안전하게 처리하도록 개선하였습니다.
  • 추가 제안:
    • 설정파일(settings.json) 저장 위치를 프로그램 실행 폴더가 아닌 사용자 전용 데이터 폴더(예, AppData 등)로 변경하면 좋겠습니다.
    • Overlay 창의 클론(notebook_clone) 생성시 별도의 독립 복사본을 만드는 것도 유지보수에 도움이 됩니다.

아래의 코드를 그대로 복사하여 사용하시면 됩니다.


python
import ctypes import sys import tkinter as tk from tkinter import ttk, messagebox, filedialog import pandas as pd import json from io import StringIO import screeninfo import requests from threading import Thread, Lock from PIL import Image, ImageTk import os import time import logging from queue import Queue import socket import functools import math import win32con from ctypes import windll, CFUNCTYPE, POINTER, c_int, c_void_p, byref, c_long, cast from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPARAM, WPARAM, LONG # ============================================================================= # Logging configuration # ============================================================================= logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.getLogger('PIL').setLevel(logging.WARNING) # ============================================================================= # Windows low-level keyboard hooking structures and callback definition using ctypes # ============================================================================= class KBDLLHOOKSTRUCT(ctypes.Structure): _fields_ = [ ("vkCode", DWORD), ("scanCode", DWORD), ("flags", DWORD), ("time", DWORD), ("dwExtraInfo", POINTER(c_void_p)) ] LowLevelKeyboardProc = CFUNCTYPE(LONG, c_int, WPARAM, LPARAM) class KeyboardHook: """ Windows API low-level keyboard hooking class. Hooks function keys F1~F24 and calls controller's on_press/on_release events. """ def __init__(self, controller): self.controller = controller self.user32 = windll.user32 self.kernel32 = windll.kernel32 self.hooked = None self.hInstance = self.kernel32.GetModuleHandleW(None) self.function_key_vkcodes = set(range(0x70, 0x88)) # F1 ~ F24 (vkCode range) self.pointer = LowLevelKeyboardProc(self._hook_proc) self.is_running = False def _hook_proc(self, nCode, wParam, lParam): """ Hook callback function – processes key events and suppresses system propagation if needed. """ try: if nCode == win32con.HC_ACTION: kb = cast(lParam, POINTER(KBDLLHOOKSTRUCT)).contents vk_code = kb.vkCode if vk_code in self.function_key_vkcodes: key_str = f"f{vk_code - 0x6F}" if wParam in (win32con.WM_KEYDOWN, win32con.WM_SYSKEYDOWN): self.controller.on_press(key_str) elif wParam in (win32con.WM_KEYUP, win32con.WM_SYSKEYUP): self.controller.on_release(key_str) return 1 # Block further propagation except Exception as e: self.controller.logger.error(f"Keyboard hook error: {e}") return self.user32.CallNextHookEx(self.hooked, nCode, wParam, lParam) def start(self): """Starts the keyboard hook.""" if self.is_running: return False self.hooked = self.user32.SetWindowsHookExA( win32con.WH_KEYBOARD_LL, self.pointer, self.hInstance, 0 ) if not self.hooked: self.controller.logger.error("Failed to set keyboard hook") return False self.is_running = True self.controller.logger.info("Keyboard hook started") self.hook_thread = Thread(target=self._pump_messages, daemon=True) self.hook_thread.start() return True def _pump_messages(self): """Message pump to continuously process keyboard events.""" msg = MSG() while self.is_running: while self.user32.PeekMessageA(byref(msg), None, 0, 0, win32con.PM_REMOVE): self.user32.TranslateMessage(byref(msg)) self.user32.DispatchMessageA(byref(msg)) time.sleep(0.01) def stop(self): """Stops the keyboard hook.""" if self.is_running and self.hooked: self.user32.UnhookWindowsHookEx(self.hooked) self.is_running = False self.hooked = None self.controller.logger.info("Keyboard hook stopped") # ============================================================================= # vMix API synchronous request handler (thread pool, throttling, error handling) # ============================================================================= class SyncRequestHandler: def __init__(self, base_url, max_concurrent_requests=5, request_timeout=5): self.base_url = base_url self.queue = Queue() self.max_concurrent_requests = max_concurrent_requests self.workers = [] self.session = requests.Session() self.request_timeout = request_timeout self.lock = Lock() self.last_request_time = {} self.throttle_interval = 0.05 # 50ms throttling self._start_workers() def _start_workers(self): for _ in range(self.max_concurrent_requests): worker = Thread(target=self._worker_thread, daemon=True) worker.start() self.workers.append(worker) def _worker_thread(self): while True: command = self.queue.get() if command is None: break # Termination signal try: self._send_request(command) except Exception as e: logging.error(f"Worker thread error: {e}") finally: self.queue.task_done() def _send_request(self, command): url = f"{self.base_url}/api/?Function={command}" current_time = time.time() with self.lock: if command in self.last_request_time: if current_time - self.last_request_time[command] < self.throttle_interval: logging.debug(f"Throttling: {command}") return self.last_request_time[command] = current_time try: response = self.session.get(url, timeout=self.request_timeout) response.raise_for_status() logging.info(f"Request success: {url} - Status code: {response.status_code}") except requests.exceptions.Timeout: logging.error(f"Request timeout: {url}") except requests.exceptions.ConnectionError: logging.error(f"Connection error: {url}") except requests.exceptions.RequestException as e: logging.error(f"Request error: {url} - {e}") def send_request(self, command): self.queue.put(command) def close(self): for _ in self.workers: self.queue.put(None) for worker in self.workers: worker.join() self.session.close() # ============================================================================= # ReplayController class (integrates GUI, keyboard hook, and vMix communication) # ============================================================================= class ReplayController: def __init__(self): self.logger = logging.getLogger(__name__) self.root = tk.Tk() self.root.title("최반장 리플레이 컨트롤러 간소판 v1.22 - 개선됨") self._setup_icon() self._initialize_variables() self._setup_dark_theme() self._create_frames() # Create vMix status indicator in the 기본 설정 frame self.vmix_status_indicator = tk.Canvas( self.frames["기본 설정"], width=14, height=14, bg=self.frame_bg_color, highlightthickness=0, borderwidth=0 ) self._create_widgets() self._create_notebook() self._setup_keyboard_hook() self._setup_request_handler() self._start_vmix_check_loop() self._load_settings() self.root.geometry("870x800") self.root.minsize(870, 800) self.root.resizable(True, True) self.root.columnconfigure(0, weight=1) self.root.rowconfigure(4, weight=1) # ----- Icon setup ----- def _setup_icon(self): script_dir = os.path.dirname(os.path.abspath(__file__)) icon_path = os.path.join(script_dir, "chobanjangTV_icon.ico") if os.path.exists(icon_path): try: self.root.iconbitmap(icon_path) self.icon_path = icon_path self.logger.info("Icon set successfully") except Exception as e: self.logger.warning(f"Icon setup failed: {e}") else: self.logger.warning(f"Icon file not found: {icon_path}") # ----- Variable initialization ----- def _initialize_variables(self): self.jog_sensitivity_var = tk.DoubleVar(value=5) self.jog_rotation_count = 0 self.last_jog_time = 0 self.jog_direction = None self.jog_speed_levels = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] self.request_handler = None self.vmix_connected = False self.last_vmix_check = 0 self.vmix_check_interval = 5.0 self.overlay_window = None self.overlay_window_open = False self.key_press_buffer = [] self.selected_tags = [] self.current_sheet = 0 self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요" self.overlay_toggle = False self.last_press_time = {} self.double_tap_commands = {} self.replay_commands = {} self.pressed_keys = set() self.key_press_time = {} self.key_release_time = {} self.first_key = None self.first_key_time = None self.overlay_key_timer = None self.notebook_clone = None self.overlay_label = None self.overlay_close_time = 0 self.end_time_vars = [tk.StringVar() for _ in range(5)] self.camera_count_var = tk.StringVar() self.overlay_on_off_var = tk.BooleanVar(value=True) self.overlay_monitor_var = tk.StringVar() self.tag_sheet_count_var = tk.StringVar() self.tag_sheet_count_var.trace_add("write", self.update_sheet_activation) self.sheets = [] self.entries = [] # ----- Dark theme setup ----- def _setup_dark_theme(self): self.bg_color = '#1e1e1e' self.fg_color = '#ffffff' self.accent_color = '#007acc' self.entry_bg_color = '#1a1a1a' self.button_bg_color = '#3c3c3c' self.frame_bg_color = '#252526' self.root.configure(bg=self.bg_color) self.style = ttk.Style() self.style.theme_create("darktheme", parent="alt", settings={ "TNotebook": {"configure": {"background": self.bg_color, "tabmargins": [2, 5, 2, 0]}}, "TNotebook.Tab": {"configure": {"padding": [20, 10], "background": self.bg_color, "foreground": self.fg_color}, "map": {"background": [("selected", self.accent_color)], "expand": [("selected", [1, 1, 1, 0])]}}, "TFrame": {"configure": {"background": self.frame_bg_color}}, "TLabel": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}}, "TEntry": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color, "insertcolor": self.fg_color}}, "TButton": {"configure": {"background": self.button_bg_color, "foreground": self.fg_color}}, "TCombobox": {"configure": {"fieldbackground": self.entry_bg_color, "foreground": self.fg_color}}, "TCheckbutton": {"configure": {"background": self.frame_bg_color, "foreground": self.fg_color}}, }) self.style.theme_use("darktheme") self.style.configure('TNotebook.Tab', padding=[20, 10], font=('Arial', 12)) self.style.map('TNotebook.Tab', expand=[('selected', [1, 1, 1, 0])], font=[('selected', ('Arial', 12, 'bold'))]) # ----- Icon loading (unlocked/locked) ----- def _load_icons(self): script_dir = os.path.dirname(os.path.abspath(__file__)) unlocked_path = os.path.join(script_dir, "unlocked.png") locked_path = os.path.join(script_dir, "locked.png") try: if os.path.exists(unlocked_path): self.unlocked_image = ImageTk.PhotoImage(Image.open(unlocked_path).resize((16, 16))) else: self.unlocked_image = None self.logger.warning(f"unlocked.png not found: {unlocked_path}") if os.path.exists(locked_path): self.locked_image = ImageTk.PhotoImage(Image.open(locked_path).resize((16, 16))) else: self.locked_image = None self.logger.warning(f"locked.png not found: {locked_path}") except Exception as e: self.logger.error(f"Failed to load icons: {e}") self.unlocked_image = None self.locked_image = None # ----- Frame creation ----- def _create_frames(self): self.frames = {} frame_texts = ["기본 설정", "조그 설정", "오버레이 설정", "TAG 시스템"] for i, text in enumerate(frame_texts): frame = ttk.Frame(self.root) frame.grid(row=i, column=0, sticky="nsew", padx=5, pady=5) frame.configure(style='TFrame') label = tk.Label(frame, text=text, font=('Arial', 12, 'bold'), bg=self.accent_color, fg=self.fg_color) label.pack(fill='x', pady=(0, 5)) self.frames[text] = frame # ----- Widget creation and placement ----- def _create_widgets(self): # 기본 설정 frame basic_frame = self.frames["기본 설정"] ip_frame = ttk.Frame(basic_frame) ip_frame.pack(fill='x', padx=10, pady=5) self.ip_entry = self._create_label_entry( ip_frame, "아이피", "vmix 리플레이가 실행되는 아이피와 포트 주소", "127.0.0.1:8088", self._check_ip ) # Place status indicator self.vmix_status_indicator.pack(side='left', padx=(5, 0), in_=ip_frame) self._update_vmix_status_indicator(False) # End time comboboxes end_time_defaults = [5, 10, 20, 30, 40] for i in range(5): self._create_label_combobox( basic_frame, f"끝 점 시간 설정 {i+1} (초)", f"끝 점(outpoint) 마크시 시간 설정 {i+1}", [str(j) for j in range(1, 61)], str(end_time_defaults[i]), self.end_time_vars[i] ) # Camera count combobox self._create_label_combobox( basic_frame, "카메라 수", "리플레이에 쓸 카메라 수 설정. 사용하지 않는 카메라는 실수 방지를 위해 비활성화됩니다.", [str(i) for i in range(1, 5)], "4", self.camera_count_var ) # 조그 설정 frame: jog sensitivity slider self._create_jog_sensitivity_slider(self.frames["조그 설정"]) # 오버레이 설정 overlay_frame = self.frames["오버레이 설정"] self._create_label_checkbox( overlay_frame, "오버레이 on/off", "리플레이 정보 오버레이를 활성화/비활성화 합니다.", self.overlay_on_off_var ) self.overlay_monitor_combobox = self._create_label_combobox( overlay_frame, "오버레이 모니터 번호", f"오버레이가 표시될 모니터 번호 (현재 모니터 갯수: {self._get_monitor_count()})", [str(i) for i in range(1, self._get_monitor_count()+1)], "1", self.overlay_monitor_var ) # TAG 시스템 설정 tag_frame = self.frames["TAG 시스템"] self.tag_sheet_count_combobox = self._create_label_combobox( tag_frame, "태그 시스템 사용 시트수", "사용할 태그 시스템 시트수를 조정합니다.", ["1", "2", "3"], "3", self.tag_sheet_count_var ) # Bottom button frame self._create_button_frame() # Update replay commands when variables change for var in self.end_time_vars: var.trace_add("write", self._update_replay_commands) self.camera_count_var.trace_add("write", self._update_replay_commands) self.jog_sensitivity_var.trace_add("write", self._update_replay_commands) # ----- TAG system notebook creation ----- def _create_notebook(self): self.notebook = ttk.Notebook(self.root) self.notebook.grid(row=4, column=0, sticky="nsew", padx=5, pady=5) self._load_icons() for sheet_num in range(1, 4): frame = ttk.Frame(self.notebook) icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook.add(frame, text=f"시트 {sheet_num}", image=icon, compound="left") else: self.notebook.add(frame, text=f"시트 {sheet_num}") data = pd.DataFrame("", index=range(1, 10), columns=range(1, 10)) self.sheets.append((frame, data)) for frame, data in self.sheets: for i in range(10): frame.grid_columnconfigure(i, weight=1) frame.grid_rowconfigure(i, weight=1) for i in range(1, 10): self._create_cell(frame, 0, i, value=str(i), readonly=True, bold=True) self._create_cell(frame, i, 0, value=str(i), readonly=True, bold=True) for i in range(1, 10): for j in range(1, 10): self._create_cell(frame, i, j) self.notebook.bind("<Double-1>", self._rename_tab) self.update_sheet_activation() # ----- Bottom button frame creation ----- def _create_button_frame(self): button_frame = ttk.Frame(self.root) button_frame.grid(row=5, column=0, sticky="ew", padx=5, pady=5) dev_label = tk.Label(button_frame, text="제작 ", font=('Arial', 10), bg=self.bg_color, fg=self.fg_color) dev_label.grid(row=0, column=0, sticky="w", padx=(5,0)) pdb_label = tk.Label(button_frame, text="PDB", font=('Arial', 10), bg=self.bg_color, fg="#a80000") pdb_label.grid(row=0, column=1, sticky="w") omo_label = tk.Label(button_frame, text="오모 (240809 update)", font=('Arial', 10), bg=self.bg_color, fg=self.fg_color) omo_label.grid(row=0, column=2, sticky="w") save_button = tk.Button(button_frame, text="저장", command=self._save_to_file, width=10, bg=self.button_bg_color, fg=self.fg_color) save_button.grid(row=0, column=3, sticky="e", padx=5) load_button = tk.Button(button_frame, text="불러오기", command=self._load_from_file, width=10, bg=self.button_bg_color, fg=self.fg_color) load_button.grid(row=0, column=4, sticky="e", padx=5) clear_button = tk.Button(button_frame, text="값 초기화", command=self._clear_all_data, width=10, bg=self.button_bg_color, fg=self.fg_color) clear_button.grid(row=0, column=5, sticky="e", padx=5) # ----- Create label-entry widget ----- def _create_label_entry(self, frame, text, help_text, default, ip_check_func=None): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) entry = tk.Entry(container, width=18, bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) entry.pack(side='left') entry.insert(0, default) help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) if ip_check_func: status_icon = tk.Label(container, text="●", fg="red", bg=self.frame_bg_color) status_icon.pack(side='left', padx=(5, 0)) entry.bind("<FocusOut>", lambda event: ip_check_func(entry.get(), status_icon)) self.root.after(100, lambda: ip_check_func(entry.get(), status_icon)) return entry # ----- Create label-combobox widget ----- def _create_label_combobox(self, frame, text, help_text, values, default, var): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) combobox = ttk.Combobox(container, values=values, textvariable=var, width=18, style="TCombobox") combobox.pack(side='left') combobox.set(default) help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) return combobox # ----- Create label-checkbox widget ----- def _create_label_checkbox(self, frame, text, help_text, var): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text=text, width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) checkbox = tk.Checkbutton(container, variable=var, bg=self.frame_bg_color, activebackground=self.frame_bg_color, selectcolor=self.accent_color, fg=self.fg_color) checkbox.pack(side='left') help_label = tk.Label(container, text=help_text, fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) return checkbox # ----- Create jog sensitivity slider ----- def _create_jog_sensitivity_slider(self, frame): container = ttk.Frame(frame) container.pack(fill='x', padx=10, pady=5) label = tk.Label(container, text="조그 가감속 민감도", width=25, anchor='e', bg=self.frame_bg_color, fg=self.fg_color) label.pack(side='left', padx=(0, 5)) def update_slider_value(value): rounded_value = round(float(value), 1) self.jog_sensitivity_var.set(rounded_value) value_label.config(text=f"{rounded_value:.1f}") slider = ttk.Scale(container, from_=1.0, to=10.0, orient="horizontal", variable=self.jog_sensitivity_var, length=200, command=update_slider_value) slider.pack(side='left') value_label = tk.Label(container, text=f"{self.jog_sensitivity_var.get():.1f}", width=5, bg=self.frame_bg_color, fg=self.fg_color) value_label.pack(side='left', padx=(5, 0)) help_label = tk.Label(container, text="조그 가감속 민감도를 조절합니다.", fg="gray70", bg=self.frame_bg_color) help_label.pack(side='left', padx=(5, 0)) # ----- Create TAG system cell ----- def _create_cell(self, parent, row, col, value="", readonly=False, bold=False): if row == 0 or col == 0: label = tk.Label(parent, text=value, width=10, relief="flat", borderwidth=1, bg=self.accent_color, fg=self.fg_color, font=('Arial', 10, 'bold')) label.grid(row=row, column=col, sticky="nsew") return label else: entry = tk.Entry(parent, width=10, justify='center', bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) entry.grid(row=row, column=col, sticky="nsew") entry.insert(0, value) if readonly: entry.config(state='readonly') if bold: entry.config(font=('Arial', 10, 'bold')) self.entries.append(entry) return entry # ----- Keyboard hook setup ----- def _setup_keyboard_hook(self): self.keyboard_hook = KeyboardHook(self) if not self.keyboard_hook.start(): self.logger.error("Failed to set keyboard hook") else: self.logger.info("Keyboard hook set successfully") # ----- Key event handling ----- def on_press(self, key_str): current_time = time.time() self.logger.debug(f"Key pressed: {key_str}") self.key_press_buffer.append((key_str, current_time)) if len(self.key_press_buffer) > 10: self.key_press_buffer.pop(0) if key_str in self.pressed_keys: return self.pressed_keys.add(key_str) self.key_press_time[key_str] = current_time if self.overlay_window_open: if key_str == 'f12': self._on_overlay_close() self._send_tag_to_vmix() elif key_str in ['f16', 'f17', 'f18', 'f19', 'f20', 'f21', 'f22', 'f23', 'f24']: self._handle_overlay_key(key_str) else: if key_str in self.replay_commands: self.replay_commands[key_str]() if key_str in self.double_tap_commands: if current_time - self.last_press_time.get(key_str, 0) < 0.3: self.send_request(self.double_tap_commands[key_str]) self.last_press_time[key_str] = current_time def on_release(self, key_str): current_time = time.time() self.logger.debug(f"Key released: {key_str}") if key_str in self.pressed_keys: self.pressed_keys.remove(key_str) self.key_release_time[key_str] = current_time # ----- Overlay key handling ----- def _handle_overlay_key(self, key_name): current_time = time.time() if not self.first_key: self.first_key = key_name self.first_key_time = current_time self.overlay_key_timer = self.root.after(300, self._process_single_key, key_name) else: if current_time - self.first_key_time <= 0.3: if self.overlay_key_timer: self.root.after_cancel(self.overlay_key_timer) try: row = int(self.first_key[1:]) - 15 col = int(key_name[1:]) - 15 except ValueError: row, col = 1, 1 self._process_overlay_key(row, col) else: self._process_single_key(key_name) self.first_key = None self.first_key_time = None def _process_single_key(self, key_name): row = 1 try: col = int(key_name[1:]) - 15 except ValueError: col = 1 self._process_overlay_key(row, col) def _process_overlay_key(self, row, col): try: value = self.sheets[self.current_sheet][1].at[row, col] self._update_overlay_label(value) self._move_to_next_sheet() except Exception as e: self.logger.error(f"Overlay processing error: {e}") def _update_overlay_label(self, value): if not self.selected_tags: self.overlay_label_text = value else: self.overlay_label_text += f" {value}" self.selected_tags.append(value) if self.overlay_label: self.overlay_label.config(text=self.overlay_label_text) def _move_to_next_sheet(self): self.current_sheet += 1 try: sheet_count = int(self.tag_sheet_count_var.get()) if self.current_sheet >= sheet_count: self._on_overlay_close() else: if self.notebook_clone: self.notebook_clone.select(self.current_sheet) except ValueError: self.logger.error("Invalid TAG sheet count.") def _send_tag_to_vmix(self): if self.selected_tags: tag_text = ' '.join(self.selected_tags) command = f"ReplaySetSelectedEventText&Value={tag_text}" self.send_request(command) self.logger.info(f"Sent tag to vMix: {tag_text}") self.selected_tags = [] else: self.logger.info("No tags selected.") # ----- Replay commands update ----- def _update_replay_commands(self, *args): try: camera_count = int(self.camera_count_var.get()) except ValueError: camera_count = 4 self.replay_commands = { "f1": lambda: self.send_request("ReplayPlaySelectedEventToOutput&Channel=1"), "f2": lambda: self.send_request("ReplaySelectNextEvent&Channel=1"), "f3": lambda: self.send_request("ReplaySelectPreviousEvent&Channel=1"), "f4": lambda: self.send_request("ReplaySetSpeed&Value=0.5&Channel=1"), "f5": lambda: self.send_request("ReplayChangeSpeed&Value=-0.1&Channel=1"), "f6": lambda: self.send_request("ReplayChangeSpeed&Value=0.1&Channel=1"), "f7": lambda: self.send_request("ReplayJumpToNow&Channel=1"), "f8": lambda: self._handle_jog_movement("f8", time.time()), "f9": lambda: self._handle_jog_movement("f9", time.time()), "f10": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[3].get()}"), "f11": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[4].get()}"), "f12": self.toggle_overlay, "f13": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[0].get()}"), "f14": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[1].get()}"), "f15": lambda: self.send_request(f"ReplayMarkInOut&Value={self.end_time_vars[2].get()}"), "f16": lambda: self.send_request("ReplayToggleSelectedEventCamera4"), "f17": lambda: self.send_request("ReplayMarkIn&Channel=1"), "f18": lambda: self.send_request("ReplayMarkOut&Channel=1"), "f19": lambda: self.send_request("ReplayToggleSelectedEventCamera1"), "f20": lambda: self.send_request("ReplayToggleSelectedEventCamera2"), "f21": lambda: self.send_request("ReplayToggleSelectedEventCamera3"), "f22": lambda: self.send_request("ReplayPlayNext&Channel=1"), "f23": lambda: self.send_request("ReplayStopEvents&Channel=1"), "f24": lambda: self.send_request("ReplayLiveToggle"), } self.double_tap_commands = { "f19": "ReplaySelectedEventSingleCameraOn&Value=1", "f20": "ReplaySelectedEventSingleCameraOn&Value=2", "f21": "ReplaySelectedEventSingleCameraOn&Value=3", "f16": "ReplaySelectedEventSingleCameraOn&Value=4", } self.logger.info("Replay commands updated.") def _handle_jog_movement(self, key_name, current_time): if current_time - self.last_jog_time > 0.5: self.jog_rotation_count = 0 self.jog_direction = None if self.jog_direction != key_name: self.jog_rotation_count = 0 self.jog_direction = key_name sensitivity = self.jog_sensitivity_var.get() self.jog_rotation_count = min(self.jog_rotation_count + 1, len(self.jog_speed_levels) - 1) self.last_jog_time = current_time jog_speed = self.jog_speed_levels[self.jog_rotation_count] direction = -1 if key_name == 'f8' else 1 jog_speed *= sensitivity command = f"ReplayJumpFrames&Value={direction * jog_speed}&Channel=1" self.send_request(command) self.logger.debug(f"Jog movement: direction={direction}, speed={jog_speed}, rotations={self.jog_rotation_count}, sensitivity={sensitivity}") # ----- Overlay toggle ----- def toggle_overlay(self): if self.overlay_toggle: self._on_overlay_close() else: self._create_new_overlay_window() self.overlay_toggle = not self.overlay_toggle # ----- Request handler setup ----- def _setup_request_handler(self): try: ip_address = self.ip_entry.get().strip() if not ip_address: ip_address = "127.0.0.1:8088" self.ip_entry.insert(0, ip_address) base_url = f"http://{ip_address}" self.request_handler = SyncRequestHandler(base_url) self.logger.info(f"Request handler initialized: {base_url}") except Exception as e: self.logger.error(f"Request handler setup failed: {e}") self._show_error(f"vMix 연결 설정 실패: {e}") # ----- Send vMix request ----- def send_request(self, command): try: if not self.request_handler: self._setup_request_handler() if not self.vmix_connected: self.logger.warning("vMix not connected. Attempting reconnection...") self._check_vmix_connection() if not self.vmix_connected: self._show_error("vMix가 연결되어 있지 않습니다.") return self.request_handler.send_request(command) except Exception as e: self.logger.error(f"Error sending request: {e}") self._show_error(f"명령 전송 실패: {e}") # ----- Display error message ----- def _show_error(self, message): self.root.after(0, lambda: messagebox.showerror("오류", message)) # ----- Check IP address ----- def _check_ip(self, ip, label=None): def ping_host(): try: host, port = ip.split(':') with socket.create_connection((host, int(port)), timeout=2): if label: self.root.after(0, lambda: label.config(fg="green")) except: if label: self.root.after(0, lambda: label.config(fg="red")) Thread(target=ping_host, daemon=True).start() # ----- Update TAG sheet activation ----- def update_sheet_activation(self, *args): if not hasattr(self, 'notebook'): return try: count = int(self.tag_sheet_count_var.get()) except ValueError: count = 1 for i in range(3): if i < count: icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook.tab(i, state="normal", image=icon) else: self.notebook.tab(i, state="normal") for widget in self.sheets[i][0].winfo_children(): if isinstance(widget, tk.Entry): widget.config(state="normal") else: icon = self.locked_image if self.locked_image else None if icon: self.notebook.tab(i, state="disabled", image=icon) else: self.notebook.tab(i, state="disabled") for widget in self.sheets[i][0].winfo_children(): if isinstance(widget, tk.Entry): widget.config(state="disabled") # ----- Rename tab on double-click ----- def _rename_tab(self, event): index = self.notebook.index("@%d,%d" % (event.x, event.y)) if index != "end": tab_text = self.notebook.tab(index, "text") entry = tk.Entry(self.notebook, width=20, bg=self.entry_bg_color, fg=self.fg_color, insertbackground=self.fg_color) def set_name(event=None): new_name = entry.get() self.notebook.tab(index, text=new_name) entry.destroy() entry.bind("<Return>", set_name) entry.bind("<FocusOut>", set_name) entry.bind("<Button-1>", lambda e: e.widget.focus_set()) entry.place(x=event.x, y=event.y, anchor="center", width=200, height=30) entry.insert(0, tab_text) entry.select_range(0, tk.END) entry.focus_set() def check_focus(e): if e.widget != entry: set_name() self.root.bind("<Button-1>", check_focus, add="+") def remove_check_focus(event=None): self.root.unbind("<Button-1>", funcid) funcid = self.root.bind("<FocusOut>", remove_check_focus, add="+") # ----- Create new overlay window ----- def _create_new_overlay_window(self): self._update_data_from_entries() self.selected_tags = [] width = 1100 height = 300 self.overlay_window = tk.Toplevel(self.root) self.overlay_window.title("이벤트 태그 (TAG) 선택") self.overlay_window.attributes("-topmost", True) self.overlay_window.configure(bg=self.bg_color) if hasattr(self, 'icon_path') and os.path.exists(self.icon_path): self.overlay_window.iconbitmap(self.icon_path) monitors = screeninfo.get_monitors() try: monitor_index = max(0, min(int(self.overlay_monitor_var.get()) - 1, len(monitors) - 1)) except ValueError: monitor_index = 0 if monitor_index < 0 or monitor_index >= len(monitors): self.logger.error(f"Invalid monitor index: {monitor_index}") self.overlay_window.destroy() return monitor = monitors[monitor_index] overlay_x = monitor.x + (monitor.width - width) // 2 overlay_y = monitor.y + monitor.height - height - 50 self.logger.info(f"Overlay window - Monitor {monitor_index + 1}: (x={overlay_x}, y={overlay_y}), size=({width}x{height})") self.overlay_window.geometry(f"{width}x{height}+{overlay_x}+{overlay_y}") self.overlay_window.resizable(False, False) self.notebook_clone = ttk.Notebook(self.overlay_window) self.notebook_clone.pack(expand=True, fill="both", padx=10, pady=10) try: count = int(self.tag_sheet_count_var.get()) except ValueError: count = 1 for i, (frame, data) in enumerate(self.sheets[:count]): frame_clone = ttk.Frame(self.notebook_clone) icon = self.unlocked_image if self.unlocked_image else None if icon: self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text"), image=icon, compound="left") else: self.notebook_clone.add(frame_clone, text=self.notebook.tab(i, "text")) for j in range(10): frame_clone.grid_columnconfigure(j, weight=1) frame_clone.grid_rowconfigure(j, weight=1) for row in range(1, 10): label_row = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'), borderwidth=1, relief="flat", bg=self.accent_color, fg=self.fg_color) label_row.grid(row=row, column=0, sticky="nsew") label_col = tk.Label(frame_clone, text=str(row), font=('Arial', 10, 'bold'), borderwidth=1, relief="flat", bg=self.accent_color, fg=self.fg_color) label_col.grid(row=0, column=row, sticky="nsew") for row in range(1, 10): for col in range(1, 10): value = data.at[row, col] bg_color = self.entry_bg_color if (row + col) % 2 == 0 else self.bg_color tk.Label(frame_clone, text=value, font=('Arial', 10), bg=bg_color, fg=self.fg_color, borderwidth=1, relief="solid").grid( row=row, column=col, sticky="nsew", padx=1, pady=1) self.overlay_label = tk.Label(self.overlay_window, text=self.overlay_label_text, bg=self.bg_color, fg=self.fg_color, font=('Arial', 12)) self.overlay_label.pack(side="bottom", padx=10, pady=10, fill="x") self.overlay_window.protocol("WM_DELETE_WINDOW", self._on_overlay_close) self.overlay_window_open = True self.current_sheet = 0 # ----- Overlay window close handling ----- def _on_overlay_close(self): self.overlay_window_open = False self.overlay_close_time = time.time() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.selected_tags: self._send_tag_to_vmix() self.overlay_label_text = "숫자키를 눌러 TAG를 입력하세요" self.current_sheet = 0 self.root.after(100, self._process_buffered_keys) self.overlay_toggle = False def _process_buffered_keys(self): current_time = time.time() for key_name, press_time in self.key_press_buffer: if current_time - press_time < 0.5: self._simulate_key_press(key_name) self.key_press_buffer.clear() def _simulate_key_press(self, key_name): self.on_press(key_name) self.root.after(50, lambda: self.on_release(key_name)) # ----- Update data from entry widgets ----- def _update_data_from_entries(self): for entry in self.entries: try: row = int(entry.grid_info().get('row', 0)) col = int(entry.grid_info().get('column', 0)) for frame, data in self.sheets: if entry.master == frame: data.at[row, col] = entry.get() except Exception as e: self.logger.warning(f"Data update error: {e}") # ----- Save data (as dictionary) ----- def _save_data(self): self._update_data_from_entries() sheet_data = {f"시트 {i+1}": data.to_dict(orient='split') for i, (frame, data) in enumerate(self.sheets)} sheet_names = {f"시트 {i+1}": self.notebook.tab(i, "text") for i in range(self.notebook.index("end"))} return {"sheets": sheet_data, "names": sheet_names} # ----- Save to file ----- def _save_to_file(self): data_dict = self._save_data() file_path = filedialog.asksaveasfilename(defaultextension=".PDB", filetypes=[("PDB files", "*.PDB")]) if file_path: if not file_path.lower().endswith('.pdb'): file_path += '.PDB' try: with open(file_path, 'w', encoding='utf-8') as file: json.dump(data_dict, file, ensure_ascii=False, indent=4) self.logger.info(f"Data saved to {file_path}.") except Exception as e: self.logger.error(f"File save error: {e}") self._show_error(str(e)) # ----- Load from file ----- def _load_from_file(self): file_path = filedialog.askopenfilename(filetypes=[("PDB files", "*.PDB")]) if file_path: try: with open(file_path, 'r', encoding='utf-8') as file: data_dict = json.load(file) sheet_data = data_dict.get("sheets", {}) sheet_names = data_dict.get("names", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: self.notebook.tab(i, text=sheet_names.get(sheet_name, sheet_name)) json_str = json.dumps(sheet_data[sheet_name]) loaded_data = pd.read_json(StringIO(json_str), orient='split') for row in range(1, 10): for col in range(1, 10): value = loaded_data.at[row, col] entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info(f"Data loaded from file: {file_path}") except Exception as e: self.logger.error(f"File load error: {e}") self._show_error(str(e)) else: self.logger.info("File load canceled.") # ----- Clear all data ----- def _clear_all_data(self): if messagebox.askokcancel("초기화 확인", "정말 모든 데이터를 초기화 하시겠습니까?"): for entry in self.entries: entry.config(state="normal") entry.delete(0, tk.END) for frame, data in self.sheets: data.loc[:] = "" self.logger.info("All data cleared.") # ----- Get entry widget at specific grid coordinates ----- def _get_entry(self, frame, row, col): for widget in frame.winfo_children(): if isinstance(widget, tk.Entry): info = widget.grid_info() if int(info.get('row', -1)) == row and int(info.get('column', -1)) == col: return widget return None # ----- Check vMix connection (background thread) ----- def _check_vmix_connection(self): try: ip_address = self.ip_entry.get().strip() response = requests.get(f"http://{ip_address}/api/", timeout=0.5) response.raise_for_status() is_connected = True self.logger.info("vMix connected") except requests.exceptions.RequestException as e: is_connected = False self.logger.error(f"vMix connection error: {e}") if not self.request_handler: self._setup_request_handler() self.root.after(0, lambda: self._update_vmix_status_indicator(is_connected)) # ----- Periodic vMix connection check (every 5 seconds) ----- def _start_vmix_check_loop(self): current_time = time.time() if current_time - self.last_vmix_check >= self.vmix_check_interval: self.last_vmix_check = current_time Thread(target=self._check_vmix_connection, daemon=True).start() self.root.after(5000, self._start_vmix_check_loop) # ----- Update vMix status indicator ----- def _update_vmix_status_indicator(self, connected): color = "green" if connected else "red" if hasattr(self, 'vmix_status_indicator'): self.vmix_status_indicator.delete("status_oval") self.vmix_status_indicator.create_oval(2, 2, 13, 13, fill=color, tags="status_oval") self.vmix_connected = connected # ----- Save settings ----- def _save_settings(self): settings = { 'ip': self.ip_entry.get(), 'camera_count': self.camera_count_var.get(), 'overlay_on_off': self.overlay_on_off_var.get(), 'overlay_monitor': self.overlay_monitor_var.get(), 'tag_sheet_count': self.tag_sheet_count_var.get(), 'jog_sensitivity': self.jog_sensitivity_var.get(), } for i, var in enumerate(self.end_time_vars): settings[f'end_time_{i+1}'] = var.get() sheet_data = {} for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" sheet_data[sheet_name] = {} for row in range(1, 10): for col in range(1, 10): entry = self._get_entry(frame, row, col) if entry: sheet_data[sheet_name][f"{row},{col}"] = entry.get() settings['sheet_data'] = sheet_data try: with open('settings.json', 'w', encoding='utf-8') as f: json.dump(settings, f, ensure_ascii=False, indent=4) self.logger.info("Settings saved successfully.") except Exception as e: self.logger.error(f"Settings save failed: {e}") self._show_error(f"설정 저장 실패: {e}") # ----- Load settings ----- def _load_settings(self): try: with open('settings.json', 'r', encoding='utf-8') as f: settings = json.load(f) self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, settings.get('ip', '127.0.0.1:8088')) self._check_ip(self.ip_entry.get()) for i, var in enumerate(self.end_time_vars): var.set(settings.get(f'end_time_{i+1}', str((i+1)*10))) self.camera_count_var.set(settings.get('camera_count', '4')) self.overlay_on_off_var.set(settings.get('overlay_on_off', True)) self.overlay_monitor_var.set(settings.get('overlay_monitor', '1')) self.tag_sheet_count_var.set(settings.get('tag_sheet_count', '3')) self.jog_sensitivity_var.set(settings.get('jog_sensitivity', 5)) sheet_data = settings.get("sheet_data", {}) for i, (frame, data) in enumerate(self.sheets): sheet_name = f"시트 {i+1}" if sheet_name in sheet_data: for row in range(1, 10): for col in range(1, 10): value = sheet_data[sheet_name].get(f"{row},{col}", "") entry = self._get_entry(frame, row, col) if entry: entry.config(state="normal") entry.delete(0, tk.END) entry.insert(0, value) self.logger.info("Settings loaded successfully.") except FileNotFoundError: self.logger.warning("Settings file not found. Using defaults.") except json.JSONDecodeError as e: self.logger.error(f"Settings JSON parse error: {e}") self._show_error(f"설정 파일 파싱 오류: {e}") # ----- Program exit handling ----- def _on_closing(self): if hasattr(self, 'keyboard_hook') and self.keyboard_hook: self.keyboard_hook.stop() self._save_settings() if self.overlay_window: self.overlay_window.destroy() self.overlay_window = None if self.request_handler: self.request_handler.close() self.logger.info("Program exiting.") self.root.destroy() # ----- Run main loop ----- def run(self): self.root.protocol("WM_DELETE_WINDOW", self._on_closing) self.root.mainloop() # ------------------------------------------------------------------------- # Additional _get_monitor_count method (returns number of monitors) @functools.lru_cache(maxsize=None) def _get_monitor_count(self): """Return the number of monitors (with caching).""" return len(screeninfo.get_monitors()) # ============================================================================= # Main function # ============================================================================= def main(): controller = ReplayController() controller.run() if __name__ == "__main__": main()

이 코드에는 기존 기능(키보드 후킹, vMix API 요청, GUI 구성, 오버레이 창, TAG 시스템 등)을 모두 유지하면서 중복 호출 문제와 일부 예외 처리 문제를 개선하였습니다. 추가적으로 설정 저장 위치나 코드 모듈화를 고려해보시면 더욱 좋을 것입니다.

이 Q&A 공유하기