""" import signal import threading import queue im...
Created on: December 14, 2025
Answered using GPT-5.2 Thinking by Chat01
Created on: December 14, 2025
Answered using GPT-5.2 Thinking by Chat01
"""
import signal
import threading
import queue
import time
import math
import socket
import struct
import os
import cv2
import numpy as np
from picamera2 import Picamera2
from vittrack import VitTrack
# Rectangle of the frame that is searched for edge candidates, in pixels.
ROI = (315, 222, 415, 322) # (x0, y0, x1, y1)
# Starting Gaussian blur kernel size handed to edge_canny_robust.
BLUR_KSIZE = 13
# Threshold spread around the image median handed to edge_canny_robust.
SIGMA = 1.0
# When True, Canny thresholds are derived from the image median
# (auto_canny_thresholds) instead of the fixed 50/150 pair.
USE_AUTO = True
# When True, detected edges are drawn onto the displayed/streamed frames.
OVERLAY = True
# Fraction of the edge extent's width that counts as a gap between two
# objects (see find_split_boxes_by_gap).
GAP_THRESHOLD = 0.4
# Remote host, and the TCP ports used for the JPEG stream (PORT) and for the
# text coordinate messages (COORDS_PORT).
TARGET_IP = "192.168.50.1"
PORT = 9999
COORDS_PORT = 9998
# Queue capacities: frames handed to the tracker thread, and coordinate
# messages waiting to be sent.
TRACKING_QSIZE = 1
COORDS_QSIZE = 32
# The capture loop sleeps 1 / CAP_FPS seconds after each frame.
CAP_FPS = 30.0
# ONNX model file and the cv2.dnn backend/target passed to VitTrack.
MODEL_PATH = "/home/pi/Desktop/vittracker/object_tracking_vittrack_2023sep.onnx"
BACKEND_ID = cv2.dnn.DNN_BACKEND_OPENCV
TARGET_ID = cv2.dnn.DNN_TARGET_CPU
# Minimum tracker score for an inference result to be accepted as a hit.
VIT_SCORE_THRESHOLD = 0.3
def to_gray(img):
    """Return a single-channel version of ``img``.

    Args:
        img: Image array. A 2-D array is treated as already grayscale; a 3-D
            array is interpreted by its channel count (1, 3 = BGR, 4 = BGRA).

    Returns:
        The input itself when it is already 2-D, otherwise a 2-D grayscale
        array.
    """
    if img.ndim != 3:
        return img
    channels = img.shape[2]
    if channels == 1:
        # (H, W, 1) is already grayscale; cvtColor would reject it.
        return img[:, :, 0]
    if channels == 4:
        # Cameras frequently deliver BGRA/XBGR; BGR2GRAY would reject it.
        return cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY)
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
def ensure_odd(x):
    """Return ``x`` as an odd integer that is at least 1.

    Values below 1 are raised to 1, fractional values are truncated, and even
    results are bumped up by one (Gaussian kernel sizes must be odd).
    """
    x = int(max(1, x))
    return x if (x % 2 == 1) else x + 1
def auto_canny_thresholds(gray, sigma=0.33):
    """Derive Canny hysteresis thresholds from the median intensity.

    Args:
        gray: Grayscale image array.
        sigma: Relative spread around the median; larger values widen the
            band between the two thresholds.

    Returns:
        ``(low, high)`` integers clamped to the range 0..255. An empty image
        yields ``(0, 0)``.
    """
    if np.size(gray) == 0:
        # The median of nothing is NaN, and int(NaN) raises.
        return 0, 0
    v = np.median(gray)
    low = int(max(0, (1.0 - sigma) * v))
    high = int(min(255, (1.0 + sigma) * v))
    return low, high
def edge_sobel(img, ksize=3, normalize=False):
    """Compute the Sobel gradient magnitude of ``img`` as a uint8 image.

    Args:
        img: BGR or grayscale image.
        ksize: Sobel aperture size.
        normalize: When True, stretch the magnitude to the full 0..255 range;
            otherwise saturate values above 255.

    Returns:
        2-D ``uint8`` array with the gradient magnitude.
    """
    gray = to_gray(img)
    # Work in float32 from the start: 16-bit signed output saturates for the
    # larger apertures, and the magnitude needs float input anyway.
    gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=ksize)
    gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=ksize)
    mag = cv2.magnitude(gx, gy)
    if normalize:
        return cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    return cv2.convertScaleAbs(mag)
