# 檔名: wafer_wafer_640x480_mp4_queue_arch_fixed_v3....
Создано: 22 сентября 2025 г.
Создано: 22 сентября 2025 г.
import cv2
import numpy as np
import cvzone
import time
import os
import sys
import shutil
import platform
import datetime
import threading
import socket
from queue import Queue, Empty, Full
import traceback # Import for detailed error logging
from ultralytics import YOLO
from MBTS import MBTS
import wafer_seg_640x480_video
YOLO_PIN_MODEL_PATH = 'best_slotIR_640x480.pt'
YOLO_SEG_MODEL_PATH = 'best_result_640x480.pt'
CLASSES_FILE = 'classes1.txt'
SAVE_DIR_TRACKED = "RED_circuit"
SAVE_DIR_GREEN = "Green_square"
RECORDS_DIR = "Records"
CAMERA_INDEX = 0
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
FPS = 60.0
EXPOSURE = -7
BRIGHTNESS = 40.0
CONTRAST = 32.0
TCP_PORT = 5656
line = [550, 260, 620, 260]
BLUE_RECT_X_START = 10
BLUE_RECT_X_END = 510
BLUE_RECT_FIXED_HEIGHT = 100
TRIGGER_LINE_Y = line[1]
TRIGGER_TOLERANCE = 10
record_queue = Queue(maxsize=int(FPS * 2))
process_queue = Queue(maxsize=1)
display_queue = Queue(maxsize=1)
analysis_lock = threading.Lock()
video_writer = None
clients = []
recordIndex = 0
exit_flag = False
recording_flag = False
processing_active = False
wafer_count = 0
tracked_counter = 0
counted_ids = set()
save_counter_green = 1
previous_positions = {}
tracker = None
yolo_pin_model = None
model_loaded_event = threading.Event()
def write_log(message):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def clear_directory(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
return
write_log(f"清空目錄: {dir_path}...")
for filename in os.listdir(dir_path):
file_path = os.path.join(dir_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
write_log(f'無法刪除 {file_path}。原因: {e}')
def setup_directories():
for dir_path in [SAVE_DIR_GREEN, RECORDS_DIR, SAVE_DIR_TRACKED]:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
write_log(f"已建立目錄: {dir_path}")
def reset_analysis_state():
global wafer_count, counted_ids, save_counter_green, previous_positions, tracked_counter, tracker
with analysis_lock:
wafer_count = 0
tracked_counter = 0
counted_ids = set()
save_counter_green = 1
previous_positions = {}
tracker = MBTS(max_age=90, min_hits=3, iou_threshold=0.1)
clear_directory(SAVE_DIR_GREEN)
clear_directory(SAVE_DIR_TRACKED)
#clear_directory(RECORDS_DIR) # 通常不清空 Records,除非有特別需求
textfor q in [record_queue, process_queue, display_queue]: while not q.empty(): try: q.get_nowait() except Empty: break write_log("分析狀態、追蹤器與所有佇列已重置。")
def open_camera():
backend = cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY
cap = cv2.VideoCapture(CAMERA_INDEX, backend)
if not cap.isOpened():
write_log(f"錯誤: 無法開啟攝影機索引 {CAMERA_INDEX}")
return None
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
cap.set(cv2.CAP_PROP_FPS, FPS)
cap.set(cv2.CAP_PROP_EXPOSURE, EXPOSURE)
cap.set(cv2.CAP_PROP_BRIGHTNESS, BRIGHTNESS)
cap.set(cv2.CAP_PROP_CONTRAST, CONTRAST)
write_log("攝影機參數設定完成")
time.sleep(1)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
write_log(f"成功開啟攝影機。實際解析度: {w}x{h}")
return cap
def camera_capture_thread(cap):
global exit_flag
while not exit_flag:
ret, frame = cap.read()
if not ret:
write_log("無法從相機讀取影像,將停止讀取。")
break
try:
process_queue.get_nowait() # 清掉舊的
except Empty:
pass
try:
process_queue.put_nowait(frame)
except Full:
pass # 忽略,等待下一幀
textif recording_flag: try: record_queue.put_nowait(frame.copy()) except Full: write_log("警告: 錄影佇列已滿,可能發生掉幀!") cap.release() write_log("相機讀取執行緒已停止。")
def create_video_writer(width, height, cap):
global recordIndex
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
recordIndex += 1
filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4")
real_fps = cap.get(cv2.CAP_PROP_FPS)
if real_fps <= 0: # 有些相機會回傳 0
real_fps = 30.0
writer = cv2.VideoWriter(filename, fourcc, real_fps, (width, height))
write_log(f"開始錄影,存檔至: {filename} (FPS={real_fps})")
return writer
def record_thread(cap):
global video_writer, exit_flag
while not exit_flag:
if recording_flag:
try:
frame_to_record = record_queue.get(timeout=1)
if frame_to_record is None: # Termination signal
break
if video_writer is None:
h, w, _ = frame_to_record.shape
# <<< FIX 2: 呼叫 create_video_writer 時傳入 cap
video_writer = create_video_writer(w, h, cap)
video_writer.write(frame_to_record)
except Empty:
continue
else:
if video_writer is not None:
video_writer.release()
video_writer = None
write_log("錄影已停止並存檔。")
time.sleep(0.1)
def handle_client(client_socket, addr):
global recording_flag, processing_active
try:
while not exit_flag:
data = client_socket.recv(1024)
if not data:
break
msg = data.decode().strip().lower()
if msg == "start":
if not model_loaded_event.is_set():
client_socket.sendall("Model is still loading, please wait...\n".encode())
continue
if not processing_active:
reset_analysis_state()
processing_active = True
recording_flag = True
client_socket.sendall("Analysis and recording started.\n".encode())
write_log(f"收到來自 {addr} 的 'start' 命令。")
else:
client_socket.sendall("Already in processing state.\n".encode())
elif msg == "stop":
if processing_active:
write_log(f"收到來自 {addr} 的 'stop' 命令,停止處理...")
processing_active = False
recording_flag = False
client_socket.sendall("Processing stopped. Generating final report...\n".encode())
write_log("開始執行最終分析...")
all_predictions = wafer_seg_640x480_video.analyze_images(model_path=YOLO_SEG_MODEL_PATH, image_folder=SAVE_DIR_GREEN)
write_log(f"分析完成,結果: {all_predictions}")
final_response = f"Result: {','.join(map(str, all_predictions))}\n" if all_predictions is not None else "Result: Error: Analysis failed to produce results.\n"
client_socket.sendall(final_response.encode('utf-8'))
write_log(f"已將分析結果傳送至 {addr}。系統返回待命狀態。")
else:
client_socket.sendall("Not in processing state.\n".encode())
else:
client_socket.sendall(f"Unknown command: {msg}\n".encode())
except (ConnectionResetError, BrokenPipeError):
write_log(f"客戶端 {addr} 強制中斷了連線。")
except Exception as e:
write_log(f"與客戶端 {addr} 通訊時發生錯誤: {e}")
finally:
client_socket.close()
clients.remove(client_socket)
write_log(f"客戶端 {addr} 已斷線。")
def tcp_server_thread():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("0.0.0.0", TCP_PORT))
server_socket.listen(5)
write_log(f"TCP 伺服器已啟動,監聽埠: {TCP_PORT}")
while not exit_flag:
try:
server_socket.settimeout(1.0)
client_socket, addr = server_socket.accept()
clients.append(client_socket)
write_log(f"客戶端 {addr} 已連線。")
thread = threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True)
thread.start()
except socket.timeout:
continue
server_socket.close()
write_log("TCP 伺服器已關閉。")
def load_model_thread():
global yolo_pin_model
try:
write_log(f"開始載入 YOLO 模型: {YOLO_PIN_MODEL_PATH}...")
yolo_pin_model = YOLO(YOLO_PIN_MODEL_PATH)
write_log("YOLO 模型載入成功!")
model_loaded_event.set()
except Exception as e:
write_log(f"載入 YOLO 模型失敗: {e}")
def inference_thread():
global exit_flag, wafer_count, tracked_counter, save_counter_green, previous_positions, counted_ids
while not exit_flag:
try:
frame_orig = process_queue.get(timeout=1)
if frame_orig is None:
break
frame = frame_orig.copy()
if processing_active:
tracks = np.empty((0, 5))
trigger_object_cy = None
height, , _ = frame.shape
with analysis_lock:
results = yolo_pin_model(frame, stream=True, verbose=False, conf=0.25)
detections = [[int(x) for x in box.xyxy[0]] + [float(box.conf[0])] for info in results for box in info.boxes]
if tracker is not None:
tracks = tracker.update(np.array(detections) if detections else np.empty((0, 5)))
current_positions_in_frame = {}
if tracks.size > 0:
for track in tracks:
x1, y1, x2, y2, track_id = map(int, track)
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
current_positions_in_frame[track_id] = cy
if track_id in previous_positions:
prev_cy = previous_positions[track_id]
if prev_cy > (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and cy <= (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and track_id not in counted_ids:
counted_ids.add(track_id)
wafer_count += 1
write_log(f"物件 ID {track_id} 跨越線計數,總數: {wafer_count}")
trigger_object_cy = cy
if trigger_object_cy is not None:
rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
rect_x1, rect_x2 = BLUE_RECT_X_START, BLUE_RECT_X_END
crop = frame_orig[rect_y1:rect_y2, rect_x1:rect_x2]
if crop.size > 0:
save_path = os.path.join(SAVE_DIR_GREEN, f"crop{save_counter_green:04d}.jpg")
cv2.imwrite(save_path, crop)
save_counter_green += 1
previous_positions = current_positions_in_frame
# --- 繪圖邏輯 ---
cv2.line(frame, (line[0], line[1]), (line[2], line[1]), (0, 255, 0), thickness=2)
cv2.line(frame, (line[0], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (line[2], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (0, 100, 0), thickness=1, lineType=cv2.LINE_AA)
if tracks.size > 0:
for track in tracks:
x1, y1, x2, y2, track_id = map(int, track)
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
cvzone.putTextRect(frame, str(track_id), [x1, cy+5], scale=0.8, thickness=1, colorR=(0, 255, 255), colorT=(255, 0, 0), offset=5)
if trigger_object_cy is not None:
rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
cv2.rectangle(frame, (BLUE_RECT_X_START, rect_y1), (BLUE_RECT_X_END, rect_y2), (255, 0, 0), 3)
cvzone.putTextRect(frame, f'Count: {wafer_count}', (320, 50), scale=1, thickness=2, colorT=(0, 0, 0), colorR=(255, 127, 0))
cvzone.putTextRect(frame, "STATUS: PROCESSING", (10, 30), scale=1, thickness=2, colorR=(0,200,0))
if tracks.size > 0:
with analysis_lock:
try:
save_path = os.path.join(SAVE_DIR_TRACKED, f"tracked_{tracked_counter:04d}.jpg")
cv2.imwrite(save_path, frame)
tracked_counter += 1
except Exception as e:
write_log(f"儲存追蹤畫面至 {SAVE_DIR_TRACKED} 失敗: {e}")
else:
if not model_loaded_event.is_set():
cvzone.putTextRect(frame, "STATUS: LOADING MODEL...", (10, 30), scale=1, thickness=2, colorR=(0,165,255))
else:
cvzone.putTextRect(frame, "STATUS: WAITING FOR START COMMAND", (10, 30), scale=1, thickness=2, colorR=(0,165,255))
try:
display_queue.get_nowait()
except Empty:
pass
try:
display_queue.put_nowait(frame)
except Full:
pass
except Empty:
time.sleep(0.01)
except Exception as e:
write_log(f"!!! 推論執行緒發生嚴重錯誤: {e}")
traceback.print_exc()
time.sleep(1)
def main():
global exit_flag
setup_directories()
reset_analysis_state()
textcap = open_camera() if cap is None: exit_flag = True sys.exit() # <<< FIX 3: 在 cap 物件建立後,再建立需要它的執行緒 all_threads = [ threading.Thread(target=load_model_thread, daemon=True), threading.Thread(target=tcp_server_thread, daemon=True), threading.Thread(target=record_thread, args=(cap,), daemon=True), # 將 cap 傳入 threading.Thread(target=camera_capture_thread, args=(cap,), daemon=True, name="camera_capture_thread"), threading.Thread(target=inference_thread, daemon=True, name="inference_thread") ] for t in all_threads: t.start() prev_time = 0 write_log("程式已啟動。等待模型載入完成和 'start' 命令...") while not exit_flag: try: display_frame = display_queue.get(timeout=1) if display_frame is None: break curr_time = time.time() if prev_time > 0: fps = 1 / (curr_time - prev_time) cv2.putText(display_frame, f"Display FPS: {int(fps)}", (CAMERA_WIDTH - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) prev_time = curr_time cv2.imshow('Wafer Detection Service', display_frame) except Empty: inference_is_alive = any(t.name == 'inference_thread' and t.is_alive() for t in all_threads) if not inference_is_alive: write_log("錯誤:推論執行緒已意外終止!請檢查日誌。程式即將退出。") exit_flag = True else: write_log("警告: 顯示佇列長時間無影像,但推論執行緒仍在運行。") continue key = cv2.waitKey(1) & 0xFF if key == ord('q') or cv2.getWindowProperty('Wafer Detection Service', cv2.WND_PROP_VISIBLE) < 1: write_log("手動退出程式...") exit_flag = True exit_flag = True for q in [record_queue, process_queue, display_queue]: try: q.put_nowait(None) except Full: pass for t in all_threads: t.join(timeout=2.0) cv2.destroyAllWindows() write_log("主程式迴圈結束,資源已釋放。")
if name == "main":
main()
write_log("主程式執行完畢。")
上面的程式為什麼實際錄影的時間有60 秒,但是看 mp4撥放的時間只有 30 秒 ?
短答案:因為你寫進 MP4 的「名義 FPS」跟實際寫出的畫格速率不一致,再加上你在錄影佇列滿載時會「悄悄丟幀」。播放器只看檔案標的 FPS 來算時長,結果就把 60 秒的實際拍攝,播成 30 秒。
VideoWriter
的 FPS 可能設太高pythonreal_fps = cap.get(cv2.CAP_PROP_FPS) writer = cv2.VideoWriter(filename, fourcc, real_fps, (width, height))
實務上很多相機/後端(尤其是 Windows)會在你 cap.set(FPS=60)
後「回報 60」,但實際只給得到 ~30fps。
→ 結果:你 60 秒內只寫到 ~1800 張幀,但檔頭寫成 60 fps,播放器就算出時長 1800/60 = 30
秒。
pythonif recording_flag: try: record_queue.put_nowait(frame.copy()) except Full: write_log("警告: 錄影佇列已滿,可能發生掉幀!")
這裡不阻塞、直接丟棄幀。幀數再度下降 → 實際幀數 < 檔頭 FPS → 播放時間更短。
以上兩點同時存在時,就很容易出現「錄 60 秒,檔案只有 30 秒」的現象。
VideoWriter
在開始寫檔前,先用時間戳測 1–2 秒的平均幀率,用測到的值當作 VideoWriter
的 fps(不要信 cap.get
):
pythondef measure_capture_fps(cap, sample_seconds=2.0): import time frames = 0 start = time.perf_counter() while time.perf_counter() - start < sample_seconds: ret, _ = cap.read() if not ret: break frames += 1 elapsed = time.perf_counter() - start return frames / elapsed if elapsed > 0 else 30.0
把 create_video_writer
改成用上面的量測(或把量測結果存在外部變數傳入):
pythondef create_video_writer(width, height, fps_for_file): fourcc = cv2.VideoWriter_fourcc(*'mp4v') now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4") writer = cv2.VideoWriter(filename, fourcc, fps_for_file, (width, height)) write_log(f"開始錄影,存檔至: {filename} (FPS={fps_for_file})") return writer
並在主程式打開相機後、開啟執行緒前量測一次,存成 measured_fps
,再傳進 record_thread
用來建立 writer。
把 record_queue.put_nowait(...)
換成會等的 put(...)
,或給個合理 timeout,避免靜默丟幀:
python# camera_capture_thread 內 if recording_flag: try: record_queue.put(frame.copy(), timeout=0.5) # 允許稍等,避免丟幀 except Full: write_log("警告: 錄影佇列擁塞(已避丟幀盡力)")
另外把 record_queue
大一點點(例如 FPS * 4
)也有幫助,但真正的關鍵是不要用 non-blocking 的 put_nowait。
record_thread
裡每寫一幀就記錄時間,若寫不及,可在 camera_capture_thread
降低擷取率(cv2.CAP_PROP_FPS
不一定可靠;可改成每隔固定時間取一幀),確保供給 ≈ 寫出能力。VideoWriter
只吃「固定 FPS」。若要完全準確的時間軸,改用 ffmpeg 串流、輸入每幀 PTS,或先存原始幀+timestamps,錄完用 ffmpeg 以 timestamps 重建影片。cap.get(CAP_PROP_FPS)
:把它當參考即可,實際用量測結果。python# main() 打開相機後: measured_fps = measure_capture_fps(cap, sample_seconds=1.5) write_log(f"實測擷取 FPS ≈ {measured_fps:.2f}") # record_thread 裡,建立 writer 時用 measured_fps def record_thread(cap, measured_fps): global video_writer, exit_flag while not exit_flag: if recording_flag: try: frame_to_record = record_queue.get(timeout=1) if frame_to_record is None: break if video_writer is None: h, w, _ = frame_to_record.shape video_writer = create_video_writer(w, h, measured_fps) video_writer.write(frame_to_record) except Empty: continue else: if video_writer is not None: video_writer.release() video_writer = None write_log("錄影已停止並存檔。") time.sleep(0.1)
註:你現在已把
record_thread
正確地接到cap
;再多傳一個measured_fps
進去即可。
如果你要,我也可以幫你把上述改動直接套進你的檔案,或幫你加上即時 FPS/丟幀率的 log,方便你驗證修正後的效果。
請給出完整的程式
沒問題!以下是「完整」的修正版程式 —— 已修正造成影片長度減半的關鍵點:
cap.get(CAP_PROP_FPS)
)建立 VideoWriter
,避免「名義 FPS ≠ 實際寫出幀率」。FPS * 4
)。直接把下列程式整段覆蓋你的檔即可:
python# 檔名: wafer_wafer_640x480_mp4_queue_arch_fixed_v3.py import cv2 import numpy as np import cvzone import time import os import sys import shutil import platform import datetime import threading import socket from queue import Queue, Empty, Full import traceback # Import for detailed error logging from ultralytics import YOLO from MBTS import MBTS import wafer_seg_640x480_video # ==================== 程式設定 ==================== YOLO_PIN_MODEL_PATH = 'best_slotIR_640x480.pt' YOLO_SEG_MODEL_PATH = 'best_result_640x480.pt' CLASSES_FILE = 'classes1.txt' SAVE_DIR_TRACKED = "RED_circuit" SAVE_DIR_GREEN = "Green_square" RECORDS_DIR = "Records" CAMERA_INDEX = 0 CAMERA_WIDTH = 640 CAMERA_HEIGHT = 480 FPS = 60.0 EXPOSURE = -7 BRIGHTNESS = 40.0 CONTRAST = 32.0 TCP_PORT = 5656 line = [550, 260, 620, 260] BLUE_RECT_X_START = 10 BLUE_RECT_X_END = 510 BLUE_RECT_FIXED_HEIGHT = 100 # ==================== 優化參數 ==================== TRIGGER_LINE_Y = line[1] TRIGGER_TOLERANCE = 10 # ==================== 佇列設定 ==================== # 放大錄影佇列避免瞬間擁塞;顯示/處理仍維持單幀覆蓋語意 record_queue = Queue(maxsize=int(FPS * 4)) process_queue = Queue(maxsize=1) display_queue = Queue(maxsize=1) # ======================== 全域變數 ======================== analysis_lock = threading.Lock() video_writer = None clients = [] recordIndex = 0 exit_flag = False recording_flag = False processing_active = False # 這些共享狀態需要保護 wafer_count = 0 tracked_counter = 0 counted_ids = set() save_counter_green = 1 previous_positions = {} tracker = None # --- yolo_pin_model = None model_loaded_event = threading.Event() # ==================== 輔助函式 ==================== def write_log(message): timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {message}") def clear_directory(dir_path): if not os.path.exists(dir_path): os.makedirs(dir_path) return write_log(f"清空目錄: {dir_path}...") for filename in os.listdir(dir_path): file_path = os.path.join(dir_path, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: write_log(f'無法刪除 {file_path}。原因: {e}') def setup_directories(): for dir_path in [SAVE_DIR_GREEN, RECORDS_DIR, SAVE_DIR_TRACKED]: if not os.path.exists(dir_path): os.makedirs(dir_path) write_log(f"已建立目錄: {dir_path}") def measure_capture_fps(cap, sample_seconds=1.5): """ 實測相機擷取幀率,避免使用 cap.get(CAP_PROP_FPS) 造成名義 FPS 與實際不符。 在啟動任何讀取執行緒之前呼叫。 """ frames = 0 start = time.perf_counter() while time.perf_counter() - start < sample_seconds: ret, _ = cap.read() if not ret: break frames += 1 elapsed = time.perf_counter() - start measured = frames / elapsed if elapsed > 0 else 30.0 return measured # ==================== 核心函式 ==================== def reset_analysis_state(): global wafer_count, counted_ids, save_counter_green, previous_positions, tracked_counter, tracker with analysis_lock: wafer_count = 0 tracked_counter = 0 counted_ids = set() save_counter_green = 1 previous_positions = {} tracker = MBTS(max_age=90, min_hits=3, iou_threshold=0.1) clear_directory(SAVE_DIR_GREEN) clear_directory(SAVE_DIR_TRACKED) #clear_directory(RECORDS_DIR) # 通常不清空 Records,除非有特別需求 for q in [record_queue, process_queue, display_queue]: while not q.empty(): try: q.get_nowait() except Empty: break write_log("分析狀態、追蹤器與所有佇列已重置。") # ==================== 相機與錄影 ==================== def open_camera(): backend = cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY cap = cv2.VideoCapture(CAMERA_INDEX, backend) if not cap.isOpened(): write_log(f"錯誤: 無法開啟攝影機索引 {CAMERA_INDEX}") return None cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT) cap.set(cv2.CAP_PROP_FPS, FPS) cap.set(cv2.CAP_PROP_EXPOSURE, EXPOSURE) cap.set(cv2.CAP_PROP_BRIGHTNESS, BRIGHTNESS) cap.set(cv2.CAP_PROP_CONTRAST, CONTRAST) write_log("攝影機參數設定完成") time.sleep(1) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) write_log(f"成功開啟攝影機。實際解析度: {w}x{h}") return cap def camera_capture_thread(cap): global exit_flag while not exit_flag: ret, frame = cap.read() if not ret: write_log("無法從相機讀取影像,將停止讀取。") break # 供推論的最新幀 — 覆蓋舊值 try: process_queue.get_nowait() except Empty: pass try: process_queue.put_nowait(frame) except Full: pass # 忽略,等待下一幀 # 供錄影:避免靜默丟幀,改為阻塞 put,並給 timeout if recording_flag: try: record_queue.put(frame.copy(), timeout=0.5) except Full: write_log("警告: 錄影佇列擁塞(已盡力避免丟幀)") cap.release() write_log("相機讀取執行緒已停止。") def create_video_writer(width, height, fps_for_file): global recordIndex fourcc = cv2.VideoWriter_fourcc(*'mp4v') now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") recordIndex += 1 filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4") writer = cv2.VideoWriter(filename, fourcc, fps_for_file, (width, height)) write_log(f"開始錄影,存檔至: {filename} (FPS_for_file={fps_for_file:.2f})") return writer def record_thread(fps_for_file): """ 錄影執行緒:以「實測 FPS」建立 VideoWriter,避免影片長度被壓縮。 """ global video_writer, exit_flag while not exit_flag: if recording_flag: try: frame_to_record = record_queue.get(timeout=1) if frame_to_record is None: # Termination signal break if video_writer is None: h, w, _ = frame_to_record.shape video_writer = create_video_writer(w, h, fps_for_file) video_writer.write(frame_to_record) except Empty: continue else: if video_writer is not None: video_writer.release() video_writer = None write_log("錄影已停止並存檔。") time.sleep(0.1) # ==================== TCP 通訊 & 模型載入 ==================== def handle_client(client_socket, addr): global recording_flag, processing_active try: while not exit_flag: data = client_socket.recv(1024) if not data: break msg = data.decode().strip().lower() if msg == "start": if not model_loaded_event.is_set(): client_socket.sendall("Model is still loading, please wait...\n".encode()) continue if not processing_active: reset_analysis_state() processing_active = True recording_flag = True client_socket.sendall("Analysis and recording started.\n".encode()) write_log(f"收到來自 {addr} 的 'start' 命令。") else: client_socket.sendall("Already in processing state.\n".encode()) elif msg == "stop": if processing_active: write_log(f"收到來自 {addr} 的 'stop' 命令,停止處理...") processing_active = False recording_flag = False client_socket.sendall("Processing stopped. Generating final report...\n".encode()) write_log("開始執行最終分析...") all_predictions = wafer_seg_640x480_video.analyze_images(model_path=YOLO_SEG_MODEL_PATH, image_folder=SAVE_DIR_GREEN) write_log(f"分析完成,結果: {all_predictions}") final_response = f"Result: {','.join(map(str, all_predictions))}\n" if all_predictions is not None else "Result: Error: Analysis failed to produce results.\n" client_socket.sendall(final_response.encode('utf-8')) write_log(f"已將分析結果傳送至 {addr}。系統返回待命狀態。") else: client_socket.sendall("Not in processing state.\n".encode()) else: client_socket.sendall(f"Unknown command: {msg}\n".encode()) except (ConnectionResetError, BrokenPipeError): write_log(f"客戶端 {addr} 強制中斷了連線。") except Exception as e: write_log(f"與客戶端 {addr} 通訊時發生錯誤: {e}") finally: client_socket.close() if client_socket in clients: clients.remove(client_socket) write_log(f"客戶端 {addr} 已斷線。") def tcp_server_thread(): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(("0.0.0.0", TCP_PORT)) server_socket.listen(5) write_log(f"TCP 伺服器已啟動,監聽埠: {TCP_PORT}") while not exit_flag: try: server_socket.settimeout(1.0) client_socket, addr = server_socket.accept() clients.append(client_socket) write_log(f"客戶端 {addr} 已連線。") thread = threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True) thread.start() except socket.timeout: continue server_socket.close() write_log("TCP 伺服器已關閉。") def load_model_thread(): global yolo_pin_model try: write_log(f"開始載入 YOLO 模型: {YOLO_PIN_MODEL_PATH}...") yolo_pin_model = YOLO(YOLO_PIN_MODEL_PATH) write_log("YOLO 模型載入成功!") model_loaded_event.set() except Exception as e: write_log(f"載入 YOLO 模型失敗: {e}") # ==================== 推論執行緒 ==================== def inference_thread(): global exit_flag, wafer_count, tracked_counter, save_counter_green, previous_positions, counted_ids while not exit_flag: try: frame_orig = process_queue.get(timeout=1) if frame_orig is None: break frame = frame_orig.copy() if processing_active: tracks = np.empty((0, 5)) trigger_object_cy = None height, _, _ = frame.shape with analysis_lock: results = yolo_pin_model(frame, stream=True, verbose=False, conf=0.25) detections = [[int(x) for x in box.xyxy[0]] + [float(box.conf[0])] for info in results for box in info.boxes] if tracker is not None: tracks = tracker.update(np.array(detections) if detections else np.empty((0, 5))) current_positions_in_frame = {} if tracks.size > 0: for track in tracks: x1, y1, x2, y2, track_id = map(int, track) cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 current_positions_in_frame[track_id] = cy if track_id in previous_positions: prev_cy = previous_positions[track_id] if prev_cy > (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and cy <= (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and track_id not in counted_ids: counted_ids.add(track_id) wafer_count += 1 write_log(f"物件 ID {track_id} 跨越線計數,總數: {wafer_count}") trigger_object_cy = cy if trigger_object_cy is not None: rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10) rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10) rect_x1, rect_x2 = BLUE_RECT_X_START, BLUE_RECT_X_END crop = frame_orig[rect_y1:rect_y2, rect_x1:rect_x2] if crop.size > 0: save_path = os.path.join(SAVE_DIR_GREEN, f"crop_{save_counter_green:04d}.jpg") cv2.imwrite(save_path, crop) save_counter_green += 1 previous_positions = current_positions_in_frame # --- 繪圖邏輯 --- cv2.line(frame, (line[0], line[1]), (line[2], line[1]), (0, 255, 0), thickness=2) cv2.line(frame, (line[0], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (line[2], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (0, 100, 0), thickness=1, lineType=cv2.LINE_AA) if tracks.size > 0: for track in tracks: x1, y1, x2, y2, track_id = map(int, track) cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2) cvzone.putTextRect(frame, str(track_id), [x1, cy+5], scale=0.8, thickness=1, colorR=(0, 255, 255), colorT=(255, 0, 0), offset=5) if trigger_object_cy is not None: rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10) rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10) cv2.rectangle(frame, (BLUE_RECT_X_START, rect_y1), (BLUE_RECT_X_END, rect_y2), (255, 0, 0), 3) cvzone.putTextRect(frame, f'Count: {wafer_count}', (320, 50), scale=1, thickness=2, colorT=(0, 0, 0), colorR=(255, 127, 0)) cvzone.putTextRect(frame, "STATUS: PROCESSING", (10, 30), scale=1, thickness=2, colorR=(0,200,0)) if tracks.size > 0: with analysis_lock: try: save_path = os.path.join(SAVE_DIR_TRACKED, f"tracked_{tracked_counter:04d}.jpg") cv2.imwrite(save_path, frame) tracked_counter += 1 except Exception as e: write_log(f"儲存追蹤畫面至 {SAVE_DIR_TRACKED} 失敗: {e}") else: if not model_loaded_event.is_set(): cvzone.putTextRect(frame, "STATUS: LOADING MODEL...", (10, 30), scale=1, thickness=2, colorR=(0,165,255)) else: cvzone.putTextRect(frame, "STATUS: WAITING FOR START COMMAND", (10, 30), scale=1, thickness=2, colorR=(0,165,255)) try: display_queue.get_nowait() except Empty: pass try: display_queue.put_nowait(frame) except Full: pass except Empty: time.sleep(0.01) except Exception as e: write_log(f"!!! 推論執行緒發生嚴重錯誤: {e}") traceback.print_exc() time.sleep(1) # ==================== 主程式 ==================== def main(): global exit_flag setup_directories() reset_analysis_state() cap = open_camera() if cap is None: exit_flag = True sys.exit() # 重要:啟動任何讀取執行緒前,先「實測」擷取 FPS,並用該數值建立 VideoWriter measured_fps = measure_capture_fps(cap, sample_seconds=1.5) write_log(f"實測擷取 FPS ≈ {measured_fps:.2f}") # 建立執行緒 all_threads = [ threading.Thread(target=load_model_thread, daemon=True), threading.Thread(target=tcp_server_thread, daemon=True), threading.Thread(target=record_thread, args=(measured_fps,), daemon=True), # 以實測 FPS 寫檔 threading.Thread(target=camera_capture_thread, args=(cap,), daemon=True, name="camera_capture_thread"), threading.Thread(target=inference_thread, daemon=True, name="inference_thread") ] for t in all_threads: t.start() prev_time = 0 write_log("程式已啟動。等待模型載入完成和 'start' 命令...") while not exit_flag: try: display_frame = display_queue.get(timeout=1) if display_frame is None: break curr_time = time.time() if prev_time > 0: fps = 1 / (curr_time - prev_time) cv2.putText(display_frame, f"Display FPS: {int(fps)}", (CAMERA_WIDTH - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) prev_time = curr_time cv2.imshow('Wafer Detection Service', display_frame) except Empty: inference_is_alive = any(t.name == 'inference_thread' and t.is_alive() for t in all_threads) if not inference_is_alive: write_log("錯誤:推論執行緒已意外終止!請檢查日誌。程式即將退出。") exit_flag = True else: write_log("警告: 顯示佇列長時間無影像,但推論執行緒仍在運行。") continue key = cv2.waitKey(1) & 0xFF if key == ord('q') or cv2.getWindowProperty('Wafer Detection Service', cv2.WND_PROP_VISIBLE) < 1: write_log("手動退出程式...") exit_flag = True exit_flag = True for q in [record_queue, process_queue, display_queue]: try: q.put_nowait(None) except Full: pass for t in all_threads: t.join(timeout=2.0) cv2.destroyAllWindows() write_log("主程式迴圈結束,資源已釋放。") if __name__ == "__main__": main() write_log("主程式執行完畢。")
measure_capture_fps()
,在 main()
開始前 1.5 秒量測實際擷取幀率,並把該值傳給 record_thread
。create_video_writer()
使用 fps_for_file
(實測值)建立 MP4,確保檔頭 FPS 與實際寫入幀率一致。camera_capture_thread()
對錄影佇列改為 record_queue.put(..., timeout=0.5)
,避免靜默丟幀。record_queue
容量為 FPS * 4
,減少短暫尖峰造成的擁塞。如果你也想同時記錄「實際寫入幀數」與「預期幀數」來估算掉幀率,我可以再幫你加上簡單的統計 log。