Now the legenda.psv file has the following form...

Created on: April 24, 2025

Answered using O3 by Chat01

Question

The legenda.psv file now has the following form:

É|12.477|12.742
|12.742|12.783
h|12.783|12.804
o|12.804|13.130
j|13.130|13.151
e|13.151|13.192
|13.192|13.273
q|13.273|13.294
u|13.294|13.314
e|13.314|13.335
|13.335|13.396
a|13.396|13.416
|13.416|13.518
t|13.518|13.539
e|13.539|13.559
r|13.559|13.682
r|13.682|13.702
a|13.702|13.723
|13.723|13.845
t|13.845|13.866
r|13.866|13.886
e|13.886|14.376
m|14.376|14.397
e.|14.397|14.437
|14.437|14.977
É|14.977|15.773
|15.773|15.794
h|15.794|15.814
o|15.814|15.835
j|15.835|15.855
e|15.855|15.876
...
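Each row of this pipe-separated file is one `CHARACTER|START|END` record, where an empty first field (as in the second sample row) represents a space. As a minimal, self-contained sketch of how such rows can be parsed (the function name `parse_psv` is illustrative, not part of the script below, which has its own `read_subtitles`):

```python
def parse_psv(text):
    """Parse CHARACTER|START|END rows into (char, start, end) tuples.

    An empty first field is treated as a space, matching the sample above.
    """
    rows = []
    for line in text.splitlines():
        if not line:
            continue
        parts = line.split('|')
        if len(parts) != 3:
            continue  # skip malformed rows
        char = parts[0] or " "  # empty character field means a space
        rows.append((char, float(parts[1]), float(parts[2])))
    return rows
```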

I need to adjust how the text is displayed in the video. It currently shows 4 lines of text at a time, which can be incomplete phrases. I want it to show 4 complete sentences instead. To identify a sentence, consider the text that starts with an uppercase letter (É) and ends with final punctuation (e.). The synchronization behavior (per syllable, per word, etc.) must stay exactly the same; the only change is to group and display the text in units of 4 sentences instead of 4 lines.

  • Regarding the warning about the syllable not being found because of the trailing "." after a letter: ignore that warning when the "." is present.
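The grouping rule requested above (a sentence starts at an uppercase letter and ends at final punctuation) can be sketched independently of the script. This is a minimal illustration, assuming each visual line's text is available as a plain string; `group_lines_into_sentences` and `chunk` are hypothetical helper names, not the script's actual API:

```python
import re

# A sentence ends when the line's last non-space character is final punctuation.
SENTENCE_END = re.compile(r'[.!?…]$')

def group_lines_into_sentences(line_texts):
    """Merge consecutive line texts until one ends with final punctuation."""
    sentences = []
    current = []
    for text in line_texts:
        current.append(text)
        if SENTENCE_END.search(text.strip()):
            sentences.append(" ".join(current))
            current = []
    if current:  # trailing text without final punctuation
        sentences.append(" ".join(current))
    return sentences

def chunk(seq, n=4):
    """Yield display blocks of up to n sentences."""
    for i in range(0, len(seq), n):
        yield seq[i:i + n]
```

In the real script, the same idea would be applied to the output of `group_syllables_into_lines`, accumulating lines into a sentence while the per-syllable timing data is left untouched.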
python
import cupy as cp
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import time
import threading
import math
import os
import queue
import subprocess
from tqdm import tqdm
import psutil
import traceback
from cupyx.scipy import ndimage as cupy_ndimage

DEFAULT_CONFIG = {
    "font_path": "C:/Users/lucas/LilitaOne-Regular.ttf",
    "font_size": 140,
    "video_resolution": "1920x1080",
    "video_fps": 6,
    "base_text_color": "#FFFFFF",
    "highlight_text_color": "#ff0000",
    "num_visible_lines": 4,
    "upper_case": True,
    "background_image": "capa.png",
    "frames_per_batch": 16,
    "default_subtitle_file": "legenda.psv",
    "default_output_file": "video_karaoke_char_level.mp4",
    "ffmpeg_preset": "p4",
    "ffmpeg_tune": "hq",
    "ffmpeg_bitrate": "20M",
    "ffmpeg_codec": "h264_nvenc",
    "vertical_shift_pixels": 130,
    "min_char_duration": 0.01,
}


def hex_to_bgr_cupy(hex_color):
    hex_color = hex_color.lstrip('#')
    rgb = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
    return cp.array(rgb[::-1], dtype=cp.uint8)


class TextRenderer:
    def __init__(self, config):
        self.config = config
        self.font_path = config["font_path"]
        self.font_size = config["font_size"]
        self.num_visible_lines = config["num_visible_lines"]
        self.upper_case = config["upper_case"]
        self.base_text_color = config["base_text_color"]
        try:
            self.font = ImageFont.truetype(self.font_path, self.font_size)
            temp_img = Image.new("RGB", (1, 1))
            temp_draw = ImageDraw.Draw(temp_img)
            space_bbox = temp_draw.textbbox((0, 0), " ", font=self.font)
            try:
                self.space_width_ref = temp_draw.textlength(" ", font=self.font)
            except AttributeError:
                self.space_width_ref = space_bbox[2] - space_bbox[0] if space_bbox else int(self.font_size * 0.25)
            try:
                sample_bbox = self.font.getbbox("Tg")
                self.line_height_ref = sample_bbox[3] - sample_bbox[1]
            except AttributeError:
                sample_bbox_fallback = temp_draw.textbbox((0, 0), "Tg", font=self.font)
                self.line_height_ref = sample_bbox_fallback[3] - sample_bbox_fallback[1] if sample_bbox_fallback else int(self.font_size * 1.2)
            del temp_draw, temp_img
        except Exception as e:
            print(f"Aviso: Falha ao carregar a fonte '{self.font_path}'. Usando fonte padrão. Erro: {e}")
            self.font = ImageFont.load_default()
            try:
                bbox = self.font.getbbox("M")
                self.font_size = bbox[3] - bbox[1]
            except AttributeError:
                self.font_size = 20
            temp_img = Image.new("RGB", (1, 1))
            temp_draw = ImageDraw.Draw(temp_img)
            try:
                self.space_width_ref = temp_draw.textlength(" ", font=self.font)
            except AttributeError:
                self.space_width_ref = 10
            try:
                bbox = self.font.getbbox("Tg")
                self.line_height_ref = bbox[3] - bbox[1]
            except AttributeError:
                self.line_height_ref = 30
            del temp_draw, temp_img
        if self.num_visible_lines <= 1:
            spacing_multiplier = 1.0
        elif self.num_visible_lines == 2:
            spacing_multiplier = 0.8
        elif self.num_visible_lines == 3:
            spacing_multiplier = 0.6
        else:
            spacing_multiplier = 0.4
        self.line_spacing = int(self.line_height_ref * spacing_multiplier)
        self.line_spacing = max(0, self.line_spacing)

    def _get_element_width(self, draw, text, font):
        """Obtém a largura de um elemento de texto (sílaba ou espaço)."""
        if text == " ":
            return self.space_width_ref
        try:
            return draw.textlength(text, font=font)
        except AttributeError:
            try:
                bbox = draw.textbbox((0, 0), text, font=font)
                return bbox[2] - bbox[0] if bbox else 0
            except AttributeError:
                width, _ = draw.textsize(text, font=font)
                return width
        except Exception:
            return len(text) * (self.font_size // 2)

    def render_text_images(self, displayed_content, active_line_local_idx, width, height):
        """Renderiza linhas de sílabas/espaços, incluindo o índice global."""
        img_base = Image.new("RGB", (width, height), (0, 0, 0))
        img_mask = Image.new("L", (width, height), 0)
        draw_base = ImageDraw.Draw(img_base)
        draw_mask = ImageDraw.Draw(img_mask)
        font = self.font
        line_height_render = self.line_height_ref
        line_render_data = []
        for global_idx, line_elements in displayed_content:
            if line_elements and global_idx is not None:
                line_visual_width = 0
                for _, _, _, precalculated_width in line_elements:
                    line_visual_width += precalculated_width
                line_render_data.append({
                    "height": line_height_render,
                    "width": line_visual_width,
                    "elements": line_elements,
                    "global_idx": global_idx
                })
            else:
                line_render_data.append(None)
        current_block_actual_height = sum(info["height"] for info in line_render_data if info)
        visible_lines_count = sum(1 for info in line_render_data if info)
        if visible_lines_count > 1:
            current_block_actual_height += (visible_lines_count - 1) * self.line_spacing
        all_syllable_render_info = []
        active_syllable_indices = (-1, -1)
        current_global_syl_idx = 0
        centered_y = max(0, (height - current_block_actual_height) // 2)
        vertical_shift = self.config.get("vertical_shift_pixels", 0)
        current_block_start_y = max(0, centered_y + vertical_shift)
        current_y = current_block_start_y
        lines_drawn_count = 0
        for local_idx, render_info in enumerate(line_render_data):
            if render_info is None:
                continue
            line_height = render_info["height"]
            line_width_total = render_info["width"]
            elements_in_line = render_info["elements"]
            current_global_line_idx = render_info["global_idx"]
            if lines_drawn_count > 0:
                current_y += self.line_spacing
            is_active_line = (local_idx == active_line_local_idx)
            if is_active_line:
                active_syllable_start_idx_global = current_global_syl_idx
            line_start_x = (width - line_width_total) // 2
            current_x = float(line_start_x)
            line_y_draw = current_y
            for i, (start_time, end_time, element_text, element_width) in enumerate(elements_in_line):
                if not element_text.isspace():
                    try:
                        bbox = font.getbbox(element_text)
                        draw_x = int(current_x)
                        draw_y = line_y_draw
                        draw_base.text((draw_x, draw_y), element_text, font=font, fill=self.base_text_color)
                        draw_mask.text((draw_x, draw_y), element_text, font=font, fill=255)
                        final_bbox = draw_base.textbbox((draw_x, draw_y), element_text, font=font)
                        if final_bbox:
                            bbox_left, bbox_top, bbox_right, bbox_bottom = final_bbox
                            syl_w_actual = bbox_right - bbox_left
                            syl_h_actual = bbox_bottom - bbox_top
                        else:
                            bbox_left = draw_x
                            bbox_top = draw_y
                            syl_w_actual = element_width
                            syl_h_actual = line_height
                    except Exception as e:
                        print(f"Fallback de renderização para: {element_text}. Erro: {e}")
                        draw_x = int(current_x)
                        draw_y = line_y_draw
                        draw_base.text((draw_x, draw_y), element_text, font=font, fill=self.base_text_color)
                        draw_mask.text((draw_x, draw_y), element_text, font=font, fill=255)
                        bbox_left = draw_x
                        bbox_top = draw_y
                        syl_w_actual = element_width
                        syl_h_actual = line_height
                    all_syllable_render_info.append((start_time, end_time, bbox_left, bbox_top, syl_w_actual, syl_h_actual, current_global_line_idx))
                    current_global_syl_idx += 1
                current_x += element_width
            if is_active_line:
                active_syllable_end_idx_global = current_global_syl_idx
                active_syllable_indices = (active_syllable_start_idx_global, active_syllable_end_idx_global)
            current_y += line_height
            lines_drawn_count += 1
        base_cp = cp.asarray(np.array(img_base))
        mask_cp = cp.asarray(np.array(img_mask))
        return base_cp, mask_cp, all_syllable_render_info, active_syllable_indices


class SubtitleProcessor:
    def __init__(self, text_renderer: TextRenderer, config, syllable_dict, not_found_words_set):
        self.text_renderer = text_renderer
        self.config = config
        self.upper_case = config["upper_case"]
        self.font = self.text_renderer.font
        self.syllable_dict = syllable_dict
        self.not_found_words_set = not_found_words_set

    @staticmethod
    def _parse_time_string_float(time_str):
        """Parses time string (like '0.598') directly to float seconds."""
        try:
            return float(time_str)
        except (ValueError, TypeError):
            print(f"Aviso: Timestamp em formato inesperado: {time_str}")
            return None

    @staticmethod
    def read_subtitles(file):
        """Lê legendas do arquivo PSV (CHARACTER|START|END)."""
        char_timing_data = []
        try:
            with open(file, 'r', encoding='utf-8') as f:
                lines = f.readlines()
            if not lines:
                print(f"Aviso: Arquivo de legenda '{file}' está vazio.")
                return [], []
            header = lines[0].strip().upper()
            start_line_index = 0
            if header == "CHARACTER|START|END":
                start_line_index = 1
            elif header and '|' not in lines[0]:
                print("Aviso: Cabeçalho 'CHARACTER|START|END' não encontrado. Assumindo que não há cabeçalho.")
            for line_num, line in enumerate(lines[start_line_index:], start=start_line_index + 1):
                if not line.strip():
                    pass
                line = line.rstrip('\n\r')
                if not line:
                    continue
                parts = line.split('|')
                if len(parts) != 3:
                    print(f"Aviso: Ignorando linha {line_num} mal formatada (esperava 3 colunas separadas por '|'): '{line}'")
                    continue
                char = parts[0]
                start_str = parts[1].strip()
                end_str = parts[2].strip()
                start_time = SubtitleProcessor._parse_time_string_float(start_str)
                end_time = SubtitleProcessor._parse_time_string_float(end_str)
                if start_time is None or end_time is None:
                    print(f"Aviso: Ignorando linha {line_num} com timestamp inválido: '{line}'")
                    continue
                if not char and start_time is not None and end_time is not None:
                    char = " "
                if end_time < start_time:
                    print(f"Aviso: Corrigindo end_time < start_time na linha {line_num}: '{line}'")
                    end_time = start_time
                char_timing_data.append((start_time, end_time, str(char)))
        except FileNotFoundError:
            print(f"Erro: Arquivo de legenda PSV não encontrado: {file}")
            return [], []
        except Exception as e:
            print(f"Erro inesperado ao ler legendas PSV: {e}")
            import traceback
            traceback.print_exc()
            return [], []
        char_timing_data.sort(key=lambda x: x[0])
        long_pauses = SubtitleProcessor._identify_long_pauses(char_timing_data)
        return char_timing_data, long_pauses

    @staticmethod
    def _identify_long_pauses(char_timing_data, min_pause_duration=5.0):
        """Identifica pausas longas: no início, entre caracteres ou na duração de um caractere."""
        pauses = []
        if not char_timing_data:
            return pauses
        first_char_start_time = char_timing_data[0][0]
        initial_pause_duration = first_char_start_time
        if initial_pause_duration >= min_pause_duration:
            pauses.append({
                "start": 0.0,
                "end": first_char_start_time,
                "duration": initial_pause_duration,
                "type": "initial"
            })
        for i in range(1, len(char_timing_data)):
            prev_end_time = char_timing_data[i-1][1]
            curr_start_time = char_timing_data[i][0]
            pause_duration = curr_start_time - prev_end_time
            if pause_duration >= min_pause_duration:
                is_covered_by_initial = False
                if pauses and pauses[0]["type"] == "initial" and pauses[0]["end"] >= curr_start_time:
                    is_covered_by_initial = True
                if not is_covered_by_initial:
                    pauses.append({
                        "start": prev_end_time,
                        "end": curr_start_time,
                        "duration": pause_duration,
                        "type": "between"
                    })
        for i in range(len(char_timing_data)):
            start_time = char_timing_data[i][0]
            end_time = char_timing_data[i][1]
            char_duration = end_time - start_time
            if char_duration >= min_pause_duration:
                is_covered = False
                for p in pauses:
                    if abs(p["start"] - start_time) < 0.01 and abs(p["end"] - end_time) < 0.01:
                        is_covered = True
                        break
                if not is_covered:
                    pauses.append({
                        "start": start_time,
                        "end": end_time,
                        "duration": char_duration,
                        "type": "during"
                    })
        pauses.sort(key=lambda x: x["start"])
        return pauses

    def _group_chars_into_words(self, char_timing_data):
        """Agrupa caracteres em palavras e espaços."""
        words_and_spaces = []
        current_word_chars = []
        for i, (start, end, char) in enumerate(char_timing_data):
            processed_char = char.upper() if self.upper_case else char
            if processed_char.isspace():
                if current_word_chars:
                    words_and_spaces.append({"type": "word", "chars": current_word_chars})
                    current_word_chars = []
                words_and_spaces.append({"type": "space", "start": start, "end": end})
            else:
                current_word_chars.append((start, end, processed_char))
        if current_word_chars:
            words_and_spaces.append({"type": "word", "chars": current_word_chars})
        return words_and_spaces

    def _process_words_into_syllables(self, words_and_spaces):
        """Processa palavras para dividi-las em sílabas usando o dicionário."""
        syllable_data = []
        temp_draw = ImageDraw.Draw(Image.new("RGB", (1, 1)))
        punctuation_to_strip = ",.!?;:"
        for element in words_and_spaces:
            if element["type"] == "space":
                space_width = self.text_renderer.space_width_ref
                syllable_data.append((element["start"], element["end"], " ", space_width))
                continue
            word_chars = element["chars"]
            if not word_chars:
                continue
            word_text = "".join([c[2] for c in word_chars])
            cleaned_word_text = word_text.rstrip(punctuation_to_strip)
            word_lookup = cleaned_word_text.lower()
            if word_lookup in self.syllable_dict:
                syllables_str = self.syllable_dict[word_lookup]
                syllable_parts = syllables_str.split('-')
                char_idx_counter = 0
                original_char_idx_counter = 0
                for syl_part in syllable_parts:
                    syl_len = len(syl_part)
                    if char_idx_counter + syl_len > len(cleaned_word_text):
                        print(f"Aviso: Inconsistência de sílaba para '{cleaned_word_text}' (original: '{word_text}'). Tratando restante.")
                        if original_char_idx_counter < len(word_chars):
                            syl_chars_original = word_chars[original_char_idx_counter:]
                            syl_text_original = "".join([c[2] for c in syl_chars_original])
                            syl_start = syl_chars_original[0][0]
                            syl_end = syl_chars_original[-1][1]
                            syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in syl_chars_original)
                            syllable_data.append((syl_start, syl_end, syl_text_original, syl_width))
                        break
                    syl_chars = word_chars[original_char_idx_counter : original_char_idx_counter + syl_len]
                    if not syl_chars:
                        continue
                    syl_text = "".join([c[2] for c in syl_chars])
                    if syl_text.lower() != syl_part.lower():
                        print(f"Aviso: Discrepância na sílaba para '{cleaned_word_text}'. Esperado '{syl_part}', obtido '{syl_text}'. Usando texto dos caracteres originais.")
                        pass
                    syl_start = syl_chars[0][0]
                    syl_end = syl_chars[-1][1]
                    syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in syl_chars)
                    syllable_data.append((syl_start, syl_end, syl_text, syl_width))
                    char_idx_counter += syl_len
                    original_char_idx_counter += syl_len
                if original_char_idx_counter < len(word_chars):
                    remaining_chars = word_chars[original_char_idx_counter:]
                    rem_text_check = "".join([c[2] for c in remaining_chars])
                    if rem_text_check == word_text[len(cleaned_word_text):]:
                        if syllable_data and syllable_data[-1][2] != " ":
                            last_syl_start, last_syl_end, last_syl_text, last_syl_width = syllable_data.pop()
                            new_syl_text = last_syl_text + rem_text_check
                            new_syl_end = remaining_chars[-1][1]
                            new_syl_width = last_syl_width + sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in remaining_chars)
                            syllable_data.append((last_syl_start, new_syl_end, new_syl_text, new_syl_width))
                        else:
                            print(f"Aviso: Adicionando pontuação final '{rem_text_check}' como elemento separado.")
                            rem_start = remaining_chars[0][0]
                            rem_end = remaining_chars[-1][1]
                            rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in remaining_chars)
                            syllable_data.append((rem_start, rem_end, rem_text_check, rem_width))
                    else:
                        print(f"Aviso: Caracteres restantes inesperados em '{word_text}' após processar sílabas de '{cleaned_word_text}'. Agrupando.")
                        rem_start = remaining_chars[0][0]
                        rem_end = remaining_chars[-1][1]
                        rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in remaining_chars)
                        syllable_data.append((rem_start, rem_end, rem_text_check, rem_width))
            else:
                if word_lookup not in self.not_found_words_set:
                    print(f"Aviso: Palavra '{cleaned_word_text}' (original: '{word_text}') não encontrada no dicionário de sílabas. Tratando como sílaba única.")
                    self.not_found_words_set.add(word_lookup)
                syl_start = word_chars[0][0]
                syl_end = word_chars[-1][1]
                syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], self.font) for c in word_chars)
                syllable_data.append((syl_start, syl_end, word_text, syl_width))
        del temp_draw
        syllable_data.sort(key=lambda x: x[0])
        return syllable_data

    def group_syllables_into_lines(self, syllable_timing_data, video_width):
        """Agrupa sílabas/espaços em linhas visuais."""
        lines = []
        if not syllable_timing_data:
            return lines
        max_width = int(video_width * 0.9)
        current_line_elements = []
        current_line_width = 0
        last_space_idx_in_line = -1
        for idx, (start, end, text, element_width) in enumerate(syllable_timing_data):
            is_space = (text == " ")
            potential_line_width = current_line_width + element_width
            if current_line_elements and not current_line_elements[-1][2].isspace() and not is_space:
                potential_line_width += self.text_renderer.space_width_ref
            if potential_line_width > max_width and current_line_elements:
                break_point_idx = -1
                for i in range(len(current_line_elements) - 1, -1, -1):
                    if current_line_elements[i][2].isspace():
                        break_point_idx = i
                        break
                if break_point_idx != -1:
                    line_to_add = current_line_elements[:break_point_idx + 1]
                    while line_to_add and line_to_add[-1][2].isspace():
                        line_to_add.pop()
                    if line_to_add:
                        lines.append(line_to_add)
                    remaining_elements = current_line_elements[break_point_idx + 1:]
                    current_line_elements = remaining_elements + [(start, end, text, element_width)]
                else:
                    line_to_add = current_line_elements
                    while line_to_add and line_to_add[-1][2].isspace():
                        line_to_add.pop()
                    if line_to_add:
                        lines.append(line_to_add)
                    current_line_elements = [(start, end, text, element_width)]
                current_line_width = 0
                for elem in current_line_elements:
                    current_line_width += elem[3]
            else:
                current_line_elements.append((start, end, text, element_width))
                current_line_width += element_width
        if current_line_elements:
            while current_line_elements and current_line_elements[-1][2].isspace():
                current_line_elements.pop()
            if current_line_elements:
                lines.append(current_line_elements)
        return lines

    def process_subtitles_to_syllable_lines(self, file, video_width):
        char_timing_data, long_pauses = self.read_subtitles(file)
        if not char_timing_data:
            return [], []
        words_and_spaces = self._group_chars_into_words(char_timing_data)
        syllable_timing_data = self._process_words_into_syllables(words_and_spaces)
        if not syllable_timing_data:
            print("Aviso: Nenhum dado de sílaba gerado.")
            return [], long_pauses
        lines = self.group_syllables_into_lines(syllable_timing_data, video_width)
        return lines, long_pauses


class CUDAProcessor:
    def __init__(self, config, static_bg_rgb_cp):
        self.config = config
        self.static_bg_rgb_cp = static_bg_rgb_cp
        self.streams = [cp.cuda.Stream() for _ in range(2)]
        self.base_color_cp = hex_to_bgr_cupy(config["base_text_color"])
        self.base_color_rgb_float_cp = self.base_color_cp[::-1].astype(cp.float32) / 255.0
        highlight_hex = config["highlight_text_color"]
        highlight_bgr_cp = hex_to_bgr_cupy(highlight_hex)
        self.highlight_color_rgb_float_cp = highlight_bgr_cp[::-1].astype(cp.float32) / 255.0
        self.progress_bar_fill_color_cp = highlight_bgr_cp
        self.progress_bar_fill_rgb_float_cp = self.highlight_color_rgb_float_cp
        hex_color_clean = highlight_hex.lstrip('#')
        rgb = tuple(int(hex_color_clean[i:i+2], 16) for i in (0, 2, 4))
        darkening_factor = 0.4
        dark_rgb = tuple(max(0, min(255, int(c * darkening_factor))) for c in rgb)
        self.progress_bar_bg_color_cp = cp.array(dark_rgb[::-1], dtype=cp.uint8)
        self.progress_bar_bg_rgb_float_cp = self.progress_bar_bg_color_cp[::-1].astype(cp.float32) / 255.0
        self.frames_per_batch = config["frames_per_batch"]
        self.min_syl_duration = config.get("min_char_duration", 0.01)
        self.progress_bar_height = 20
        self.progress_bar_y_start = 10
        self.max_visual_fill_duration = config.get("max_visual_fill_duration", 3.0)
        if self.static_bg_rgb_cp is None:
            raise ValueError("CUDAProcessor requer um array static_bg_rgb_cp não nulo.")
        self.static_bg_rgb_float_cp = self.static_bg_rgb_cp.astype(cp.float32) / 255.0

    def process_frames_streaming(self, base_cp, mask_cp, all_syllable_render_info, active_syllable_indices,
                                 video, video_lock, video_fps, current_frame, num_frames, width, height,
                                 pbar=None, prev_mask_cp=None, fade_start_time=None, fade_end_time=None,
                                 active_global_line_idx=-1, completed_global_line_indices=set(), long_pauses=None):
        """Processes frames including optional progress bar and consistent syllable fill."""
        channels = 3
        num_syls_total = len(all_syllable_render_info)
        syl_duration_for_normalization = cp.zeros(num_syls_total, dtype=cp.float32)
        next_syl_starts_cp_pre = cp.empty(num_syls_total, dtype=cp.float32)
        if num_syls_total > 0:
            syl_starts_cp = cp.asarray([info[0] for info in all_syllable_render_info], dtype=cp.float32)
            syl_ends_cp = cp.asarray([info[1] for info in all_syllable_render_info], dtype=cp.float32)
            syl_x_cp = cp.asarray([info[2] for info in all_syllable_render_info], dtype=cp.int32)
            syl_y_cp = cp.asarray([info[3] for info in all_syllable_render_info], dtype=cp.int32)
            syl_w_cp = cp.asarray([info[4] for info in all_syllable_render_info], dtype=cp.int32)
            syl_h_cp = cp.asarray([info[5] for info in all_syllable_render_info], dtype=cp.int32)
            syl_global_idx_cp = cp.asarray([info[6] for info in all_syllable_render_info], dtype=cp.int32)
            if num_syls_total > 1:
                next_syl_starts_cp_pre[:-1] = syl_starts_cp[1:]
            next_syl_starts_cp_pre[-1] = syl_ends_cp[-1]
            for syl_idx in range(num_syls_total):
                s_start = syl_starts_cp[syl_idx]
                raw_end_time = next_syl_starts_cp_pre[syl_idx]
                raw_duration = raw_end_time - s_start
                eff_duration = cp.maximum(raw_duration, self.min_syl_duration)
                syl_duration_for_normalization[syl_idx] = eff_duration
                if eff_duration > self.max_visual_fill_duration and long_pauses:
                    syllable_overlaps = False
                    for pause in long_pauses:
                        pause_start, pause_end = pause["start"], pause["end"]
                        if s_start < pause_end and raw_end_time > pause_start:
                            syllable_overlaps = True
                            break
                    if syllable_overlaps:
                        syl_duration_for_normalization[syl_idx] = self.max_visual_fill_duration
            del next_syl_starts_cp_pre
        else:
            syl_starts_cp = cp.empty(0, dtype=cp.float32)
            syl_ends_cp = cp.empty(0, dtype=cp.float32)
            syl_x_cp = cp.empty(0, dtype=cp.int32)
            syl_y_cp = cp.empty(0, dtype=cp.int32)
            syl_w_cp = cp.empty(0, dtype=cp.int32)
            syl_h_cp = cp.empty(0, dtype=cp.int32)
            syl_global_idx_cp = cp.empty(0, dtype=cp.int32)
        batch_size = min(self.frames_per_batch, 64)
        out_cp = [cp.empty((batch_size, height, width, channels), dtype=cp.uint8) for _ in range(2)]
        text_mask_bool = mask_cp > 128
        text_mask_bool_exp = text_mask_bool[None, ..., None]
        y_coords, x_coords = cp.mgrid[:height, :width]
        bar_y_start = self.progress_bar_y_start
        bar_y_end = bar_y_start + self.progress_bar_height
        bar_bg_mask_full_area = (y_coords >= bar_y_start) & (y_coords < bar_y_end)
        result_queue = queue.Queue(maxsize=8)
        writer = threading.Thread(target=self._create_writer_thread(result_queue, video, video_lock, pbar), daemon=True)
        writer.start()
        max_visual_fill_duration = self.config.get("max_visual_fill_duration", 3.0)
        buffer_idx = 0
        for batch_start in range(0, num_frames, batch_size):
            buffer_idx = (buffer_idx + 1) % 2
            stream = self.streams[buffer_idx]
            batch_end = min(batch_start + batch_size, num_frames)
            batch_frames = batch_end - batch_start
            if batch_frames <= 0:
                continue
            out_batch_cp = out_cp[buffer_idx][:batch_frames]
            with stream:
                batch_frame_indices = cp.arange(batch_frames, dtype=cp.int32)
                absolute_frame_indices = current_frame + batch_start + batch_frame_indices
                frame_times_cp = absolute_frame_indices.astype(cp.float32) / video_fps
                intermediate_batch_float = cp.repeat(self.static_bg_rgb_float_cp[None, ...], batch_frames, axis=0)
                is_in_long_pause_batch_for_bar = cp.zeros(batch_frames, dtype=bool)
                if long_pauses:
                    batch_needs_bar = cp.zeros(batch_frames, dtype=bool)
                    batch_fill_width = cp.zeros(batch_frames, dtype=cp.int32)
                    for pause in long_pauses:
                        pause_start, pause_end, pause_duration = pause["start"], pause["end"], pause["duration"]
                        is_in_this_pause_batch_indices = cp.where((frame_times_cp >= pause_start) & (frame_times_cp < pause_end))[0]
                        if is_in_this_pause_batch_indices.size > 0:
                            batch_needs_bar[is_in_this_pause_batch_indices] = True
                            is_in_long_pause_batch_for_bar[is_in_this_pause_batch_indices] = True
                            progress = (frame_times_cp[is_in_this_pause_batch_indices] - pause_start) / pause_duration
                            fill_width = cp.clip((progress * width), 0, width).astype(cp.int32)
                            batch_fill_width[is_in_this_pause_batch_indices] = fill_width
                    if cp.any(batch_needs_bar):
                        bar_bg_mask_batch = bar_bg_mask_full_area[None, :, :] & batch_needs_bar[:, None, None]
                        x_coords_exp = x_coords[None, :, :]
                        bar_fill_mask_batch = bar_bg_mask_batch & (x_coords_exp < batch_fill_width[:, None, None])
                        apply_bg_mask = bar_bg_mask_batch & (~bar_fill_mask_batch)
                        intermediate_batch_float = cp.where(
                            apply_bg_mask[..., None],
                            self.progress_bar_bg_rgb_float_cp,
                            intermediate_batch_float
                        )
                        intermediate_batch_float = cp.where(
                            bar_fill_mask_batch[..., None],
                            self.progress_bar_fill_rgb_float_cp,
                            intermediate_batch_float
                        )
                        del bar_bg_mask_batch, bar_fill_mask_batch, apply_bg_mask, x_coords_exp
                    del batch_needs_bar, batch_fill_width
                if prev_mask_cp is not None and fade_start_time is not None and fade_end_time is not None and fade_end_time > fade_start_time:
                    prev_mask_float_exp = (prev_mask_cp > 128)[None, ..., None]
                    time_in_fade = frame_times_cp - fade_start_time
                    fade_duration = fade_end_time - fade_start_time
                    fade_progress = cp.clip(time_in_fade / fade_duration, 0.0, 1.0)
                    prev_alpha_batch = 1.0 - fade_progress
                    prev_alpha_exp = prev_alpha_batch[:, None, None, None]
                    intermediate_batch_float = cp.where(
                        prev_mask_float_exp,
                        self.base_color_rgb_float_cp * prev_alpha_exp + intermediate_batch_float * (1.0 - prev_alpha_exp),
                        intermediate_batch_float
                    )
                batch_highlight_mask = cp.zeros((batch_frames, height, width), dtype=bool)
                completed_indices_cp = cp.asarray(list(completed_global_line_indices), dtype=cp.int32)
                if completed_indices_cp.size > 0 and num_syls_total > 0:
                    is_completed_syl_mask = cp.isin(syl_global_idx_cp, completed_indices_cp)
                    completed_syl_indices = cp.where(is_completed_syl_mask)[0]
                    for syl_idx in completed_syl_indices:
                        s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx]
                        s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx]
                        if s_w <= 0 or s_h <= 0:
                            continue
                        syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \
                                        (y_coords >= s_y) & (y_coords < s_y + s_h)
                        combined_mask = syl_bbox_mask & text_mask_bool
                        batch_highlight_mask[:, combined_mask] = True
                start_active_syl_idx, end_active_syl_idx = active_syllable_indices
                start_active_syl_idx = max(0, start_active_syl_idx)
                end_active_syl_idx = min(num_syls_total, end_active_syl_idx)
                if active_global_line_idx != -1 and start_active_syl_idx < end_active_syl_idx:
                    for syl_idx in range(start_active_syl_idx, end_active_syl_idx):
                        if syl_global_idx_cp[syl_idx] != active_global_line_idx:
                            continue
                        s_start = syl_starts_cp[syl_idx]
                        s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx]
                        s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx]
                        if s_w <= 0 or s_h <= 0:
                            continue
                        current_syl_norm_duration = syl_duration_for_normalization[syl_idx]
                        elapsed_time = frame_times_cp - s_start
                        safe_visual_duration = cp.maximum(current_syl_norm_duration, 1e-6)
                        normalized_time = cp.clip(elapsed_time / safe_visual_duration, 0.0, 1.0)
                        syl_progress_batch = normalized_time
                        cutoff_x_batch = s_x + syl_progress_batch * s_w
                        syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \
                                        (y_coords >= s_y) & (y_coords < s_y + s_h)
                        highlight_area_mask_batch = x_coords[None, :, :] < cutoff_x_batch[:, None, None]
                        active_highlight_apply_mask = syl_bbox_mask[None, :, :] & \
                                                      highlight_area_mask_batch & \
                                                      text_mask_bool[None, :, :]
                        batch_highlight_mask |= active_highlight_apply_mask
                        del elapsed_time, safe_visual_duration, normalized_time, cutoff_x_batch
                        del syl_progress_batch, syl_bbox_mask
                        del highlight_area_mask_batch, active_highlight_apply_mask
                final_color_batch_float = intermediate_batch_float
                highlight_mask_exp = batch_highlight_mask[..., None]
                final_color_batch_float = cp.where(
                    text_mask_bool_exp & highlight_mask_exp,
                    self.highlight_color_rgb_float_cp,
                    final_color_batch_float
                )
                final_color_batch_float = cp.where(
                    text_mask_bool_exp & (~highlight_mask_exp),
                    self.base_color_rgb_float_cp,
                    final_color_batch_float
                )
                out_batch_cp[:] = (final_color_batch_float * 255.0).astype(cp.uint8)
                del batch_highlight_mask, highlight_mask_exp
                del final_color_batch_float, intermediate_batch_float
                if 'completed_indices_cp' in locals():
                    del completed_indices_cp
                if 'is_completed_syl_mask' in locals():
                    del is_completed_syl_mask
                if 'completed_syl_indices' in locals():
                    del completed_syl_indices
                del is_in_long_pause_batch_for_bar
            stream.synchronize()
            result_queue.put((out_batch_cp.copy(), batch_frames))
            cp.get_default_memory_pool().free_all_blocks()
        result_queue.put(None)
        writer.join()
        for s in self.streams:
            s.synchronize()
        if 'syl_starts_cp' in locals():
            del syl_starts_cp
        if 'syl_ends_cp' in locals():
            del syl_ends_cp
        if 'syl_x_cp' in locals():
            del syl_x_cp
        if 'syl_y_cp' in locals():
            del syl_y_cp
        if 'syl_w_cp' in locals():
            del syl_w_cp
        if 'syl_h_cp' in locals():
            del syl_h_cp
        if 'syl_global_idx_cp' in locals():
            del syl_global_idx_cp
        if 'syl_duration_for_normalization' in locals():
            del syl_duration_for_normalization
        del text_mask_bool, text_mask_bool_exp
        del y_coords, x_coords
        del out_cp, result_queue
        cp.get_default_memory_pool().free_all_blocks()
        del bar_bg_mask_full_area

    def _create_writer_thread(self, result_queue, video, video_lock, pbar):
        def writer_thread():
            while True:
                batch_data = result_queue.get()
                if batch_data is None:
                    result_queue.task_done()
                    break
                frames_rgb_uint8, batch_frames_count = batch_data
                try:
                    frames_bgr_uint8 = frames_rgb_uint8[:, :, :, ::-1]
                    for frame_idx in range(batch_frames_count):
                        with video_lock:
                            video.write(cp.asnumpy(frames_bgr_uint8[frame_idx]))
                    if pbar is not None:
                        pbar.update(batch_frames_count)
                except Exception as e:
                    print(f"Erro na thread escritora: {e}")
                    if isinstance(e, (BrokenPipeError, OSError)):
                        print("Erro crítico de escrita. Encerrando thread escritora.")
                        break
                finally:
                    del frames_rgb_uint8, frames_bgr_uint8
                    result_queue.task_done()
        return writer_thread


class FFmpegWriter:
    def __init__(self, output_file, width, height, fps, config):
        self.output_file = output_file
        self.config = config
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{width}x{height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "-",
            "-c:v", config["ffmpeg_codec"],
            "-preset", config["ffmpeg_preset"],
            "-b:v", config["ffmpeg_bitrate"],
            "-pix_fmt", "yuv420p",
            "-tune", config["ffmpeg_tune"],
            output_file
        ]
        self.ffmpeg_process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            bufsize=10**8
        )
        self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
        self.stderr_thread.start()

    def _read_stderr(self):
        """Reads and prints FFmpeg's stderr output."""
        try:
            for line in iter(self.ffmpeg_process.stderr.readline, b''):
                pass
        except Exception as e:
            print(f"Error reading FFmpeg stderr: {e}")

    def write(self, frame):
        """Writes a frame (NumPy/CuPy array) to FFmpeg."""
        try:
            if isinstance(frame, cp.ndarray):
                frame = cp.asnumpy(frame)
            self.ffmpeg_process.stdin.write(frame.tobytes())
        except (OSError, BrokenPipeError) as e:
            print(f"Error writing frame to FFmpeg: {e}. FFmpeg might have terminated.")
            self.release()
            raise

    def release(self):
        """Closes the FFmpeg process."""
        if self.ffmpeg_process.stdin:
            try:
                self.ffmpeg_process.stdin.close()
            except OSError as e:
                print(f"Warning: Error closing FFmpeg stdin: {e}")
        ret_code = self.ffmpeg_process.wait()
        if ret_code != 0:
            print(f"Warning: FFmpeg process exited with non-zero status: {ret_code}")
        self.stderr_thread.join(timeout=1.0)


def get_audio_duration(audio_file_path):
    if not os.path.exists(audio_file_path):
        print(f"Aviso: Arquivo de áudio não encontrado: {audio_file_path}")
        return None
    try:
        command = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            audio_file_path
        ]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
        duration = float(result.stdout.strip())
        return duration
    except FileNotFoundError:
        print("Erro: ffprobe não encontrado. Certifique-se de que o FFmpeg (que inclui ffprobe) está no PATH do sistema.")
        return None
    except subprocess.CalledProcessError as e:
        print(f"Erro ao executar ffprobe para obter duração do áudio: {e}")
        print(f"Stderr: {e.stderr}")
        return None
    except ValueError:
        print(f"Erro: Não foi possível converter a saída do ffprobe para float: {result.stdout.strip()}")
        return None
    except Exception as e:
        print(f"Erro inesperado ao obter duração do áudio: {e}")
        return None


class KaraokeVideoCreator:
    def __init__(self, config, text_renderer: TextRenderer):
        self.config = config
        self.fps = config["video_fps"]
        self.text_renderer = text_renderer
        self.num_visible_lines = config["num_visible_lines"]
        width, height = 1920, 1080
        try:
            width, height = map(int, self.config["video_resolution"].split("x"))
        except ValueError:
            print(f"Aviso: Resolução de vídeo inválida '{self.config['video_resolution']}'. Usando 1920x1080.")
            width, height = 1920, 1080
        self.width = width
        self.height = height
        self.static_bg_frame_bgr_np = None
        self.static_bg_frame_rgb_cp = None
        bg_path = config.get("background_image")
        loaded_static = False
        if bg_path and os.path.exists(bg_path):
            try:
                bg_img = Image.open(bg_path).convert("RGB").resize((width, height), Image.Resampling.LANCZOS)
                self.static_bg_frame_bgr_np = np.array(bg_img)[:, :, ::-1].copy()
                self.static_bg_frame_rgb_cp = cp.asarray(np.array(bg_img))
                loaded_static = True
            except Exception as e:
                print(f"Erro ao carregar imagem de fundo '{bg_path}': {e}. Usando fundo preto.")
                self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8)
                self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8)
        else:
            if bg_path:
                print(f"Aviso: Imagem de fundo especificada não encontrada: {bg_path}. Usando fundo preto.")
            self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8)
            self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8)
        self.init_gpu()
        self.cuda_processor = CUDAProcessor(
            config,
            static_bg_rgb_cp=self.static_bg_frame_rgb_cp
        )

    def init_gpu(self):
        """Initializes the GPU device and memory pool."""
        try:
            cp.cuda.Device(0).use()
            cp.cuda.set_allocator(cp.cuda.MemoryPool().malloc)
            cp.cuda.Stream.null.synchronize()
            _ = cp.zeros(1)
        except cp.cuda.runtime.CUDARuntimeError as e:
            print(f"Erro Crítico: Falha ao inicializar a GPU CUDA: {e}")
            raise
        except Exception as e:
            print(f"Erro Crítico: Falha inesperada durante inicialização da GPU: {e}")
            raise

    def _get_next_global_indices(self, current_displayed_indices, count=2):
        """Encontra os próximos 'count' índices globais que ainda não estão na tela."""
        max_existing_idx = -1
        valid_indices = {idx for idx in current_displayed_indices if idx is not None}
        if valid_indices:
            max_existing_idx = max(valid_indices)
        next_indices = []
        candidate_idx = max_existing_idx + 1
        while len(next_indices) < count:
            next_indices.append(candidate_idx)
            candidate_idx += 1
        return next_indices

    def create_video(self, syllable_lines, long_pauses, output_file, audio_file_path):
        """Cria o vídeo de karaokê com atualização de conteúdo por slots."""
        width, height = self.width, self.height
        N = self.num_visible_lines
        audio_duration = get_audio_duration(audio_file_path)
        first_syl_start_time = 0.0
        last_syl_end_time = 0.0
        if syllable_lines:
            first_valid_line_idx = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1)
            if first_valid_line_idx != -1 and syllable_lines[first_valid_line_idx]:
                first_syl_start_time = syllable_lines[first_valid_line_idx][0][0]
            last_valid_line_idx = next((idx for idx in range(len(syllable_lines) - 1, -1, -1) if syllable_lines[idx]), -1)
            if last_valid_line_idx != -1 and syllable_lines[last_valid_line_idx]:
                last_syl_end_time = syllable_lines[last_valid_line_idx][-1][1]
        else:
            print("Aviso: Nenhuma linha de sílaba para processar.")
            last_syl_end_time = 1.0
        video_end_time = last_syl_end_time + 0.5
        if audio_duration is not None:
            video_end_time = max(video_end_time, audio_duration + 0.1)
        else:
            print(f"Aviso: Sem duração de áudio, vídeo terminará em {video_end_time:.2f}s baseado nas legendas.")
        total_frames = math.ceil(video_end_time * self.fps)
        print(f"Duração do vídeo estimada: {video_end_time:.2f}s, Total de frames: {total_frames}")
        try:
            video = FFmpegWriter(output_file, width, height, self.fps, self.config)
        except Exception as e:
            print(f"Erro Crítico: Falha ao inicializar FFmpegWriter: {e}")
            return
        video_lock = threading.Lock()
        start_time_processing = time.time()
        current_frame = 0
        num_lines = len(syllable_lines)
        pbar = tqdm(total=total_frames, unit="frames",
                    bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} frames [{elapsed}<{remaining}, {rate_fmt}]",
                    position=0, leave=True)
        displayed_content = []
        for idx in range(N):
            line_data = syllable_lines[idx] if idx < num_lines else None
            displayed_content.append((idx if line_data else None, line_data))
        completed_global_line_indices = set()
        prev_mask_cp = None
trigger_1_pending_for_line = -1 trigger_2_pending = False last_trigger_1_line_completed = -1 last_trigger_2_line_completed = -1 initial_static_frames = 0 if syllable_lines: first_valid_line_idx_recalc = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1) if first_valid_line_idx_recalc != -1: first_syl_start_time_recalc = syllable_lines[first_valid_line_idx_recalc][0][0] initial_static_frames = max(0, int(first_syl_start_time_recalc * self.fps)) if initial_static_frames > 0: try: if any(global_idx is not None for global_idx, _ in displayed_content): initial_base_cp, initial_mask_cp, all_syl_info, _ = self.text_renderer.render_text_images( displayed_content, -1, width, height ) if initial_base_cp is not None and initial_mask_cp is not None: self.cuda_processor.process_frames_streaming( initial_base_cp, initial_mask_cp, [], (-1,-1), video, video_lock, self.fps, 0, initial_static_frames, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) prev_mask_cp = initial_mask_cp.copy() del initial_base_cp, initial_mask_cp, all_syl_info else: print("Aviso: Renderização inicial produziu dados nulos. 
Usando fallback.") fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None else: fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None except Exception as e: print(f"Aviso: Falha ao pré-renderizar janela inicial estática: {e}") traceback.print_exc() fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None current_frame = initial_static_frames for i, current_line_syllables in enumerate(syllable_lines): if not current_line_syllables: print(f"Aviso: Pulando linha global vazia {i}") continue if trigger_2_pending and i != last_trigger_2_line_completed: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_bottom1 = next_indices[0] new_data_bottom1 = syllable_lines[new_idx_bottom1] if new_idx_bottom1 < num_lines else None displayed_content[N-2] = (new_idx_bottom1 if new_data_bottom1 else None, new_data_bottom1) new_idx_bottom2 = next_indices[1] new_data_bottom2 = syllable_lines[new_idx_bottom2] if new_idx_bottom2 < num_lines else None displayed_content[N-1] = (new_idx_bottom2 if new_data_bottom2 else None, new_data_bottom2) trigger_2_pending = False last_trigger_2_line_completed = i active_local_idx = -1 for local_idx, (global_idx, _) in enumerate(displayed_content): if global_idx == i: active_local_idx = local_idx break if active_local_idx == -1: line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) frames_to_fill_until_line = max(0, line_start_frame - 
current_frame) if frames_to_fill_until_line > 0: print(f"Aviso: Linha ativa {i} não encontrada na tela. Preenchendo {frames_to_fill_until_line} frames...") if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, frames_to_fill_until_line, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (frames_to_fill_until_line, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(frames_to_fill_until_line) current_frame += frames_to_fill_until_line continue render_data = None render_success = False try: render_data = self.text_renderer.render_text_images(displayed_content, active_local_idx, width, height) render_success = True except Exception as e: print(f"Erro crítico ao renderizar slots para linha {i}: {e}") traceback.print_exc() render_success = False if render_success and render_data: base_cp, mask_cp, all_syl_info, active_indices = render_data if base_cp is not None and mask_cp is not None and all_syl_info and active_indices[1] > active_indices[0]: line_start_time = current_line_syllables[0][0] line_end_time = current_line_syllables[-1][1] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) 
num_frames_to_process = max(0, processing_end_frame - effective_start_frame_for_processing) trigger_1_frame = -1 is_penultimate_line = (active_local_idx == N - 2) if is_penultimate_line and i != last_trigger_1_line_completed: line_duration = max(line_end_time - line_start_time, 0.01) midpoint_time = line_start_time + line_duration / 2.0 trigger_1_frame = int(midpoint_time * self.fps) trigger_1_pending_for_line = i if num_frames_to_process > 0: fade_line_duration = 0.15 line_fade_start_time = line_start_time line_fade_end_time = line_start_time + fade_line_duration self.cuda_processor.process_frames_streaming( base_cp, mask_cp, all_syl_info, active_indices, video, video_lock, self.fps, effective_start_frame_for_processing, num_frames_to_process, width, height, pbar, prev_mask_cp, line_fade_start_time, line_fade_end_time, active_global_line_idx=i, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) processed_frames_end = effective_start_frame_for_processing + num_frames_to_process if trigger_1_pending_for_line == i and trigger_1_frame != -1 and processed_frames_end > trigger_1_frame: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_top1 = next_indices[0] new_data_top1 = syllable_lines[new_idx_top1] if new_idx_top1 < num_lines else None displayed_content[0] = (new_idx_top1 if new_data_top1 else None, new_data_top1) new_idx_top2 = next_indices[1] new_data_top2 = syllable_lines[new_idx_top2] if new_idx_top2 < num_lines else None displayed_content[1] = (new_idx_top2 if new_data_top2 else None, new_data_top2) trigger_1_pending_for_line = -1 last_trigger_1_line_completed = i current_frame = processed_frames_end if prev_mask_cp is not None: del prev_mask_cp prev_mask_cp = mask_cp.copy() if mask_cp is not None else None completed_global_line_indices.add(i) else: if i not in completed_global_line_indices: 
completed_global_line_indices.add(i) is_last_line = (active_local_idx == N - 1) if is_last_line: trigger_2_pending = True del base_cp, all_syl_info else: print(f"Aviso: Renderização para linha {i} inválida. Pulando GPU e preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill if render_data: try: del render_data[0], render_data[1], render_data[2], render_data[3] except: pass else: print(f"Aviso: Falha GERAL ao renderizar slots para linha {i}. 
Preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill cp.get_default_memory_pool().free_all_blocks() final_frames_to_fill = total_frames - current_frame if final_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) final_fade_start_time = (current_frame / self.fps) final_fade_duration = 0.5 final_fade_end_time = final_fade_start_time + final_fade_duration self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, final_frames_to_fill, width, height, pbar, prev_mask_cp, final_fade_start_time, final_fade_end_time, 
active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (final_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(final_frames_to_fill) del fill_batch current_frame += final_frames_to_fill if pbar is not None: if pbar.n < pbar.total: pbar.update(pbar.total - pbar.n) pbar.close() video.release() if prev_mask_cp is not None: del prev_mask_cp del displayed_content[:] def load_syllables(filepath="syllables.txt"): """Carrega o dicionário de sílabas do arquivo.""" syllable_dict = {} not_found_words = set() try: with open(filepath, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or '|' not in line: continue parts = line.split('|', 1) if len(parts) == 2: word = parts[0].strip().lower() syllables = parts[1].strip() if word and syllables: syllable_dict[word] = syllables except FileNotFoundError: print(f"Aviso: Arquivo de sílabas '{filepath}' não encontrado. O destaque será por palavra.") except Exception as e: print(f"Erro ao carregar arquivo de sílabas '{filepath}': {e}") return syllable_dict, not_found_words def main(): start_main_time = time.time() config = DEFAULT_CONFIG.copy() try: device = cp.cuda.Device(0) device.use() except cp.cuda.runtime.CUDARuntimeError as e: if 'no CUDA-capable device is detected' in str(e): print("Erro Crítico: Nenhuma GPU compatível com CUDA foi detectada.") elif 'CUDA driver version is insufficient' in str(e): print("Erro Crítico: O driver NVIDIA CUDA está desatualizado. Atualize seus drivers.") else: print(f"Erro Crítico: Falha ao inicializar CUDA: {e}") print("O script não pode continuar sem CUDA. 
Verifique sua instalação NVIDIA e CUDA.") return except Exception as e: print(f"Erro inesperado durante a inicialização da GPU: {e}") return try: process = psutil.Process() cpu_count = os.cpu_count() affinity = list(range(cpu_count)) process.cpu_affinity(affinity) except (ImportError, AttributeError, OSError, ValueError) as e: print(f"Aviso: Não foi possível definir a afinidade da CPU (geralmente não é um problema): {e}") pass subtitle_file = config.get("default_subtitle_file", "legenda.psv") output_file = config.get("default_output_file", "video_karaoke_char_level.mp4") audio_file = "audio.wav" if not os.path.exists(subtitle_file): print(f"Erro Crítico: Arquivo de legenda '{subtitle_file}' não encontrado.") return if not os.path.exists(audio_file): print(f"Aviso: Arquivo de áudio '{audio_file}' não encontrado. A duração do vídeo será baseada apenas nas legendas.") try: text_renderer = TextRenderer(config) syllable_dict, not_found_words_set = load_syllables() subtitle_processor = SubtitleProcessor(text_renderer, config, syllable_dict, not_found_words_set) except Exception as e: print(f"Erro Crítico: Falha ao inicializar TextRenderer ou SubtitleProcessor: {e}") traceback.print_exc() return lines = [] long_pauses = [] video_width, _ = map(int, config["video_resolution"].split("x")) try: lines, long_pauses = subtitle_processor.process_subtitles_to_syllable_lines(subtitle_file, video_width) if not lines: if long_pauses: print("Aviso: Nenhuma linha visual foi criada, mas pausas longas foram detectadas.") else: print("Aviso: Nenhuma linha visual foi criada e nenhuma pausa longa detectada.") if long_pauses: print(f"Detectadas {len(long_pauses)} pausas longas (>= 5s).") except Exception as e: print(f"Erro Crítico: Falha ao processar legendas ou identificar pausas: {e}") traceback.print_exc() return if not lines and not long_pauses: print("Nenhuma linha visual ou pausa longa encontrada. 
Não é possível gerar o vídeo.") return try: karaoke_creator = KaraokeVideoCreator(config, text_renderer) karaoke_creator.create_video(lines, long_pauses, output_file, audio_file) except Exception as e: print(f"\nErro Crítico durante a criação do vídeo:") print(f"{e}") traceback.print_exc() print("\nO processo foi interrompido devido a um erro.") finally: cp.get_default_memory_pool().free_all_blocks() end_main_time = time.time() print(f"Finalizando Karaokê Creator. Tempo total do script: {time.strftime('%H:%M:%S', time.gmtime(end_main_time - start_main_time))}") if __name__ == "__main__": main()

Thought process

Considering sentence grouping

The user wants to adjust their karaoke renderer to group the subtitle text into complete sentences instead of 4 lines. Sentences, defined by an uppercase letter at the start and punctuation at the end, will replace the current grouping. The timing synchronization needs to stay intact, so my focus will be on modifying the grouping logic while respecting the existing structure. I'll need to adjust the syllable grouping approach used in group_syllables_into_lines to accommodate full sentences instead of lines.

Adjusting sentence grouping

To address the user's request, I need to focus on detecting sentence boundaries: periods, exclamation marks, and question marks. The code currently accumulates syllables into lines based on width; instead, I'll accumulate until full sentences are complete. Since they want to show 4 complete sentences, the display window will show a variable number of lines but always 4 sentences. The syllable synchronization remains exactly the same — the critical change is how the text is grouped and displayed.

Guiding code modifications

To help the user transition to displaying 4 complete sentences, I'll suggest targeted code changes. Instead of grouping syllables into width-limited lines, we can group them by sentence boundaries (indicated by final punctuation). We'll need a function that checks whether we're at the end of a sentence, and the line-breaking logic must be adjusted to respect that. The create_video function can then operate on sentence groups, so the visual output changes with minimal edits elsewhere.

Thought for 40 seconds

Answer

Below is a minimal viable patch that turns the window of 4 lines into a window of 4 complete sentences, while keeping ALL of the time-code, syllable, highlight, animation, and trigger logic exactly as it is.


1. Update the configuration

```python
DEFAULT_CONFIG = {
    ...
    # instead of "visible lines" we now treat this as "visible sentences"
    "num_visible_sentences": 4,
    ...
}
```

Nothing else in the code needs the old name, so we just change the two places where it was read: TextRenderer.__init__ and KaraokeVideoCreator.__init__.

```python
self.num_visible_sentences = config["num_visible_sentences"]
```

2. Split into sentences (final period, ?, !)

Create a simple helper:

```python
def _is_sentence_terminator(text: str) -> bool:
    # matches "e.", "fim!", "acabou?" etc.
    return bool(text) and text[-1] in ".!?"
```

2.a Inside SubtitleProcessor

Replace the old group_syllables_into_lines with:

```python
def group_syllables_into_sentences(self, syllable_timing_data, video_width):
    """
    Returns a list of SENTENCES. Each sentence is a list of
    (start, end, text, width) elements, still honoring line breaks
    by width, BUT never mixing parts of different sentences.
    """
    sentences = []          # list of sentences
    current_sentence = []   # accumulates the lines of the current sentence
    current_line = []       # accumulates the elements of the current line
    current_line_w = 0
    max_w = int(video_width * 0.9)

    for start, end, text, w in syllable_timing_data:
        # --- horizontal packing ---
        need_space = (current_line and                       # something is there
                      not current_line[-1][2].isspace() and  # last is not a space
                      not text.isspace())                    # current is not a space
        extra = self.text_renderer.space_width_ref if need_space else 0

        if current_line_w + w + extra > max_w and current_line:
            # close the current line
            while current_line and current_line[-1][2].isspace():
                current_line.pop()
            if current_line:
                current_sentence.append(current_line)
            current_line = []
            current_line_w = 0

        # add the element
        current_line.append((start, end, text, w))
        current_line_w += w + extra

        # --- detect end of sentence ---
        if _is_sentence_terminator(text):
            # close the line if anything is left
            while current_line and current_line[-1][2].isspace():
                current_line.pop()
            if current_line:
                current_sentence.append(current_line)
            # store the sentence
            if current_sentence:
                # each line is still a "visual line"
                sentences.extend(current_sentence)
            # reset
            current_sentence = []
            current_line = []
            current_line_w = 0

    # final flush
    while current_line and current_line[-1][2].isspace():
        current_line.pop()
    if current_line:
        current_sentence.append(current_line)
    if current_sentence:
        sentences.extend(current_sentence)

    return sentences
```

Why?
Each sentence can still span more than one "visual line" (depending on its length), but all lines belonging to the same sentence stay together and, later on, slide as a unit without mixing with the next sentence.
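As a sanity check, here is a self-contained miniature of the grouping rule on toy (start, end, text, width) tuples; SPACE_W and MAX_W are placeholder constants standing in for space_width_ref and the 90% width cap:

```python
SPACE_W = 10   # placeholder for text_renderer.space_width_ref
MAX_W = 100    # placeholder for int(video_width * 0.9)

def is_sentence_terminator(text):
    return bool(text) and text[-1] in ".!?"

def group_into_sentence_lines(elements):
    """Width-limited line breaking that never crosses a sentence boundary."""
    lines, current_line, line_w = [], [], 0
    for start, end, text, w in elements:
        need_space = (current_line
                      and not current_line[-1][2].isspace()
                      and not text.isspace())
        extra = SPACE_W if need_space else 0
        if current_line and line_w + w + extra > MAX_W:
            lines.append(current_line)          # width break inside the sentence
            current_line, line_w = [], 0
        current_line.append((start, end, text, w))
        line_w += w + extra
        if is_sentence_terminator(text):
            lines.append(current_line)          # a terminator always closes the line
            current_line, line_w = [], 0
    if current_line:
        lines.append(current_line)
    return lines

# two short sentences built from timed tokens, as in legenda.psv
elems = [(0.0, 0.1, "É", 30), (0.1, 0.2, "hoje.", 40),
         (0.2, 0.3, "É", 30), (0.3, 0.4, "agora!", 50)]
groups = group_into_sentence_lines(elems)
print([[t for _, _, t, _ in g] for g in groups])  # [['É', 'hoje.'], ['É', 'agora!']]
```

Note that the second sentence starts on a fresh line even though it would still fit on the first one — that is exactly the "never mix sentences" guarantee.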

2.b Adjust the call

In process_subtitles_to_syllable_lines, replace:

```python
lines = self.group_syllables_into_lines(syllable_timing_data, video_width)
```

with

```python
lines = self.group_syllables_into_sentences(syllable_timing_data, video_width)
```

The rest of the pipeline stays identical: the existing code already indexes each "visual line" sequentially — the lines now arrive grouped by sentence, so the window of 4 complete sentences emerges naturally.


3. The sliding window now uses 4 sentences

In KaraokeVideoCreator.__init__ and anywhere else that reads
self.num_visible_lines, replace it with self.num_visible_sentences.

The whole scrolling mechanism (trigger-1 and trigger-2) already works on the global index of the "visual lines"; since new lines are now produced only when a sentence closes, what the viewer perceives is 4 sentences on screen at a time.
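For reference, a standalone sketch of the slot refresh, mirroring the existing _get_next_global_indices logic from the script (the displayed values are illustrative global line indices):

```python
def get_next_global_indices(current, count=2):
    # same idea as KaraokeVideoCreator._get_next_global_indices:
    # pick the next `count` global indices not yet on screen
    valid = {i for i in current if i is not None}
    start = (max(valid) + 1) if valid else 0
    return list(range(start, start + count))

displayed = [4, 5, 6, 7]                  # global indices of the 4 visible slots
top1, top2 = get_next_global_indices(displayed, 2)
displayed[0], displayed[1] = top1, top2   # trigger-1 refreshes the two top slots
print(displayed)                          # [8, 9, 6, 7]
```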


4. Silence the "syllable not found" warning caused by the trailing period

Inside _process_words_into_syllables, right before:

```python
print(f"Aviso: Palavra '{cleaned_word_text}' (original: '{word_text}') não encontrada ...)
```

add:

```python
# skip the warning when the trailing period is the only reason for the miss
if cleaned_word_text.endswith('.') and word_lookup.rstrip('.') in self.syllable_dict:
    continue
```

Or simply comment out that print (or replace it with pass).
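A toy check of that condition (syllable_dict, cleaned_word_text and word_lookup are stand-ins for the script's real variables):

```python
syllable_dict = {"treme": "tre-me"}   # toy dictionary; the real one comes from syllables.txt

def should_warn(cleaned_word_text, word_lookup, syllable_dict):
    # suppress the warning when only the trailing '.' blocks the lookup
    if cleaned_word_text.endswith('.') and word_lookup.rstrip('.') in syllable_dict:
        return False
    return word_lookup not in syllable_dict

print(should_warn("treme.", "treme.", syllable_dict))  # False: only the '.' differs
print(should_warn("xyz", "xyz", syllable_dict))        # True: genuinely unknown word
```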


5. (Optional) Remove the obsolete parameter

If you like, delete num_visible_lines from DEFAULT_CONFIG to avoid ambiguity.


Quick test

  1. Generate the video again.
  2. Note that even when a long sentence wraps onto 2 visual lines, the 4-sentence window only slides after the final period of the 4th sentence has appeared.
  3. The warnings about "e." no longer appear.
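Before re-rendering, sentence detection can also be verified directly against legenda.psv with a quick standalone scan (the token|start|end format follows the file shown in the question):

```python
def count_sentences(psv_lines):
    """Count tokens that terminate a sentence in char|start|end PSV lines."""
    total = 0
    for line in psv_lines:
        parts = line.rstrip("\n").split("|")
        if len(parts) != 3:
            continue
        token = parts[0]
        if token and token[-1] in ".!?":
            total += 1
    return total

# sample lines from the question's legenda.psv
sample = ["É|12.477|12.742", "e.|14.397|14.437", " |14.437|14.977"]
print(count_sentences(sample))  # 1
```

Running it over the whole file reports how many sentences the new grouping will produce.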

That's all: roughly 60 lines changed, with no impact on the GPU rendering, FFmpeg, or the frame generator.