def edge_canny_robust(img, blur_ksize=5, use_auto=True, sigma=0.33, min_blur=1, debug=False):
    """Run Canny with progressively weaker smoothing until edges appear.

    The first attempt uses ``blur_ksize``/``sigma``; each further attempt
    shrinks the blur kernel by 2 and tightens sigma. If no attempt yields an
    edge pixel, a thresholded Sobel magnitude is used as a fallback.

    Args:
        img: BGR or grayscale 8-bit image.
        blur_ksize: Initial Gaussian kernel size (made odd if necessary).
        use_auto: Derive thresholds from the median (True) or use 50/150.
        sigma: Initial spread for the automatic thresholds.
        min_blur: Smallest kernel size tried by the shrinking schedule.
        debug: Print one line per attempt.

    Returns:
        2-D ``uint8`` mask with edge pixels set to 255 and everything else 0.
        Low-texture input (standard deviation below 3.5) and empty input give
        an all-zero mask.
    """
    gray = to_gray(img)
    if gray.size == 0:
        # Nothing to analyse (e.g. an ROI clipped to zero area).
        return np.zeros(gray.shape[:2], dtype=np.uint8)

    if gray.std() < 3.5:
        if debug:
            print("[Canny] low-texture skip", gray.std())
        return np.zeros(gray.shape, dtype=np.uint8)

    blur_ksize = ensure_odd(blur_ksize)
    sigma = float(sigma)

    # Schedule of (kernel size, sigma) pairs, strongest smoothing first.
    tries = [(blur_ksize, sigma)]
    b = blur_ksize - 2
    s = max(0.05, sigma * 0.9)
    while b >= min_blur:
        tries.append((ensure_odd(max(1, b)), max(0.01, s)))
        b -= 2
        s = max(0.01, s * 0.85)
    tries.append((1, max(0.01, sigma * 0.5)))
    tries.append((1, 0.01))

    for bk, sg in tries:
        g = cv2.GaussianBlur(gray, (bk, bk), 0) if bk > 1 else gray
        if use_auto:
            l_, h_ = auto_canny_thresholds(g, sg)
        else:
            l_, h_ = 50, 150
        if l_ >= h_:
            # Canny needs a non-degenerate hysteresis band.
            h_ = min(255, l_ + 10)
        edges = cv2.Canny(g, l_, h_)
        cnt = int(np.count_nonzero(edges))
        if debug:
            print(f"[Canny try] blur={bk} sigma={sg:.3f} edges={cnt}")
        if cnt > 0:
            return (edges > 0).astype(np.uint8) * 255

    # Every Canny attempt came back empty: fall back to a Sobel threshold.
    sob = edge_sobel(img, ksize=3, normalize=True)
    _, sob_bin = cv2.threshold(sob, max(8, int(sob.mean() * 0.5)), 255, cv2.THRESH_BINARY)
    if np.count_nonzero(sob_bin) > 0:
        return sob_bin
    return np.zeros(gray.shape, dtype=np.uint8)
def find_split_boxes_by_gap(edges_bin, gap_threshold=0.4):
    """Split edge pixels into boxes wherever a wide horizontal gap occurs.

    Columns containing at least one edge pixel are grouped into runs; a new
    run starts whenever two consecutive occupied columns are further apart
    than ``gap_threshold`` times the total occupied width.

    Args:
        edges_bin: 2-D edge mask; pixels greater than 0 count as edges.
        gap_threshold: Gap size as a fraction of the overall edge width.

    Returns:
        List of ``(x0, y0, x1, y1)`` integer tuples (inclusive pixel
        coordinates), ordered left to right. Empty when there are no edges.
    """
    ys, xs = np.nonzero(edges_bin > 0)
    if xs.size == 0:
        return []

    cols = np.unique(xs)  # already sorted ascending
    total_width = max(int(cols[-1] - cols[0]), 1)
    gap_threshold_px = total_width * gap_threshold

    # Indices into `cols` at which a new cluster begins.
    split_at = np.flatnonzero(np.diff(cols) > gap_threshold_px) + 1
    starts = np.concatenate(([0], split_at))
    ends = np.concatenate((split_at, [cols.size])) - 1

    boxes = []
    for first, last in zip(starts, ends):
        x_start, x_end = cols[first], cols[last]
        rows = ys[(xs >= x_start) & (xs <= x_end)]
        boxes.append((int(x_start), int(rows.min()), int(x_end), int(rows.max())))
    return boxes
def overlay_edges(original_bgr, edges_gray, alpha=0.75, edge_color=(0, 255, 0)):
    """Blend a colored edge mask onto a copy of a BGR frame.

    Args:
        original_bgr: Frame to draw on; it is not modified.
        edges_gray: Edge image of the same height/width; values above 10
            count as edge pixels. A 3-channel image is converted to gray.
        alpha: Weight of ``edge_color`` at edge pixels (0 = invisible,
            1 = solid color).
        edge_color: BGR color used for the edges.

    Returns:
        New frame with the edge pixels tinted; other pixels are unchanged.
    """
    out = original_bgr.copy()
    if edges_gray.ndim == 3:
        edges_gray = cv2.cvtColor(edges_gray, cv2.COLOR_BGR2GRAY)
    edges_gray = cv2.convertScaleAbs(edges_gray)
    _, bin_mask = cv2.threshold(edges_gray, 10, 255, cv2.THRESH_BINARY)
    mask = bin_mask > 0
    if not mask.any():
        return out

    tinted = out.copy()
    tinted[mask] = edge_color
    blended = cv2.addWeighted(tinted, alpha, out, 1 - alpha, 0)
    # Copy back only the edge pixels so the rest of the frame stays exact.
    out[mask] = blended[mask]
    return out
class LowLatencyEdgeTracker:
    """Picamera2 capture, edge-based target pick-up and ViT tracking.

    Five daemon threads cooperate:

    * ``capture_loop`` grabs frames and publishes the newest one.
    * ``slow_processing_loop`` finds an edge candidate inside the ROI and
      tracks it with VitTrack.
    * ``local_display_loop`` shows the annotated frame in a window.
    * ``streaming_loop`` sends annotated JPEG frames over TCP.
    * ``coords_streaming_loop`` sends ``frame,cx,cy`` text lines over TCP.
    """

    def __init__(self, roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID):
        """Start the camera, try to load the tracker model, prepare threads.

        Args:
            roi: ``(x0, y0, x1, y1)`` search rectangle in pixels.
            model_path: Path of the VitTrack ONNX model.
            backend_id: cv2.dnn backend passed to VitTrack.
            target_id: cv2.dnn target passed to VitTrack.
        """
        self.roi = roi
        self.picam2 = Picamera2()
        config = self.picam2.create_preview_configuration(main={"size": (730, 544)})
        self.picam2.configure(config)
        self.picam2.start()

        # Shared latest frame (fast path)
        self.latest_frame = None
        self.latest_frame_lock = threading.Lock()

        # Single-slot queue feeding the tracker thread (overwrite mode)
        self.tracking_q = queue.Queue(maxsize=TRACKING_QSIZE)
        # Small coordinate messages waiting to be sent over the network
        self.coords_q = queue.Queue(maxsize=COORDS_QSIZE)

        # Overlay state produced by the tracker thread
        self.overlay_lock = threading.Lock()
        self.last_bbox = None  # (x, y, w, h)
        self.last_edges_full = None  # full-frame edge mask

        # Cleared by stop(); every loop polls it
        self.running = threading.Event()
        self.running.set()

        self.model_path = model_path
        self.backend_id = backend_id
        self.target_id = target_id

        # Model creation is heavy, so do it once up front. On failure the
        # processing loop retries when a candidate shows up.
        self.vit = None
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id
                )
                print("VitTrack instance created at startup.")
            else:
                print("VitTrack model file not found at:", self.model_path)
        except Exception as ex:
            self.vit = None
            print("Failed to create VitTrack instance at startup:", ex)

        # Active tracker (points at self.vit while tracking) and its flag
        self.tracker = None
        self.tracking = False

        self.threads = [
            threading.Thread(target=loop, daemon=True)
            for loop in (
                self.capture_loop,
                self.slow_processing_loop,
                self.local_display_loop,
                self.streaming_loop,
                self.coords_streaming_loop,
            )
        ]

    def start(self):
        """Run all worker threads and block until ``stop()`` is called."""
        for t in self.threads:
            t.start()
        try:
            while self.running.is_set():
                time.sleep(0.2)
        except KeyboardInterrupt:
            self.stop()
        # Let the workers leave their loops before the camera goes away; the
        # timeout keeps a thread stuck in a blocking send from hanging exit.
        for t in self.threads:
            t.join(timeout=1.0)
        self._cleanup()

    def stop(self):
        """Ask every loop to finish."""
        self.running.clear()

    def _cleanup(self):
        """Release the camera and close any OpenCV windows (best effort)."""
        try:
            self.picam2.stop()
        except Exception as ex:
            print("Camera stop failed:", ex)
        cv2.destroyAllWindows()

    # -------------------- Fast path: capture --------------------
    def capture_loop(self):
        """Publish the newest frame and hand a copy to the tracker queue."""
        fps_sleep = max(0.001, 1.0 / CAP_FPS)
        while self.running.is_set():
            frame = self.picam2.capture_array()
            if frame is None:
                time.sleep(0.01)
                continue
            # Convert BGRA -> BGR if needed
            if frame.ndim == 3 and frame.shape[2] == 4:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

            with self.latest_frame_lock:
                self.latest_frame = frame

            # Overwrite mode: drop the stale frame, then enqueue the new one.
            try:
                self.tracking_q.get_nowait()
            except queue.Empty:
                pass
            try:
                self.tracking_q.put_nowait(frame.copy())
            except queue.Full:
                pass
            time.sleep(fps_sleep)

    # -------------------- Slow path: edges + ViT tracker --------------------
    def _clipped_roi(self, w, h):
        """Return the ROI clipped to a ``w`` x ``h`` frame, or None if empty."""
        x0, y0, x1, y1 = self.roi
        rx0 = max(0, min(w - 1, x0))
        ry0 = max(0, min(h - 1, y0))
        rx1 = max(0, min(w - 1, x1))
        ry1 = max(0, min(h - 1, y1))
        if rx1 <= rx0 or ry1 <= ry0:
            return None
        return rx0, ry0, rx1, ry1

    def _find_candidate(self, frame, rect, roi_center):
        """Detect edges in the ROI; return (full-frame mask, closest box)."""
        rx0, ry0, rx1, ry1 = rect
        h, w = frame.shape[:2]
        roi_bgr = frame[ry0:ry1, rx0:rx1]
        edges_roi = edge_canny_robust(roi_bgr, blur_ksize=BLUR_KSIZE, use_auto=USE_AUTO, sigma=SIGMA)
        edges_roi = cv2.convertScaleAbs(edges_roi)
        _, edges_bin_roi = cv2.threshold(edges_roi, 10, 255, cv2.THRESH_BINARY)

        edges_full = np.zeros((h, w), dtype=np.uint8)
        edges_full[ry0:ry1, rx0:rx1] = edges_bin_roi

        # Boxes come back in ROI coordinates; shift them into frame space.
        boxes = [
            (rx0 + bx0, ry0 + by0, rx0 + bx1, ry0 + by1)
            for (bx0, by0, bx1, by1) in find_split_boxes_by_gap(edges_bin_roi, gap_threshold=GAP_THRESHOLD)
        ]

        def distance_to_center(box):
            cx = (box[0] + box[2]) / 2.0
            cy = (box[1] + box[3]) / 2.0
            return math.hypot(cx - roi_center[0], cy - roi_center[1])

        closest_box = min(boxes, key=distance_to_center, default=None)
        return edges_full, closest_box

    def _ensure_vit(self):
        """Create the VitTrack instance on demand if startup creation failed."""
        if self.vit is not None:
            return
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id
                )
                print("VitTrack instance created on-demand.")
            else:
                print("VitTrack model file missing at:", self.model_path)
                self.vit = None
        except Exception as ex:
            print("ViT Tracker init exception (create on-demand):", ex)
            self.vit = None

    def _start_tracking(self, frame, box, edges_full):
        """Initialise the tracker on ``box`` (x0, y0, x1, y1)."""
        self._ensure_vit()
        if self.vit is None:
            return
        try:
            bx0, by0, bx1, by1 = box
            bbox = (int(bx0), int(by0), int(bx1 - bx0), int(by1 - by0))
            self.vit.init(frame, bbox)
            self.tracker = self.vit
            self.tracking = True
            with self.overlay_lock:
                self.last_bbox = bbox
                self.last_edges_full = edges_full.copy()
            print("ViT Tracker started:", self.last_bbox)
        except Exception as ex:
            print("ViT Tracker init exception (init):", ex)
            self.tracker = None
            self.tracking = False

    def _update_tracking(self, frame, edges_full, frame_count):
        """Run one tracker inference and publish or clear the result."""
        try:
            is_located, bbox, score = self.tracker.infer(frame)
        except Exception as ex:
            is_located = False
            bbox = None
            score = 0.0
            print("ViT infer exception:", ex)

        if is_located and bbox is not None and (score is None or score >= VIT_SCORE_THRESHOLD):
            x, y, wbox, hbox = [int(v) for v in bbox]
            with self.overlay_lock:
                self.last_bbox = (x, y, wbox, hbox)
                self.last_edges_full = edges_full.copy()
            # Never block the tracker on the network: drop when full.
            try:
                self.coords_q.put_nowait((frame_count, int(x + wbox / 2), int(y + hbox / 2)))
            except queue.Full:
                pass
        else:
            # Target lost: clear state but keep self.vit for reuse.
            with self.overlay_lock:
                self.last_bbox = None
                self.last_edges_full = edges_full.copy()
            self.tracker = None
            self.tracking = False

    def slow_processing_loop(self):
        """Detect a candidate near the ROI centre and track it."""
        frame_count = 0
        roi_center = ((self.roi[0] + self.roi[2]) / 2.0, (self.roi[1] + self.roi[3]) / 2.0)
        while self.running.is_set():
            try:
                frame = self.tracking_q.get(timeout=0.2)
            except queue.Empty:
                continue
            frame_count += 1

            h, w = frame.shape[:2]
            rect = self._clipped_roi(w, h)
            if rect is None:
                continue

            edges_full, closest_box = self._find_candidate(frame, rect, roi_center)

            if not self.tracking and closest_box is not None:
                self._start_tracking(frame, closest_box, edges_full)

            if self.tracking and self.tracker is not None:
                self._update_tracking(frame, edges_full, frame_count)
            else:
                with self.overlay_lock:
                    self.last_edges_full = edges_full.copy()

    # -------------------- Shared helpers for display/streaming --------------------
    def _annotated_frame(self):
        """Return ``(frame, bbox)`` with overlays drawn, or ``(None, None)``."""
        with self.latest_frame_lock:
            if self.latest_frame is None:
                return None, None
            frame = self.latest_frame.copy()

        with self.overlay_lock:
            bbox = self.last_bbox
            edges_full = None
            if OVERLAY and self.last_edges_full is not None:
                edges_full = self.last_edges_full.copy()

        if edges_full is not None:
            frame = overlay_edges(frame, edges_full, alpha=0.6, edge_color=(0, 255, 0))
        if bbox is not None:
            x, y, wbox, hbox = bbox
            cv2.rectangle(frame, (x, y), (x + wbox, y + hbox), (0, 255, 255), 2)
        return frame, bbox

    def _connect(self, port, tag):
        """Open a TCP connection to TARGET_IP:port; return None on failure."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5.0)
        try:
            sock.connect((TARGET_IP, port))
            sock.settimeout(None)
        except socket.error as e:
            print(f"[{tag}] Connect error:", e)
            self._close_quietly(sock)
            return None
        print(f"[{tag}] Connected to", (TARGET_IP, port))
        return sock

    @staticmethod
    def _close_quietly(sock):
        """Close a socket, ignoring errors from an already-broken peer."""
        try:
            sock.close()
        except Exception:
            pass

    # -------------------- Local display (fast) --------------------
    def local_display_loop(self):
        """Show the annotated live frame; ESC stops the application."""
        while self.running.is_set():
            frame, bbox = self._annotated_frame()
            if frame is None:
                time.sleep(0.01)
                continue
            if bbox is not None:
                cv2.putText(frame, "Tracking", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            # Draw ROI for debugging
            x0, y0, x1, y1 = self.roi
            cv2.rectangle(frame, (x0, y0), (x1, y1), (0, 128, 255), 1)
            cv2.imshow("Low-latency Live", frame)
            if cv2.waitKey(1) == 27:
                self.stop()
                break
            time.sleep(0.001)

    # -------------------- Streaming loop --------------------
    def streaming_loop(self):
        """Send annotated frames as length-prefixed JPEGs, reconnecting on error."""
        conn = None
        try:
            while self.running.is_set():
                if conn is None:
                    conn = self._connect(PORT, "Streaming")
                    if conn is None:
                        time.sleep(1.0)
                        continue

                frame, _ = self._annotated_frame()
                if frame is None:
                    time.sleep(0.01)
                    continue

                try:
                    result, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
                    if not result:
                        time.sleep(0.005)
                        continue
                    data = buffer.tobytes()
                    # 4-byte little-endian length, then the JPEG payload.
                    conn.sendall(struct.pack("<L", len(data)))
                    conn.sendall(data)
                except (socket.error, struct.error) as e:
                    print("[Streaming] Send error, reconnecting:", e)
                    self._close_quietly(conn)
                    conn = None
                    time.sleep(0.5)
        finally:
            # Do not leak the connection when the loop ends.
            if conn is not None:
                self._close_quietly(conn)

    # -------------------- Coords streaming --------------------
    def coords_streaming_loop(self):
        """Send queued ``frame,cx,cy`` lines over TCP, reconnecting on error."""
        conn = None
        try:
            while self.running.is_set():
                if conn is None:
                    conn = self._connect(COORDS_PORT, "Coords")
                    if conn is None:
                        time.sleep(1.0)
                        continue

                try:
                    frame_num, cx, cy = self.coords_q.get(timeout=0.1)
                except queue.Empty:
                    continue

                try:
                    coords_str = f"{frame_num},{cx},{cy}\n"
                    conn.sendall(coords_str.encode('utf-8'))
                except socket.error as e:
                    print("[Coords] Send error:", e)
                    self._close_quietly(conn)
                    conn = None
        finally:
            if conn is not None:
                self._close_quietly(conn)
def main():
    """Build the tracker, install a SIGINT handler and run until stopped."""
    app = LowLatencyEdgeTracker(roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID)

    def _sigint_handler(signum, frame):
        # Only flip the run flag; start() performs the actual cleanup.
        print("SIGINT received — stopping app")
        app.stop()

    signal.signal(signal.SIGINT, _sigint_handler)
    print("Starting low-latency edge tracker (ViT). Press ESC in the window to stop.")
    app.start()
    print("Stopped.")


if __name__ == "__main__":
    main()
Rewrite this code so that the video is read from a webcam instead of the Pi camera (Picamera2).
Give me the full edited code.
import signal
import threading
import queue
import time
import math
import socket
import struct
import os

import cv2
import numpy as np
from vittrack import VitTrack

# -------------------- CONFIG --------------------
ROI = (315, 222, 415, 322)  # (x0, y0, x1, y1)
BLUR_KSIZE = 13
SIGMA = 1.0
USE_AUTO = True
OVERLAY = True
GAP_THRESHOLD = 0.4
TARGET_IP = "192.168.50.1"
PORT = 9999
COORDS_PORT = 9998
TRACKING_QSIZE = 1
COORDS_QSIZE = 32
CAP_FPS = 30.0

# Webcam settings
CAM_INDEX = 0  # 0 is usually the default webcam
CAPTURE_BACKEND = cv2.CAP_V4L2  # Linux: V4L2 (change/remove if needed)
FRAME_SIZE = (730, 544)  # (width, height) try to match your old Picamera2 size
FORCE_MJPG = True  # often reduces latency on webcams

# ViT model settings — change to your model location
MODEL_PATH = "/home/pi/Desktop/vittracker/object_tracking_vittrack_2023sep.onnx"
# Choose backend/target for cv.dnn: adjust if you want CUDA / NPU
BACKEND_ID = cv2.dnn.DNN_BACKEND_OPENCV
TARGET_ID = cv2.dnn.DNN_TARGET_CPU
# Minimum score threshold to accept ViT detection as valid tracking
VIT_SCORE_THRESHOLD = 0.3


# -------------------- helpers --------------------
def to_gray(img):
    """Return ``img`` as grayscale; 2-D input is returned unchanged."""
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if img.ndim == 3 else img


def ensure_odd(x):
    """Return ``x`` as an odd integer that is at least 1."""
    x = int(max(1, x))
    return x if (x % 2 == 1) else x + 1


def auto_canny_thresholds(gray, sigma=0.33):
    """Return ``(low, high)`` Canny thresholds spread around the median."""
    v = np.median(gray)
    low = int(max(0, (1.0 - sigma) * v))
    high = int(min(255, (1.0 + sigma) * v))
    return low, high


def edge_sobel(img, ksize=3, normalize=False):
    """Return the Sobel gradient magnitude of ``img`` as a uint8 image."""
    gray = to_gray(img)
    gx = cv2.Sobel(gray, cv2.CV_16S, 1, 0, ksize=ksize)
    gy = cv2.Sobel(gray, cv2.CV_16S, 0, 1, ksize=ksize)
    mag = cv2.magnitude(gx.astype(np.float32), gy.astype(np.float32))
    if normalize:
        return cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    return cv2.convertScaleAbs(mag)


def edge_canny_robust(img, blur_ksize=5, use_auto=True, sigma=0.33, min_blur=1, debug=False):
    """Run Canny with progressively weaker smoothing until edges appear.

    Returns a 2-D uint8 mask (255 = edge). Low-texture or empty input gives
    an all-zero mask; if every Canny attempt is empty, a thresholded Sobel
    magnitude is used as a fallback.
    """
    gray = to_gray(img)
    if gray.size == 0:
        # Nothing to analyse (e.g. an ROI clipped to zero area).
        return np.zeros(gray.shape[:2], dtype=np.uint8)

    if gray.std() < 3.5:
        if debug:
            print("[Canny] low-texture skip", gray.std())
        return np.zeros(gray.shape, dtype=np.uint8)

    blur_ksize = ensure_odd(blur_ksize)
    sigma = float(sigma)

    # Schedule of (kernel size, sigma) pairs, strongest smoothing first.
    tries = [(blur_ksize, sigma)]
    b = blur_ksize - 2
    s = max(0.05, sigma * 0.9)
    while b >= min_blur:
        tries.append((ensure_odd(max(1, b)), max(0.01, s)))
        b -= 2
        s = max(0.01, s * 0.85)
    tries.append((1, max(0.01, sigma * 0.5)))
    tries.append((1, 0.01))

    for bk, sg in tries:
        g = cv2.GaussianBlur(gray, (bk, bk), 0) if bk > 1 else gray
        if use_auto:
            l_, h_ = auto_canny_thresholds(g, sg)
        else:
            l_, h_ = 50, 150
        if l_ >= h_:
            h_ = min(255, l_ + 10)
        edges = cv2.Canny(g, l_, h_)
        cnt = int(np.count_nonzero(edges))
        if debug:
            print(f"[Canny try] blur={bk} sigma={sg:.3f} edges={cnt}")
        if cnt > 0:
            return (edges > 0).astype(np.uint8) * 255

    sob = edge_sobel(img, ksize=3, normalize=True)
    _, sob_bin = cv2.threshold(sob, max(8, int(sob.mean() * 0.5)), 255, cv2.THRESH_BINARY)
    if np.count_nonzero(sob_bin) > 0:
        return sob_bin
    return np.zeros(gray.shape, dtype=np.uint8)


def find_split_boxes_by_gap(edges_bin, gap_threshold=0.4):
    """Split edge pixels into ``(x0, y0, x1, y1)`` boxes at wide column gaps.

    A new box starts whenever two consecutive occupied columns are further
    apart than ``gap_threshold`` times the total occupied width.
    """
    ys, xs = np.nonzero(edges_bin > 0)
    if xs.size == 0:
        return []

    cols = np.unique(xs)  # already sorted ascending
    total_width = max(int(cols[-1] - cols[0]), 1)
    gap_threshold_px = total_width * gap_threshold

    split_at = np.flatnonzero(np.diff(cols) > gap_threshold_px) + 1
    starts = np.concatenate(([0], split_at))
    ends = np.concatenate((split_at, [cols.size])) - 1

    boxes = []
    for first, last in zip(starts, ends):
        x_start, x_end = cols[first], cols[last]
        rows = ys[(xs >= x_start) & (xs <= x_end)]
        boxes.append((int(x_start), int(rows.min()), int(x_end), int(rows.max())))
    return boxes


def overlay_edges(original_bgr, edges_gray, alpha=0.75, edge_color=(0, 255, 0)):
    """Return a copy of ``original_bgr`` with edge pixels tinted ``edge_color``."""
    out = original_bgr.copy()
    if edges_gray.ndim == 3:
        edges_gray = cv2.cvtColor(edges_gray, cv2.COLOR_BGR2GRAY)
    edges_gray = cv2.convertScaleAbs(edges_gray)
    _, bin_mask = cv2.threshold(edges_gray, 10, 255, cv2.THRESH_BINARY)
    mask = bin_mask > 0
    if not mask.any():
        return out
    tinted = out.copy()
    tinted[mask] = edge_color
    blended = cv2.addWeighted(tinted, alpha, out, 1 - alpha, 0)
    out[mask] = blended[mask]
    return out


# -------------------- Low-latency app --------------------
class LowLatencyEdgeTracker:
    """Webcam capture, edge-based target pick-up and ViT tracking.

    Five daemon threads cooperate: capture, background edge detection plus
    tracking, local display, JPEG streaming and coordinate streaming.
    """

    def __init__(self, roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID):
        """Open the webcam, try to load the tracker model, prepare threads.

        Raises:
            RuntimeError: If the webcam cannot be opened.
        """
        self.roi = roi

        # ---- Webcam init (replaces Picamera2) ----
        self.cap = cv2.VideoCapture(CAM_INDEX, CAPTURE_BACKEND)
        if not self.cap.isOpened():
            raise RuntimeError(f"Failed to open webcam (index={CAM_INDEX}).")

        # All of these are hints; not every webcam/backend honors them.
        try:
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # reduce latency
            if FORCE_MJPG:
                self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        except Exception as ex:
            print("Webcam low-latency settings not applied:", ex)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_SIZE[0])
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_SIZE[1])
        self.cap.set(cv2.CAP_PROP_FPS, CAP_FPS)

        # Shared latest frame (fast path)
        self.latest_frame = None
        self.latest_frame_lock = threading.Lock()

        # Single-slot queue feeding the tracker thread (overwrite mode)
        self.tracking_q = queue.Queue(maxsize=TRACKING_QSIZE)
        # Small coordinate messages waiting to be sent over the network
        self.coords_q = queue.Queue(maxsize=COORDS_QSIZE)

        # Overlay state produced by the tracker thread
        self.overlay_lock = threading.Lock()
        self.last_bbox = None  # (x, y, w, h)
        self.last_edges_full = None  # full-frame edge mask

        # Cleared by stop(); every loop polls it
        self.running = threading.Event()
        self.running.set()

        self.model_path = model_path
        self.backend_id = backend_id
        self.target_id = target_id

        # Model creation is heavy, so do it once up front. On failure the
        # processing loop retries when a candidate shows up.
        self.vit = None
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id
                )
                print("VitTrack instance created at startup.")
            else:
                print("VitTrack model file not found at:", self.model_path)
        except Exception as ex:
            self.vit = None
            print("Failed to create VitTrack instance at startup:", ex)

        # Active tracker (points at self.vit while tracking) and its flag
        self.tracker = None
        self.tracking = False

        self.threads = [
            threading.Thread(target=loop, daemon=True)
            for loop in (
                self.capture_loop,
                self.slow_processing_loop,
                self.local_display_loop,
                self.streaming_loop,
                self.coords_streaming_loop,
            )
        ]

    def start(self):
        """Run all worker threads and block until ``stop()`` is called."""
        for t in self.threads:
            t.start()
        try:
            while self.running.is_set():
                time.sleep(0.2)
        except KeyboardInterrupt:
            self.stop()
        # Let the capture thread leave cap.read() before the device is
        # released; the timeout keeps a blocked sender from hanging exit.
        for t in self.threads:
            t.join(timeout=1.0)
        self._cleanup()

    def stop(self):
        """Ask every loop to finish."""
        self.running.clear()

    def _cleanup(self):
        """Release the webcam and close any OpenCV windows (best effort)."""
        try:
            if self.cap is not None:
                self.cap.release()
        except Exception as ex:
            print("Webcam release failed:", ex)
        cv2.destroyAllWindows()

    # -------------------- Fast path: capture --------------------
    def capture_loop(self):
        """Publish the newest frame and hand a copy to the tracker queue."""
        fps_sleep = max(0.0001, 1.0 / max(1.0, CAP_FPS))
        while self.running.is_set():
            ok, frame = self.cap.read()
            if not ok or frame is None:
                time.sleep(0.01)
                continue

            with self.latest_frame_lock:
                self.latest_frame = frame

            # Overwrite mode: drop the stale frame, then enqueue the new one.
            try:
                self.tracking_q.get_nowait()
            except queue.Empty:
                pass
            try:
                self.tracking_q.put_nowait(frame.copy())
            except queue.Full:
                pass

            # If your camera runs at CAP_FPS already, you can set this to 0
            time.sleep(fps_sleep)

    # -------------------- Slow path: edges + ViT tracker --------------------
    def _clipped_roi(self, w, h):
        """Return the ROI clipped to a ``w`` x ``h`` frame, or None if empty."""
        x0, y0, x1, y1 = self.roi
        rx0 = max(0, min(w - 1, x0))
        ry0 = max(0, min(h - 1, y0))
        rx1 = max(0, min(w - 1, x1))
        ry1 = max(0, min(h - 1, y1))
        if rx1 <= rx0 or ry1 <= ry0:
            return None
        return rx0, ry0, rx1, ry1

    def _find_candidate(self, frame, rect, roi_center):
        """Detect edges in the ROI; return (full-frame mask, closest box)."""
        rx0, ry0, rx1, ry1 = rect
        h, w = frame.shape[:2]
        roi_bgr = frame[ry0:ry1, rx0:rx1]
        edges_roi = edge_canny_robust(roi_bgr, blur_ksize=BLUR_KSIZE, use_auto=USE_AUTO, sigma=SIGMA)
        edges_roi = cv2.convertScaleAbs(edges_roi)
        _, edges_bin_roi = cv2.threshold(edges_roi, 10, 255, cv2.THRESH_BINARY)

        edges_full = np.zeros((h, w), dtype=np.uint8)
        edges_full[ry0:ry1, rx0:rx1] = edges_bin_roi

        # Boxes come back in ROI coordinates; shift them into frame space.
        boxes = [
            (rx0 + bx0, ry0 + by0, rx0 + bx1, ry0 + by1)
            for (bx0, by0, bx1, by1) in find_split_boxes_by_gap(edges_bin_roi, gap_threshold=GAP_THRESHOLD)
        ]

        def distance_to_center(box):
            cx = (box[0] + box[2]) / 2.0
            cy = (box[1] + box[3]) / 2.0
            return math.hypot(cx - roi_center[0], cy - roi_center[1])

        closest_box = min(boxes, key=distance_to_center, default=None)
        return edges_full, closest_box

    def _ensure_vit(self):
        """Create the VitTrack instance on demand if startup creation failed."""
        if self.vit is not None:
            return
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id
                )
                print("VitTrack instance created on-demand.")
            else:
                print("VitTrack model file missing at:", self.model_path)
                self.vit = None
        except Exception as ex:
            print("ViT Tracker init exception (create on-demand):", ex)
            self.vit = None

    def _start_tracking(self, frame, box, edges_full):
        """Initialise the tracker on ``box`` (x0, y0, x1, y1)."""
        self._ensure_vit()
        if self.vit is None:
            return
        try:
            bx0, by0, bx1, by1 = box
            bbox = (int(bx0), int(by0), int(bx1 - bx0), int(by1 - by0))
            self.vit.init(frame, bbox)
            self.tracker = self.vit
            self.tracking = True
            with self.overlay_lock:
                self.last_bbox = bbox
                self.last_edges_full = edges_full.copy()
            print("ViT Tracker started:", self.last_bbox)
        except Exception as ex:
            print("ViT Tracker init exception (init):", ex)
            self.tracker = None
            self.tracking = False

    def _update_tracking(self, frame, edges_full, frame_count):
        """Run one tracker inference and publish or clear the result."""
        try:
            is_located, bbox, score = self.tracker.infer(frame)
        except Exception as ex:
            is_located = False
            bbox = None
            score = 0.0
            print("ViT infer exception:", ex)

        if is_located and bbox is not None and (score is None or score >= VIT_SCORE_THRESHOLD):
            x, y, wbox, hbox = [int(v) for v in bbox]
            with self.overlay_lock:
                self.last_bbox = (x, y, wbox, hbox)
                self.last_edges_full = edges_full.copy()
            # Never block the tracker on the network: drop when full.
            try:
                self.coords_q.put_nowait((frame_count, int(x + wbox / 2), int(y + hbox / 2)))
            except queue.Full:
                pass
        else:
            # Target lost: clear state but keep self.vit for reuse.
            with self.overlay_lock:
                self.last_bbox = None
                self.last_edges_full = edges_full.copy()
            self.tracker = None
            self.tracking = False

    def slow_processing_loop(self):
        """Detect a candidate near the ROI centre and track it."""
        frame_count = 0
        roi_center = ((self.roi[0] + self.roi[2]) / 2.0, (self.roi[1] + self.roi[3]) / 2.0)
        while self.running.is_set():
            try:
                frame = self.tracking_q.get(timeout=0.2)
            except queue.Empty:
                continue
            frame_count += 1

            h, w = frame.shape[:2]
            rect = self._clipped_roi(w, h)
            if rect is None:
                continue

            edges_full, closest_box = self._find_candidate(frame, rect, roi_center)

            if not self.tracking and closest_box is not None:
                self._start_tracking(frame, closest_box, edges_full)

            if self.tracking and self.tracker is not None:
                self._update_tracking(frame, edges_full, frame_count)
            else:
                with self.overlay_lock:
                    self.last_edges_full = edges_full.copy()

    # -------------------- Shared helpers for display/streaming --------------------
    def _annotated_frame(self):
        """Return ``(frame, bbox)`` with overlays drawn, or ``(None, None)``."""
        with self.latest_frame_lock:
            if self.latest_frame is None:
                return None, None
            frame = self.latest_frame.copy()

        with self.overlay_lock:
            bbox = self.last_bbox
            edges_full = None
            if OVERLAY and self.last_edges_full is not None:
                edges_full = self.last_edges_full.copy()

        if edges_full is not None:
            frame = overlay_edges(frame, edges_full, alpha=0.6, edge_color=(0, 255, 0))
        if bbox is not None:
            x, y, wbox, hbox = bbox
            cv2.rectangle(frame, (x, y), (x + wbox, y + hbox), (0, 255, 255), 2)
        return frame, bbox

    def _connect(self, port, tag):
        """Open a TCP connection to TARGET_IP:port; return None on failure."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5.0)
        try:
            sock.connect((TARGET_IP, port))
            sock.settimeout(None)
        except socket.error as e:
            print(f"[{tag}] Connect error:", e)
            self._close_quietly(sock)
            return None
        print(f"[{tag}] Connected to", (TARGET_IP, port))
        return sock

    @staticmethod
    def _close_quietly(sock):
        """Close a socket, ignoring errors from an already-broken peer."""
        try:
            sock.close()
        except Exception:
            pass

    # -------------------- Local display (fast) --------------------
    def local_display_loop(self):
        """Show the annotated live frame; ESC stops the application."""
        while self.running.is_set():
            frame, bbox = self._annotated_frame()
            if frame is None:
                time.sleep(0.01)
                continue
            if bbox is not None:
                cv2.putText(frame, "Tracking", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            # Draw ROI for debugging
            x0, y0, x1, y1 = self.roi
            cv2.rectangle(frame, (x0, y0), (x1, y1), (0, 128, 255), 1)
            cv2.imshow("Low-latency Live", frame)
            if cv2.waitKey(1) == 27:  # ESC
                self.stop()
                break
            time.sleep(0.001)

    # -------------------- Streaming loop --------------------
    def streaming_loop(self):
        """Send annotated frames as length-prefixed JPEGs, reconnecting on error."""
        conn = None
        try:
            while self.running.is_set():
                if conn is None:
                    conn = self._connect(PORT, "Streaming")
                    if conn is None:
                        time.sleep(1.0)
                        continue

                frame, _ = self._annotated_frame()
                if frame is None:
                    time.sleep(0.01)
                    continue

                try:
                    result, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
                    if not result:
                        time.sleep(0.005)
                        continue
                    data = buffer.tobytes()
                    # 4-byte little-endian length, then the JPEG payload.
                    conn.sendall(struct.pack("<L", len(data)))
                    conn.sendall(data)
                except (socket.error, struct.error) as e:
                    print("[Streaming] Send error, reconnecting:", e)
                    self._close_quietly(conn)
                    conn = None
                    time.sleep(0.5)
        finally:
            # Do not leak the connection when the loop ends.
            if conn is not None:
                self._close_quietly(conn)

    # -------------------- Coords streaming --------------------
    def coords_streaming_loop(self):
        """Send queued ``frame,cx,cy`` lines over TCP, reconnecting on error."""
        conn = None
        try:
            while self.running.is_set():
                if conn is None:
                    conn = self._connect(COORDS_PORT, "Coords")
                    if conn is None:
                        time.sleep(1.0)
                        continue

                try:
                    frame_num, cx, cy = self.coords_q.get(timeout=0.1)
                except queue.Empty:
                    continue

                try:
                    coords_str = f"{frame_num},{cx},{cy}\n"
                    conn.sendall(coords_str.encode("utf-8"))
                except socket.error as e:
                    print("[Coords] Send error:", e)
                    self._close_quietly(conn)
                    conn = None
        finally:
            if conn is not None:
                self._close_quietly(conn)


# -------------------- main --------------------
def main():
    """Build the tracker, install a SIGINT handler and run until stopped."""
    app = LowLatencyEdgeTracker(roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID)

    def _sigint_handler(signum, frame):
        # Only flip the run flag; start() performs the actual cleanup.
        print("SIGINT received — stopping app")
        app.stop()

    signal.signal(signal.SIGINT, _sigint_handler)
    print("Starting low-latency edge tracker (ViT) using webcam. Press ESC in the window to stop.")
    app.start()
    print("Stopped.")


if __name__ == "__main__":
    main()