Preciso que faça uma correção, atualmente quando o...

Criado em: 24 de abril de 2025

Respondido usando O3 por Chat01

Pergunta

Preciso que faça uma correção: atualmente, quando o gradiente chega na última sílaba da palavra que tem "." final (e. - a. - a.):

É|12.477|12.742
|12.742|12.783
h|12.783|12.804
o|12.804|13.130
j|13.130|13.151
e|13.151|13.192
|13.192|13.273
q|13.273|13.294
u|13.294|13.314
e|13.314|13.335
|13.335|13.396
a|13.396|13.416
|13.416|13.518
t|13.518|13.539
e|13.539|13.559
r|13.559|13.682
r|13.682|13.702
a|13.702|13.723
|13.723|13.845
t|13.845|13.866
r|13.866|13.886
e|13.886|14.376
m|14.376|14.397
e.|14.397|14.437
|14.437|14.977
É|14.977|15.773
|15.773|15.794
h|15.794|15.814
o|15.814|15.835
j|15.835|15.855
e|15.855|15.876
|15.876|15.978
q|15.978|15.998
u|15.998|16.018
e|16.018|16.059
|16.059|16.080
a|16.080|16.121
|16.121|16.202
p|16.202|16.243
e|16.243|16.366
d|16.366|16.386
r|16.386|16.427
a|16.427|16.447
|16.447|16.549
r|16.549|16.570
o|16.570|16.876
l|16.876|16.897
a.|16.897|16.937
|16.937|17.717
E|17.717|18.166
s|18.166|18.186
t|18.186|18.227
e|18.227|18.247
|18.247|18.369
é|18.369|18.390
|18.390|18.410
o|18.410|18.431
|18.431|18.451
s|18.451|18.471
o|18.471|18.492
m|18.492|18.696
|18.696|18.716
d|18.716|18.736
a|18.736|18.777
|18.777|18.940
m|18.940|18.961
i|18.961|19.165
n|19.165|19.185
h|19.185|19.205
a|19.205|19.246
|19.246|19.409
t|19.409|19.450
e|19.450|19.470
r|19.470|19.776
r|19.776|19.797
a.|19.797|19.837
|19.837|19.977
C|19.977|20.078
h|20.078|20.099
e|20.099|20.281
g|20.281|20.301
u|20.301|20.322
e|20.322|20.342
i|20.342|20.524
|20.524|20.605
n|20.605|20.646
o|20.646|20.666
|20.666|20.909
s|20.909|20.930
o|20.930|20.950
m|20.950|21.436
|21.436|21.456
d|21.456|21.477
a|21.477|21.781
|21.781|21.801
v|21.801|21.821
i|21.821|22.064
o|22.064|22.976
l|22.976|22.997
a.|22.997|23.037
|23.037|28.444
N|28.444|28.668
ã|28.668|28.688
o|28.688|28.749
...

O gradiente meio que dá um "pulo" brusco e não avança naturalmente nessa sílaba até que chegue à próxima frase/palavra; corrija isso.

  • Pense passo a passo com calma e mande um patch corrigindo isso de uma vez por todas
python
import cupy as cp import numpy as np from PIL import Image, ImageDraw, ImageFont import time import threading import math import os import queue import subprocess from tqdm import tqdm import psutil import traceback from cupyx.scipy import ndimage as cupy_ndimage DEFAULT_CONFIG = { "font_path": "C:/Users/lucas/LilitaOne-Regular.ttf", "font_size": 140, "video_resolution": "1920x1080", "video_fps": 6, "base_text_color": "#FFFFFF", "highlight_text_color": "#ff0000", "num_visible_lines": 4, "upper_case": True, "background_image": "capa.png", "frames_per_batch": 16, "default_subtitle_file": "legenda.psv", "default_output_file": "video_karaoke_char_level.mp4", "ffmpeg_preset": "p4", "ffmpeg_tune": "hq", "ffmpeg_bitrate": "20M", "ffmpeg_codec": "h264_nvenc", "vertical_shift_pixels": 130, "min_char_duration": 0.01, } def hex_to_bgr_cupy(hex_color): hex_color = hex_color.lstrip('#') rgb = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) return cp.array(rgb[::-1], dtype=cp.uint8) class TextRenderer: def __init__(self, config): self.config = config self.font_path = config["font_path"] self.font_size = config["font_size"] self.num_visible_lines = config["num_visible_lines"] self.upper_case = config["upper_case"] self.base_text_color = config["base_text_color"] self._font_cache = {} try: self.font = ImageFont.truetype(self.font_path, self.font_size) self._font_cache[self.font_size] = self.font temp_img = Image.new("RGB", (1, 1)) temp_draw = ImageDraw.Draw(temp_img) space_bbox = temp_draw.textbbox((0, 0), " ", font=self.font) try: self.space_width_ref = temp_draw.textlength(" ", font=self.font) except AttributeError: self.space_width_ref = space_bbox[2] - space_bbox[0] if space_bbox else int(self.font_size * 0.25) try: sample_bbox = self.font.getbbox("Tg") self.line_height_ref = sample_bbox[3] - sample_bbox[1] except AttributeError: sample_bbox_fallback = temp_draw.textbbox((0, 0), "Tg", font=self.font) self.line_height_ref = sample_bbox_fallback[3] - sample_bbox_fallback[1] 
if sample_bbox_fallback else int(self.font_size * 1.2) del temp_draw, temp_img except Exception as e: print(f"Aviso: Falha ao carregar a fonte '{self.font_path}'. Usando fonte padrão. Erro: {e}") self.font = ImageFont.load_default() try: bbox = self.font.getbbox("M") self.font_size = bbox[3] - bbox[1] except AttributeError: self.font_size = 20 self._font_cache[self.font_size] = self.font temp_img = Image.new("RGB", (1, 1)) temp_draw = ImageDraw.Draw(temp_img) try: self.space_width_ref = temp_draw.textlength(" ", font=self.font) except AttributeError: self.space_width_ref = 10 try: bbox = self.font.getbbox("Tg") self.line_height_ref = bbox[3] - bbox[1] except AttributeError: self.line_height_ref = 30 del temp_draw, temp_img if self.num_visible_lines <= 1: spacing_multiplier = 1.0 elif self.num_visible_lines == 2: spacing_multiplier = 0.8 elif self.num_visible_lines == 3: spacing_multiplier = 0.6 else: spacing_multiplier = 0.4 self.line_spacing = int(self.line_height_ref * spacing_multiplier) self.line_spacing = max(0, self.line_spacing) def _get_font_with_size(self, size: int) -> ImageFont.FreeTypeFont: """Retorna uma ImageFont em determinado tamanho, usando cache.""" size = max(1, int(size)) if size in self._font_cache: return self._font_cache[size] try: f = ImageFont.truetype(self.font_path, size) except Exception as e: f = ImageFont.load_default() self._font_cache[size] = f return f def _calculate_line_width(self, line_elements, draw, font) -> int: """Calcula a largura total (px) de uma linha dada a fonte.""" width_total = 0 for _, _, txt, _ in line_elements: width_total += self._get_element_width(draw, txt, font) return width_total def _get_element_width(self, draw, text, font): """Obtém a largura de um elemento de texto (sílaba ou espaço).""" if text == " ": return self.space_width_ref try: return draw.textlength(text, font=font) except AttributeError: try: bbox = draw.textbbox((0, 0), text, font=font) return bbox[2] - bbox[0] if bbox else 0 except 
AttributeError: try: width, _ = draw.textsize(text, font=font) return width except AttributeError: font_size_est = getattr(font, 'size', self.font_size // 2) return len(text) * (font_size_est // 2) except Exception as e: font_size_est = getattr(font, 'size', self.font_size // 2) return len(text) * (font_size_est // 2) def render_text_images(self, displayed_content, active_line_local_idx, width, height): """ Renderiza cada frase/linha, reduzindo automaticamente o tamanho da fonte se a frase extrapolar 90 % da largura do vídeo (sem quebrar linha). Garante que a posição vertical das linhas seja fixa. """ img_base = Image.new("RGB", (width, height), (0, 0, 0)) img_mask = Image.new("L", (width, height), 0) draw_base = ImageDraw.Draw(img_base) draw_mask = ImageDraw.Draw(img_mask) max_allowed_width = int(width * 0.90) min_font_size = max(10, int(self.font_size * 0.60)) line_render_data = [] for global_idx, line_elements in displayed_content: if not (line_elements and global_idx is not None): line_render_data.append(None) continue font_line_size = self.font_size font_line = self._get_font_with_size(font_line_size) line_width_px = self._calculate_line_width(line_elements, draw_base, font_line) reduction_step = max(1, int(self.font_size * 0.05)) while line_width_px > max_allowed_width and font_line_size > min_font_size: font_line_size -= reduction_step font_line_size = max(min_font_size, font_line_size) font_line = self._get_font_with_size(font_line_size) line_width_px = self._calculate_line_width(line_elements, draw_base, font_line) if font_line_size == min_font_size: break try: h_ref = font_line.getbbox("Tg") line_height_px = h_ref[3] - h_ref[1] except Exception: line_height_px = int(self.line_height_ref * (font_line_size / self.font_size)) line_render_data.append({ "font": font_line, "font_size": font_line_size, "height": line_height_px, "width": line_width_px, "elements": line_elements, "global_idx": global_idx }) vertical_shift = self.config.get("vertical_shift_pixels", 
0) block_height_ref = self.num_visible_lines * self.line_height_ref + (self.num_visible_lines - 1) * self.line_spacing start_y_ref = max(0, (height - block_height_ref) // 2 + vertical_shift) line_start_y_positions = [ int(start_y_ref + i * (self.line_height_ref + self.line_spacing)) for i in range(self.num_visible_lines) ] all_syllable_render_info = [] active_syllable_indices = (-1, -1) current_global_syl_idx = 0 for local_idx, render_info in enumerate(line_render_data): if render_info is None: continue font_line = render_info["font"] line_width_px = render_info["width"] elements_in_line = render_info["elements"] current_global_line_idx = render_info["global_idx"] is_active_line = (local_idx == active_line_local_idx) if is_active_line: active_syllable_start_idx_global = current_global_syl_idx line_start_x = (width - line_width_px) // 2 current_x = float(line_start_x) line_y_draw = line_start_y_positions[local_idx] if line_y_draw is None: print(f"Aviso: Posição Y não calculada para slot {local_idx}, pulando desenho.") continue for i, (start_time, end_time, element_text, _) in enumerate(elements_in_line): element_width = self._get_element_width(draw_base, element_text, font_line) if not element_text.isspace(): try: draw_x = int(current_x) draw_y = line_y_draw element_text_to_draw = element_text if element_text_to_draw.endswith('.') and len(element_text_to_draw.strip()) > 1: element_text_to_draw = element_text_to_draw.rstrip('.') draw_base.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=self.base_text_color) draw_mask.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=255) final_bbox = draw_base.textbbox((draw_x, draw_y), element_text, font=font_line) if final_bbox: bbox_left, bbox_top, bbox_right, bbox_bottom = final_bbox syl_w_actual = bbox_right - bbox_left syl_h_actual = bbox_bottom - bbox_top bbox_top_final = bbox_top else: line_height_px_fallback = render_info["height"] bbox_left = draw_x bbox_top_final = draw_y syl_w_actual = 
# NOTE(review): continuation of the collapsed chat-export paste (see header
# comment above the imports). Code tokens byte-identical; comments added and
# Portuguese docstrings translated.
#
# This span contains: the tail of TextRenderer.render_text_images() (fallback
# drawing, per-syllable (start, end, x, y, w, h, line_idx) records, conversion
# of the base image and text mask to CuPy arrays) and the SubtitleProcessor
# class: read_subtitles() parses the CHARACTER|START|END PSV (optional header,
# malformed-line and bad-timestamp warnings, end<start correction, sort by
# start); _identify_long_pauses() flags gaps/durations >= 5.0 s (initial,
# between characters, or within one character); _group_chars_into_words();
# _process_words_into_syllables() splits words via self.syllable_dict
# (hyphen-separated entries), strips ",.!?;:" for the lookup and then MERGES
# the trailing punctuation back into the last syllable — this is why tokens
# like "e." / "a." exist as single syllables downstream; and
# group_syllables_into_lines() starts sentence-per-line grouping.
element_width syl_h_actual = line_height_px_fallback except Exception as e: print(f"Fallback de renderização/bbox para: {element_text}. Erro: {e}") draw_x = int(current_x) draw_y = line_y_draw try: draw_base.text((draw_x, draw_y), element_text, font=font_line, fill=self.base_text_color) draw_mask.text((draw_x, draw_y), element_text, font=font_line, fill=255) except Exception as draw_err: print(f" -> Falha até no desenho fallback: {draw_err}") line_height_px_fallback = render_info["height"] bbox_left = draw_x bbox_top_final = draw_y syl_w_actual = element_width syl_h_actual = line_height_px_fallback all_syllable_render_info.append( (start_time, end_time, bbox_left, bbox_top_final, syl_w_actual, syl_h_actual, current_global_line_idx) ) current_global_syl_idx += 1 current_x += element_width if is_active_line: active_syllable_end_idx_global = current_global_syl_idx active_syllable_indices = (active_syllable_start_idx_global, active_syllable_end_idx_global) base_cp = cp.asarray(np.array(img_base)) mask_cp = cp.asarray(np.array(img_mask)) return base_cp, mask_cp, all_syllable_render_info, active_syllable_indices class SubtitleProcessor: def __init__(self, text_renderer: TextRenderer, config, syllable_dict, not_found_words_set): self.text_renderer = text_renderer self.config = config self.upper_case = config["upper_case"] self.font = self.text_renderer.font self.syllable_dict = syllable_dict self.not_found_words_set = not_found_words_set @staticmethod def _parse_time_string_float(time_str): """Parses time string (like '0.598') directly to float seconds.""" try: return float(time_str) except (ValueError, TypeError): print(f"Aviso: Timestamp em formato inesperado: {time_str}") return None @staticmethod def read_subtitles(file): """Read subtitles from the PSV file (CHARACTER|START|END).""" char_timing_data = [] try: with open(file, 'r', encoding='utf-8') as f: lines = f.readlines() if not lines: print(f"Aviso: Arquivo de legenda '{file}' está vazio.") return [], [] header = 
lines[0].strip().upper() start_line_index = 0 if header == "CHARACTER|START|END": start_line_index = 1 elif header and '|' not in lines[0]: print("Aviso: Cabeçalho 'CHARACTER|START|END' não encontrado. Assumindo que não há cabeçalho.") for line_num, line in enumerate(lines[start_line_index:], start=start_line_index + 1): if not line.strip(): continue line = line.rstrip('\n\r') if not line: continue parts = line.split('|') if len(parts) != 3: print(f"Aviso: Ignorando linha {line_num} mal formatada (esperava 3 colunas separadas por '|'): '{line}'") continue char = parts[0] start_str = parts[1].strip() end_str = parts[2].strip() start_time = SubtitleProcessor._parse_time_string_float(start_str) end_time = SubtitleProcessor._parse_time_string_float(end_str) if start_time is None or end_time is None: print(f"Aviso: Ignorando linha {line_num} com timestamp inválido: '{line}'") continue if not char and start_time is not None and end_time is not None: char = " " if end_time < start_time: print(f"Aviso: Corrigindo end_time < start_time na linha {line_num}: '{line}'") end_time = start_time char_timing_data.append((start_time, end_time, str(char))) except FileNotFoundError: print(f"Erro: Arquivo de legenda PSV não encontrado: {file}") return [], [] except Exception as e: print(f"Erro inesperado ao ler legendas PSV: {e}") import traceback traceback.print_exc() return [], [] char_timing_data.sort(key=lambda x: x[0]) long_pauses = SubtitleProcessor._identify_long_pauses(char_timing_data) return char_timing_data, long_pauses @staticmethod def _identify_long_pauses(char_timing_data, min_pause_duration=5.0): """Identify long pauses: at the start, between characters, or within a character's duration.""" pauses = [] if not char_timing_data: return pauses first_char_start_time = char_timing_data[0][0] initial_pause_duration = first_char_start_time if initial_pause_duration >= min_pause_duration: pauses.append({ "start": 0.0, "end": first_char_start_time, "duration": initial_pause_duration, 
"type": "initial" }) for i in range(1, len(char_timing_data)): prev_end_time = char_timing_data[i-1][1] curr_start_time = char_timing_data[i][0] pause_duration = curr_start_time - prev_end_time if pause_duration >= min_pause_duration: is_covered_by_initial = False if pauses and pauses[0]["type"] == "initial" and pauses[0]["end"] >= curr_start_time: is_covered_by_initial = True if not is_covered_by_initial: pauses.append({ "start": prev_end_time, "end": curr_start_time, "duration": pause_duration, "type": "between" }) for i in range(len(char_timing_data)): start_time = char_timing_data[i][0] end_time = char_timing_data[i][1] char_duration = end_time - start_time if char_duration >= min_pause_duration: is_covered = False for p in pauses: if abs(p["start"] - start_time) < 0.01 and abs(p["end"] - end_time) < 0.01: is_covered = True break if not is_covered: pauses.append({ "start": start_time, "end": end_time, "duration": char_duration, "type": "during" }) pauses.sort(key=lambda x: x["start"]) return pauses def _group_chars_into_words(self, char_timing_data): """Group characters into words and spaces.""" words_and_spaces = [] current_word_chars = [] for i, (start, end, char) in enumerate(char_timing_data): processed_char = char.upper() if self.upper_case else char if processed_char.isspace(): if current_word_chars: words_and_spaces.append({"type": "word", "chars": current_word_chars}) current_word_chars = [] words_and_spaces.append({"type": "space", "start": start, "end": end}) else: current_word_chars.append((start, end, processed_char)) if current_word_chars: words_and_spaces.append({"type": "word", "chars": current_word_chars}) return words_and_spaces def _process_words_into_syllables(self, words_and_spaces): """Process words, splitting them into syllables using the dictionary.""" syllable_data = [] temp_img = Image.new("RGB", (1, 1)) temp_draw = ImageDraw.Draw(temp_img) default_font = self.text_renderer.font punctuation_to_strip = ",.!?;:" for element in 
words_and_spaces: if element["type"] == "space": space_width = self.text_renderer.space_width_ref syllable_data.append((element["start"], element["end"], " ", space_width)) continue word_chars = element["chars"] if not word_chars: continue word_text = "".join([c[2] for c in word_chars]) cleaned_word_text = word_text.rstrip(punctuation_to_strip) word_lookup = cleaned_word_text.lower() if word_lookup in self.syllable_dict: syllables_str = self.syllable_dict[word_lookup] syllable_parts = syllables_str.split('-') char_idx_counter = 0 original_char_idx_counter = 0 for syl_part in syllable_parts: syl_len = len(syl_part) if char_idx_counter + syl_len > len(cleaned_word_text): if original_char_idx_counter < len(word_chars): syl_chars_original = word_chars[original_char_idx_counter:] syl_text_original = "".join([c[2] for c in syl_chars_original]) syl_start = syl_chars_original[0][0] syl_end = syl_chars_original[-1][1] syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in syl_chars_original) syllable_data.append((syl_start, syl_end, syl_text_original, syl_width)) break syl_chars = word_chars[original_char_idx_counter : original_char_idx_counter + syl_len] if not syl_chars: continue syl_text = "".join([c[2] for c in syl_chars]) syl_start = syl_chars[0][0] syl_end = syl_chars[-1][1] syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in syl_chars) syllable_data.append((syl_start, syl_end, syl_text, syl_width)) char_idx_counter += syl_len original_char_idx_counter += syl_len if original_char_idx_counter < len(word_chars): remaining_chars = word_chars[original_char_idx_counter:] rem_text = "".join([c[2] for c in remaining_chars]) expected_punctuation = word_text[len(cleaned_word_text):] if rem_text == expected_punctuation: if syllable_data and not syllable_data[-1][2].isspace(): last_syl_start, last_syl_end, last_syl_text, last_syl_width = syllable_data.pop() new_syl_text = last_syl_text + rem_text 
new_syl_end = remaining_chars[-1][1] new_syl_width = self.text_renderer._get_element_width(temp_draw, new_syl_text, default_font) syllable_data.append((last_syl_start, new_syl_end, new_syl_text, new_syl_width)) else: rem_start = remaining_chars[0][0] rem_end = remaining_chars[-1][1] rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in remaining_chars) syllable_data.append((rem_start, rem_end, rem_text, rem_width)) else: rem_start = remaining_chars[0][0] rem_end = remaining_chars[-1][1] rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in remaining_chars) syllable_data.append((rem_start, rem_end, rem_text, rem_width)) else: if word_lookup not in self.not_found_words_set and word_text.lower() == word_lookup: self.not_found_words_set.add(word_lookup) syl_start = word_chars[0][0] syl_end = word_chars[-1][1] syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in word_chars) syllable_data.append((syl_start, syl_end, word_text, syl_width)) del temp_draw, temp_img syllable_data.sort(key=lambda x: x[0]) return syllable_data def group_syllables_into_lines(self, syllable_timing_data, video_width): """ Group syllables into logical "lines" that now correspond to **complete sentences**. A sentence ends when the last non-space character of the current syllable is ".", "!" or "?". """ lines = [] current_line_elements = [] SENT_END = ".!?" 
# NOTE(review): continuation of the collapsed chat-export paste. Code tokens
# byte-identical; only comments added.
#
# This span contains: the sentence-grouping loop + process_subtitles_to_
# syllable_lines() of SubtitleProcessor, then the CUDAProcessor class:
# __init__ precomputes base/highlight/progress-bar colors as float RGB CuPy
# arrays; process_frames_streaming() renders batched frames on the GPU
# (progress bar during long pauses, fade-out of the previous mask, completed
# lines fully highlighted, active line filled syllable-by-syllable) and feeds
# an FFmpeg writer thread through a bounded queue.
#
# BUG(review) — this is the "gradient jump" reported for sentence-final
# syllables ("e.", "a."): syl_duration_for_normalization is derived from the
# NEXT syllable's start time (next_syl_starts_cp_pre[:-1] = syl_starts_cp[1:]),
# so for the last syllable of a phrase the whole inter-phrase silence is
# folded into its fill window. The max_visual_fill_duration cap below only
# fires when the window overlaps a detected long pause (>= 5.0 s per
# _identify_long_pauses), so shorter gaps (e.g. 14.437 -> 14.977 in the user's
# PSV) are never capped: the highlight crawls/stalls over that syllable and
# then snaps when the next phrase becomes active. Suggested fix: clamp the
# window to the syllable's own timing, e.g.
#     raw_end_time = cp.minimum(next_syl_starts_cp_pre[syl_idx],
#                               syl_ends_cp[syl_idx])
# (optionally plus a small grace), or apply the max_visual_fill_duration cap
# unconditionally instead of only when long_pauses overlap.
for start, end, text, element_width in syllable_timing_data: current_line_elements.append((start, end, text, element_width)) stripped = text.rstrip() if stripped and stripped[-1] in SENT_END: while current_line_elements and current_line_elements[-1][2].isspace(): current_line_elements.pop() if current_line_elements: lines.append(current_line_elements) current_line_elements = [] while current_line_elements and current_line_elements[-1][2].isspace(): current_line_elements.pop() if current_line_elements: lines.append(current_line_elements) return lines def process_subtitles_to_syllable_lines(self, file, video_width): char_timing_data, long_pauses = self.read_subtitles(file) if not char_timing_data: return [], [] words_and_spaces = self._group_chars_into_words(char_timing_data) syllable_timing_data = self._process_words_into_syllables(words_and_spaces) if not syllable_timing_data: print("Aviso: Nenhum dado de sílaba gerado.") return [], long_pauses lines = self.group_syllables_into_lines(syllable_timing_data, video_width) return lines, long_pauses class CUDAProcessor: def __init__(self, config, static_bg_rgb_cp): self.config = config self.static_bg_rgb_cp = static_bg_rgb_cp self.streams = [cp.cuda.Stream() for _ in range(2)] self.base_color_cp = hex_to_bgr_cupy(config["base_text_color"]) self.base_color_rgb_float_cp = self.base_color_cp[::-1].astype(cp.float32) / 255.0 highlight_hex = config["highlight_text_color"] highlight_bgr_cp = hex_to_bgr_cupy(highlight_hex) self.highlight_color_rgb_float_cp = highlight_bgr_cp[::-1].astype(cp.float32) / 255.0 self.progress_bar_fill_color_cp = highlight_bgr_cp self.progress_bar_fill_rgb_float_cp = self.highlight_color_rgb_float_cp hex_color_clean = highlight_hex.lstrip('#') rgb = tuple(int(hex_color_clean[i:i+2], 16) for i in (0, 2, 4)) darkening_factor = 0.4 dark_rgb = tuple(max(0, min(255, int(c * darkening_factor))) for c in rgb) self.progress_bar_bg_color_cp = cp.array(dark_rgb[::-1], dtype=cp.uint8) 
self.progress_bar_bg_rgb_float_cp = self.progress_bar_bg_color_cp[::-1].astype(cp.float32) / 255.0 self.frames_per_batch = config["frames_per_batch"] self.min_syl_duration = config.get("min_char_duration", 0.01) self.progress_bar_height = 20 self.progress_bar_y_start = 10 self.max_visual_fill_duration = config.get("max_visual_fill_duration", 3.0) if self.static_bg_rgb_cp is None: raise ValueError("CUDAProcessor requer um array static_bg_rgb_cp não nulo.") self.static_bg_rgb_float_cp = self.static_bg_rgb_cp.astype(cp.float32) / 255.0 def process_frames_streaming(self, base_cp, mask_cp, all_syllable_render_info, active_syllable_indices, video, video_lock, video_fps, current_frame, num_frames, width, height, pbar=None, prev_mask_cp=None, fade_start_time=None, fade_end_time=None, active_global_line_idx=-1, completed_global_line_indices=set(), long_pauses=None): """Processes frames including optional progress bar and consistent syllable fill.""" channels = 3 num_syls_total = len(all_syllable_render_info) syl_duration_for_normalization = cp.zeros(num_syls_total, dtype=cp.float32) next_syl_starts_cp_pre = cp.empty(num_syls_total, dtype=cp.float32) if num_syls_total > 0: syl_starts_cp = cp.asarray([info[0] for info in all_syllable_render_info], dtype=cp.float32) syl_ends_cp = cp.asarray([info[1] for info in all_syllable_render_info], dtype=cp.float32) syl_x_cp = cp.asarray([info[2] for info in all_syllable_render_info], dtype=cp.int32) syl_y_cp = cp.asarray([info[3] for info in all_syllable_render_info], dtype=cp.int32) syl_w_cp = cp.asarray([info[4] for info in all_syllable_render_info], dtype=cp.int32) syl_h_cp = cp.asarray([info[5] for info in all_syllable_render_info], dtype=cp.int32) syl_global_idx_cp = cp.asarray([info[6] for info in all_syllable_render_info], dtype=cp.int32) if num_syls_total > 1: next_syl_starts_cp_pre[:-1] = syl_starts_cp[1:] next_syl_starts_cp_pre[-1] = syl_ends_cp[-1] for syl_idx in range(num_syls_total): s_start = syl_starts_cp[syl_idx] 
raw_end_time = next_syl_starts_cp_pre[syl_idx] raw_duration = raw_end_time - s_start eff_duration = cp.maximum(raw_duration, self.min_syl_duration) syl_duration_for_normalization[syl_idx] = eff_duration if eff_duration > self.max_visual_fill_duration and long_pauses: syllable_overlaps = False for pause in long_pauses: pause_start, pause_end = pause["start"], pause["end"] if s_start < pause_end and raw_end_time > pause_start: syllable_overlaps = True break if syllable_overlaps: syl_duration_for_normalization[syl_idx] = self.max_visual_fill_duration del next_syl_starts_cp_pre else: syl_starts_cp = cp.empty(0, dtype=cp.float32) syl_ends_cp = cp.empty(0, dtype=cp.float32) syl_x_cp = cp.empty(0, dtype=cp.int32) syl_y_cp = cp.empty(0, dtype=cp.int32) syl_w_cp = cp.empty(0, dtype=cp.int32) syl_h_cp = cp.empty(0, dtype=cp.int32) syl_global_idx_cp = cp.empty(0, dtype=cp.int32) batch_size = min(self.frames_per_batch, 64) out_cp = [cp.empty((batch_size, height, width, channels), dtype=cp.uint8) for _ in range(2)] text_mask_bool = mask_cp > 128 text_mask_bool_exp = text_mask_bool[None, ..., None] y_coords, x_coords = cp.mgrid[:height, :width] bar_y_start = self.progress_bar_y_start bar_y_end = bar_y_start + self.progress_bar_height bar_bg_mask_full_area = (y_coords >= bar_y_start) & (y_coords < bar_y_end) result_queue = queue.Queue(maxsize=8) writer = threading.Thread(target=self._create_writer_thread(result_queue, video, video_lock, pbar), daemon=True) writer.start() max_visual_fill_duration = self.config.get("max_visual_fill_duration", 3.0) buffer_idx = 0 for batch_start in range(0, num_frames, batch_size): buffer_idx = (buffer_idx + 1) % 2 stream = self.streams[buffer_idx] batch_end = min(batch_start + batch_size, num_frames) batch_frames = batch_end - batch_start if batch_frames <= 0: continue out_batch_cp = out_cp[buffer_idx][:batch_frames] with stream: batch_frame_indices = cp.arange(batch_frames, dtype=cp.int32) absolute_frame_indices = current_frame + batch_start + 
batch_frame_indices frame_times_cp = absolute_frame_indices.astype(cp.float32) / video_fps intermediate_batch_float = cp.repeat(self.static_bg_rgb_float_cp[None, ...], batch_frames, axis=0) is_in_long_pause_batch_for_bar = cp.zeros(batch_frames, dtype=bool) if long_pauses: batch_needs_bar = cp.zeros(batch_frames, dtype=bool) batch_fill_width = cp.zeros(batch_frames, dtype=cp.int32) for pause in long_pauses: pause_start, pause_end, pause_duration = pause["start"], pause["end"], pause["duration"] is_in_this_pause_batch_indices = cp.where((frame_times_cp >= pause_start) & (frame_times_cp < pause_end))[0] if is_in_this_pause_batch_indices.size > 0: batch_needs_bar[is_in_this_pause_batch_indices] = True is_in_long_pause_batch_for_bar[is_in_this_pause_batch_indices] = True progress = (frame_times_cp[is_in_this_pause_batch_indices] - pause_start) / pause_duration fill_width = cp.clip((progress * width), 0, width).astype(cp.int32) batch_fill_width[is_in_this_pause_batch_indices] = fill_width if cp.any(batch_needs_bar): bar_bg_mask_batch = bar_bg_mask_full_area[None, :, :] & batch_needs_bar[:, None, None] x_coords_exp = x_coords[None, :, :] bar_fill_mask_batch = bar_bg_mask_batch & (x_coords_exp < batch_fill_width[:, None, None]) apply_bg_mask = bar_bg_mask_batch & (~bar_fill_mask_batch) intermediate_batch_float = cp.where( apply_bg_mask[..., None], self.progress_bar_bg_rgb_float_cp, intermediate_batch_float ) intermediate_batch_float = cp.where( bar_fill_mask_batch[..., None], self.progress_bar_fill_rgb_float_cp, intermediate_batch_float ) del bar_bg_mask_batch, bar_fill_mask_batch, apply_bg_mask, x_coords_exp del batch_needs_bar, batch_fill_width if prev_mask_cp is not None and fade_start_time is not None and fade_end_time is not None and fade_end_time > fade_start_time: prev_mask_float_exp = (prev_mask_cp > 128)[None, ..., None] time_in_fade = frame_times_cp - fade_start_time fade_duration = fade_end_time - fade_start_time fade_progress = cp.clip(time_in_fade / 
fade_duration, 0.0, 1.0) prev_alpha_batch = 1.0 - fade_progress prev_alpha_exp = prev_alpha_batch[:, None, None, None] intermediate_batch_float = cp.where( prev_mask_float_exp, self.base_color_rgb_float_cp * prev_alpha_exp + intermediate_batch_float * (1.0 - prev_alpha_exp), intermediate_batch_float ) batch_highlight_mask = cp.zeros((batch_frames, height, width), dtype=bool) completed_indices_cp = cp.asarray(list(completed_global_line_indices), dtype=cp.int32) if completed_indices_cp.size > 0 and num_syls_total > 0: is_completed_syl_mask = cp.isin(syl_global_idx_cp, completed_indices_cp) completed_syl_indices = cp.where(is_completed_syl_mask)[0] for syl_idx in completed_syl_indices: s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx] s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx] if s_w <= 0 or s_h <= 0: continue syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \ (y_coords >= s_y) & (y_coords < s_y + s_h) combined_mask = syl_bbox_mask & text_mask_bool batch_highlight_mask[:, combined_mask] = True start_active_syl_idx, end_active_syl_idx = active_syllable_indices start_active_syl_idx = max(0, start_active_syl_idx) end_active_syl_idx = min(num_syls_total, end_active_syl_idx) if active_global_line_idx != -1 and start_active_syl_idx < end_active_syl_idx: for syl_idx in range(start_active_syl_idx, end_active_syl_idx): if syl_global_idx_cp[syl_idx] != active_global_line_idx: continue s_start = syl_starts_cp[syl_idx] s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx] s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx] if s_w <= 0 or s_h <= 0: continue current_syl_norm_duration = syl_duration_for_normalization[syl_idx] elapsed_time = frame_times_cp - s_start safe_visual_duration = cp.maximum(current_syl_norm_duration, 1e-6) normalized_time = cp.clip(elapsed_time / safe_visual_duration, 0.0, 1.0) syl_progress_batch = normalized_time cutoff_x_batch = s_x + syl_progress_batch * s_w syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \ (y_coords >= s_y) & (y_coords 
< s_y + s_h) highlight_area_mask_batch = x_coords[None, :, :] < cutoff_x_batch[:, None, None] active_highlight_apply_mask = syl_bbox_mask[None, :, :] & \ highlight_area_mask_batch & \ text_mask_bool[None, :, :] batch_highlight_mask |= active_highlight_apply_mask del elapsed_time, safe_visual_duration, normalized_time, cutoff_x_batch del syl_progress_batch, syl_bbox_mask del highlight_area_mask_batch, active_highlight_apply_mask final_color_batch_float = intermediate_batch_float highlight_mask_exp = batch_highlight_mask[..., None] final_color_batch_float = cp.where( text_mask_bool_exp & highlight_mask_exp, self.highlight_color_rgb_float_cp, final_color_batch_float ) final_color_batch_float = cp.where( text_mask_bool_exp & (~highlight_mask_exp), self.base_color_rgb_float_cp, final_color_batch_float ) out_batch_cp[:] = (final_color_batch_float * 255.0).astype(cp.uint8) del batch_highlight_mask, highlight_mask_exp del final_color_batch_float, intermediate_batch_float if 'completed_indices_cp' in locals(): del completed_indices_cp if 'is_completed_syl_mask' in locals(): del is_completed_syl_mask if 'completed_syl_indices' in locals(): del completed_syl_indices del is_in_long_pause_batch_for_bar stream.synchronize() result_queue.put((out_batch_cp.copy(), batch_frames)) cp.get_default_memory_pool().free_all_blocks() result_queue.put(None) writer.join() for s in self.streams: s.synchronize() if 'syl_starts_cp' in locals(): del syl_starts_cp if 'syl_ends_cp' in locals(): del syl_ends_cp if 'syl_x_cp' in locals(): del syl_x_cp if 'syl_y_cp' in locals(): del syl_y_cp if 'syl_w_cp' in locals(): del syl_w_cp if 'syl_h_cp' in locals(): del syl_h_cp if 'syl_global_idx_cp' in locals(): del syl_global_idx_cp if 'syl_duration_for_normalization' in locals(): del syl_duration_for_normalization del text_mask_bool, text_mask_bool_exp del y_coords, x_coords del out_cp, result_queue cp.get_default_memory_pool().free_all_blocks() del bar_bg_mask_full_area def _create_writer_thread(self, 
# NOTE(review): continuation of the collapsed chat-export paste. Code tokens
# byte-identical; comments added and one Portuguese docstring translated.
#
# This span contains: the body of CUDAProcessor._create_writer_thread (drains
# the result queue, converts RGB->BGR by slicing, writes frames under
# video_lock, updates tqdm, breaks on BrokenPipeError/OSError); FFmpegWriter,
# which pipes raw bgr24 frames into an ffmpeg subprocess (NVENC codec/preset/
# bitrate from config) and drains stderr on a daemon thread;
# get_audio_duration() via ffprobe with per-failure-mode warnings; and the
# head of KaraokeVideoCreator: resolution parsing with a 1920x1080 fallback,
# background-image load (LANCZOS resize) or black fallback, init_gpu(), and
# _get_next_global_indices().
result_queue, video, video_lock, pbar): def writer_thread(): while True: batch_data = result_queue.get() if batch_data is None: result_queue.task_done() break frames_rgb_uint8, batch_frames_count = batch_data try: frames_bgr_uint8 = frames_rgb_uint8[:, :, :, ::-1] for frame_idx in range(batch_frames_count): with video_lock: video.write(cp.asnumpy(frames_bgr_uint8[frame_idx])) if pbar is not None: pbar.update(batch_frames_count) except Exception as e: print(f"Erro na thread escritora: {e}") if isinstance(e, (BrokenPipeError, OSError)): print("Erro crítico de escrita. Encerrando thread escritora.") break finally: del frames_rgb_uint8, frames_bgr_uint8 result_queue.task_done() return writer_thread class FFmpegWriter: def __init__(self, output_file, width, height, fps, config): self.output_file = output_file self.config = config ffmpeg_cmd = [ "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo", "-s", f"{width}x{height}", "-pix_fmt", "bgr24", "-r", str(fps), "-i", "-", "-c:v", config["ffmpeg_codec"], "-preset", config["ffmpeg_preset"], "-b:v", config["ffmpeg_bitrate"], "-pix_fmt", "yuv420p", "-tune", config["ffmpeg_tune"], output_file ] self.ffmpeg_process = subprocess.Popen( ffmpeg_cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, bufsize=10**8 ) self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True) self.stderr_thread.start() def _read_stderr(self): """Reads and prints FFmpeg's stderr output.""" try: for line in iter(self.ffmpeg_process.stderr.readline, b''): pass except Exception as e: print(f"Error reading FFmpeg stderr: {e}") def write(self, frame): """Writes a frame (NumPy/CuPy array) to FFmpeg.""" try: if isinstance(frame, cp.ndarray): frame = cp.asnumpy(frame) self.ffmpeg_process.stdin.write(frame.tobytes()) except (OSError, BrokenPipeError) as e: print(f"Error writing frame to FFmpeg: {e}. 
FFmpeg might have terminated.") self.release() raise def release(self): """Closes the FFmpeg process.""" if self.ffmpeg_process.stdin: try: self.ffmpeg_process.stdin.close() except OSError as e: print(f"Warning: Error closing FFmpeg stdin: {e}") ret_code = self.ffmpeg_process.wait() if ret_code != 0: print(f"Warning: FFmpeg process exited with non-zero status: {ret_code}") self.stderr_thread.join(timeout=1.0) def get_audio_duration(audio_file_path): if not os.path.exists(audio_file_path): print(f"Aviso: Arquivo de áudio não encontrado: {audio_file_path}") return None try: command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", audio_file_path ] result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) duration = float(result.stdout.strip()) return duration except FileNotFoundError: print("Erro: ffprobe não encontrado. Certifique-se de que o FFmpeg (que inclui ffprobe) está no PATH do sistema.") return None except subprocess.CalledProcessError as e: print(f"Erro ao executar ffprobe para obter duração do áudio: {e}") print(f"Stderr: {e.stderr}") return None except ValueError: print(f"Erro: Não foi possível converter a saída do ffprobe para float: {result.stdout.strip()}") return None except Exception as e: print(f"Erro inesperado ao obter duração do áudio: {e}") return None class KaraokeVideoCreator: def __init__(self, config, text_renderer: TextRenderer): self.config = config self.fps = config["video_fps"] self.text_renderer = text_renderer self.num_visible_lines = config["num_visible_lines"] width, height = 1920, 1080 try: width, height = map(int, self.config["video_resolution"].split("x")) except ValueError: print(f"Aviso: Resolução de vídeo inválida '{self.config['video_resolution']}'. 
Usando 1920x1080.") width, height = 1920, 1080 self.width = width self.height = height self.static_bg_frame_bgr_np = None self.static_bg_frame_rgb_cp = None bg_path = config.get("background_image") loaded_static = False if bg_path and os.path.exists(bg_path): try: bg_img = Image.open(bg_path).convert("RGB").resize((width, height), Image.Resampling.LANCZOS) self.static_bg_frame_bgr_np = np.array(bg_img)[:, :, ::-1].copy() self.static_bg_frame_rgb_cp = cp.asarray(np.array(bg_img)) loaded_static = True except Exception as e: print(f"Erro ao carregar imagem de fundo '{bg_path}': {e}. Usando fundo preto.") self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8) self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8) else: if bg_path: print(f"Aviso: Imagem de fundo especificada não encontrada: {bg_path}. Usando fundo preto.") self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8) self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8) self.init_gpu() self.cuda_processor = CUDAProcessor( config, static_bg_rgb_cp=self.static_bg_frame_rgb_cp ) def init_gpu(self): """Initializes the GPU device and memory pool.""" try: cp.cuda.Device(0).use() cp.cuda.set_allocator(cp.cuda.MemoryPool().malloc) cp.cuda.Stream.null.synchronize() _ = cp.zeros(1) except cp.cuda.runtime.CUDARuntimeError as e: print(f"Erro Crítico: Falha ao inicializar a GPU CUDA: {e}") raise except Exception as e: print(f"Erro Crítico: Falha inesperada durante inicialização da GPU: {e}") raise def _get_next_global_indices(self, current_displayed_indices, count=2): """Find the next 'count' global indices not yet on screen.""" max_existing_idx = -1 valid_indices = {idx for idx in current_displayed_indices if idx is not None} if valid_indices: max_existing_idx = max(valid_indices) next_indices = [] candidate_idx = max_existing_idx + 1 while len(next_indices) < count: next_indices.append(candidate_idx) candidate_idx += 1 
return next_indices def create_video(self, syllable_lines, long_pauses, output_file, audio_file_path): """Cria o vídeo de karaokê com atualização de conteúdo por slots.""" width, height = self.width, self.height N = self.num_visible_lines audio_duration = get_audio_duration(audio_file_path) first_syl_start_time = 0.0 last_syl_end_time = 0.0 if syllable_lines: first_valid_line_idx = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1) if first_valid_line_idx != -1 and syllable_lines[first_valid_line_idx]: first_syl_start_time = syllable_lines[first_valid_line_idx][0][0] last_valid_line_idx = next((idx for idx in range(len(syllable_lines) - 1, -1, -1) if syllable_lines[idx]), -1) if last_valid_line_idx != -1 and syllable_lines[last_valid_line_idx]: last_syl_end_time = syllable_lines[last_valid_line_idx][-1][1] else: print("Aviso: Nenhuma linha de sílaba para processar.") last_syl_end_time = 1.0 video_end_time = last_syl_end_time + 0.5 if audio_duration is not None: video_end_time = max(video_end_time, audio_duration + 0.1) else: print(f"Aviso: Sem duração de áudio, vídeo terminará em {video_end_time:.2f}s baseado nas legendas.") total_frames = math.ceil(video_end_time * self.fps) print(f"Duração do vídeo estimada: {video_end_time:.2f}s, Total de frames: {total_frames}") try: video = FFmpegWriter(output_file, width, height, self.fps, self.config) except Exception as e: print(f"Erro Crítico: Falha ao inicializar FFmpegWriter: {e}") return video_lock = threading.Lock() start_time_processing = time.time() current_frame = 0 num_lines = len(syllable_lines) pbar = tqdm(total=total_frames, unit="frames", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} frames [{elapsed}<{remaining}, {rate_fmt}]", position=0, leave=True) displayed_content = [] for idx in range(N): line_data = syllable_lines[idx] if idx < num_lines else None displayed_content.append((idx if line_data else None, line_data)) completed_global_line_indices = set() prev_mask_cp = None 
trigger_1_pending_for_line = -1 trigger_2_pending = False last_trigger_1_line_completed = -1 last_trigger_2_line_completed = -1 initial_static_frames = 0 if syllable_lines: first_valid_line_idx_recalc = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1) if first_valid_line_idx_recalc != -1: first_syl_start_time_recalc = syllable_lines[first_valid_line_idx_recalc][0][0] initial_static_frames = max(0, int(first_syl_start_time_recalc * self.fps)) if initial_static_frames > 0: try: if any(global_idx is not None for global_idx, _ in displayed_content): initial_base_cp, initial_mask_cp, all_syl_info, _ = self.text_renderer.render_text_images( displayed_content, -1, width, height ) if initial_base_cp is not None and initial_mask_cp is not None: self.cuda_processor.process_frames_streaming( initial_base_cp, initial_mask_cp, [], (-1, -1), video, video_lock, self.fps, 0, initial_static_frames, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) prev_mask_cp = initial_mask_cp.copy() del initial_base_cp, initial_mask_cp, all_syl_info else: print("Aviso: Renderização inicial produziu dados nulos. 
Usando fallback.") fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None else: fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None except Exception as e: print(f"Aviso: Falha ao pré-renderizar janela inicial estática: {e}") traceback.print_exc() fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None current_frame = initial_static_frames for i, current_line_syllables in enumerate(syllable_lines): if not current_line_syllables: print(f"Aviso: Pulando linha global vazia {i}") continue if trigger_2_pending and i != last_trigger_2_line_completed: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_bottom1 = next_indices[0] new_data_bottom1 = syllable_lines[new_idx_bottom1] if new_idx_bottom1 < num_lines else None displayed_content[N-2] = (new_idx_bottom1 if new_data_bottom1 else None, new_data_bottom1) new_idx_bottom2 = next_indices[1] new_data_bottom2 = syllable_lines[new_idx_bottom2] if new_idx_bottom2 < num_lines else None displayed_content[N-1] = (new_idx_bottom2 if new_data_bottom2 else None, new_data_bottom2) trigger_2_pending = False last_trigger_2_line_completed = i active_local_idx = -1 for local_idx, (global_idx, _) in enumerate(displayed_content): if global_idx == i: active_local_idx = local_idx break if active_local_idx == -1: line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) frames_to_fill_until_line = max(0, line_start_frame - 
current_frame) if frames_to_fill_until_line > 0: print(f"Aviso: Linha ativa {i} não encontrada na tela. Preenchendo {frames_to_fill_until_line} frames...") if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, frames_to_fill_until_line, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (frames_to_fill_until_line, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(frames_to_fill_until_line) current_frame += frames_to_fill_until_line continue render_data = None render_success = False try: render_data = self.text_renderer.render_text_images(displayed_content, active_local_idx, width, height) render_success = True except Exception as e: print(f"Erro crítico ao renderizar slots para linha {i}: {e}") traceback.print_exc() render_success = False if render_success and render_data: base_cp, mask_cp, all_syl_info, active_indices = render_data if base_cp is not None and mask_cp is not None and all_syl_info and active_indices[1] > active_indices[0]: line_start_time = current_line_syllables[0][0] line_end_time = current_line_syllables[-1][1] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) 
num_frames_to_process = max(0, processing_end_frame - effective_start_frame_for_processing) trigger_1_frame = -1 is_penultimate_line = (active_local_idx == N - 2) if is_penultimate_line and i != last_trigger_1_line_completed: line_duration = max(line_end_time - line_start_time, 0.01) midpoint_time = line_start_time + line_duration / 2.0 trigger_1_frame = int(midpoint_time * self.fps) trigger_1_pending_for_line = i if num_frames_to_process > 0: fade_line_duration = 0.15 line_fade_start_time = line_start_time line_fade_end_time = line_start_time + fade_line_duration self.cuda_processor.process_frames_streaming( base_cp, mask_cp, all_syl_info, active_indices, video, video_lock, self.fps, effective_start_frame_for_processing, num_frames_to_process, width, height, pbar, prev_mask_cp, line_fade_start_time, line_fade_end_time, active_global_line_idx=i, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) processed_frames_end = effective_start_frame_for_processing + num_frames_to_process if trigger_1_pending_for_line == i and trigger_1_frame != -1 and processed_frames_end > trigger_1_frame: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_top1 = next_indices[0] new_data_top1 = syllable_lines[new_idx_top1] if new_idx_top1 < num_lines else None displayed_content[0] = (new_idx_top1 if new_data_top1 else None, new_data_top1) new_idx_top2 = next_indices[1] new_data_top2 = syllable_lines[new_idx_top2] if new_idx_top2 < num_lines else None displayed_content[1] = (new_idx_top2 if new_data_top2 else None, new_data_top2) trigger_1_pending_for_line = -1 last_trigger_1_line_completed = i current_frame = processed_frames_end if prev_mask_cp is not None: del prev_mask_cp prev_mask_cp = mask_cp.copy() if mask_cp is not None else None completed_global_line_indices.add(i) else: if i not in completed_global_line_indices: 
completed_global_line_indices.add(i) is_last_line = (active_local_idx == N - 1) if is_last_line: trigger_2_pending = True del base_cp, all_syl_info else: print(f"Aviso: Renderização para linha {i} inválida. Pulando GPU e preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill if render_data: try: del render_data[0], render_data[1], render_data[2], render_data[3] except: pass else: print(f"Aviso: Falha GERAL ao renderizar slots para linha {i}. 
Preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill cp.get_default_memory_pool().free_all_blocks() final_frames_to_fill = total_frames - current_frame if final_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) final_fade_start_time = (current_frame / self.fps) final_fade_duration = 0.5 final_fade_end_time = final_fade_start_time + final_fade_duration self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, final_frames_to_fill, width, height, pbar, prev_mask_cp, final_fade_start_time, final_fade_end_time, 
active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (final_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(final_frames_to_fill) del fill_batch current_frame += final_frames_to_fill if pbar is not None: if pbar.n < pbar.total: pbar.update(pbar.total - pbar.n) pbar.close() video.release() if prev_mask_cp is not None: del prev_mask_cp del displayed_content[:] def load_syllables(filepath="syllables.txt"): """Carrega o dicionário de sílabas do arquivo.""" syllable_dict = {} not_found_words = set() try: with open(filepath, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or '|' not in line: continue parts = line.split('|', 1) if len(parts) == 2: word = parts[0].strip().lower() syllables = parts[1].strip() if word and syllables: syllable_dict[word] = syllables except FileNotFoundError: print(f"Aviso: Arquivo de sílabas '{filepath}' não encontrado. O destaque será por palavra.") except Exception as e: print(f"Erro ao carregar arquivo de sílabas '{filepath}': {e}") return syllable_dict, not_found_words def main(): start_main_time = time.time() config = DEFAULT_CONFIG.copy() try: device = cp.cuda.Device(0) device.use() except cp.cuda.runtime.CUDARuntimeError as e: if 'no CUDA-capable device is detected' in str(e): print("Erro Crítico: Nenhuma GPU compatível com CUDA foi detectada.") elif 'CUDA driver version is insufficient' in str(e): print("Erro Crítico: O driver NVIDIA CUDA está desatualizado. Atualize seus drivers.") else: print(f"Erro Crítico: Falha ao inicializar CUDA: {e}") print("O script não pode continuar sem CUDA. 
Verifique sua instalação NVIDIA e CUDA.") return except Exception as e: print(f"Erro inesperado durante a inicialização da GPU: {e}") return try: process = psutil.Process() cpu_count = os.cpu_count() affinity = list(range(cpu_count)) process.cpu_affinity(affinity) except (ImportError, AttributeError, OSError, ValueError) as e: print(f"Aviso: Não foi possível definir a afinidade da CPU (geralmente não é um problema): {e}") pass subtitle_file = config.get("default_subtitle_file", "legenda.psv") output_file = config.get("default_output_file", "video_karaoke_char_level.mp4") audio_file = "audio.wav" if not os.path.exists(subtitle_file): print(f"Erro Crítico: Arquivo de legenda '{subtitle_file}' não encontrado.") return if not os.path.exists(audio_file): print(f"Aviso: Arquivo de áudio '{audio_file}' não encontrado. A duração do vídeo será baseada apenas nas legendas.") try: text_renderer = TextRenderer(config) syllable_dict, not_found_words_set = load_syllables() subtitle_processor = SubtitleProcessor(text_renderer, config, syllable_dict, not_found_words_set) except Exception as e: print(f"Erro Crítico: Falha ao inicializar TextRenderer ou SubtitleProcessor: {e}") traceback.print_exc() return lines = [] long_pauses = [] video_width, _ = map(int, config["video_resolution"].split("x")) try: lines, long_pauses = subtitle_processor.process_subtitles_to_syllable_lines(subtitle_file, video_width) if not lines: if long_pauses: print("Aviso: Nenhuma linha visual foi criada, mas pausas longas foram detectadas.") else: print("Aviso: Nenhuma linha visual foi criada e nenhuma pausa longa detectada.") if long_pauses: print(f"Detectadas {len(long_pauses)} pausas longas (>= 5s).") except Exception as e: print(f"Erro Crítico: Falha ao processar legendas ou identificar pausas: {e}") traceback.print_exc() return if not lines and not long_pauses: print("Nenhuma linha visual ou pausa longa encontrada. 
Não é possível gerar o vídeo.") return try: karaoke_creator = KaraokeVideoCreator(config, text_renderer) karaoke_creator.create_video(lines, long_pauses, output_file, audio_file) except Exception as e: print(f"\nErro Crítico durante a criação do vídeo:") print(f"{e}") traceback.print_exc() print("\nO processo foi interrompido devido a um erro.") finally: cp.get_default_memory_pool().free_all_blocks() end_main_time = time.time() print(f"Finalizando Karaokê Creator. Tempo total do script: {time.strftime('%H:%M:%S', time.gmtime(end_main_time - start_main_time))}") if __name__ == "__main__": main()

Pensamento

Issue with karaoke video generator

The user is facing a bug where the karaoke syllable highlighting jumps when it reaches the last syllable of a word that ends with a dot (like "e." or "a."). The timing data used includes these periods, causing the gradient progress to behave abruptly at the end of a word. They want a step-by-step patch to fix this issue. I'll need to carefully analyze the code and adjust the handling of syllables ending with dots to make the transition smooth.

Analyzing the gradient bug

The code processes syllables but handles punctuation at the end of words inconsistently. In the drawing step, punctuation is sometimes removed, but the gradient uses an earlier width calculation that includes the punctuation. This mismatch causes an abrupt jump in the highlight on syllables with punctuation, like "e." or "a.". The key issue seems to be that the actual drawn width doesn't include the dot, affecting the gradual filling of the gradient. A solution involves adjusting the highlight mask to account for the punctuation correctly.

Fixing gradient behavior

The issue arises because the code removes the trailing dot in “e.” but calculates the highlight area including the dot. This mismatch causes the highlight to stop prematurely when it reaches the last syllable, as the bounding box includes the dot, but the mask doesn't. When the next phrase starts, the highlight "jumps" forward due to the miscalculation of the width. The solution could be to avoid removing the dot or handle it as a separate entity to maintain consistent highlight behavior.

Refining highlight issue

The core issue appears to be that the trailing period (dot) is removed when drawing text, but the highlight calculation still includes it. This mismatch causes the highlight to freeze early and act unnaturally. The solution could be to recompute the bounding box using the actual drawn text, including the period, or avoid stripping the dot entirely. Alternatively, adjusting the bounding box and syllable width to match the drawn width could improve the highlight's appearance and prevent the abrupt jumps at the end of syllables.

Thought for 1m 19s

Resposta

Segue o patch que elimina o “pulo” no gradiente quando a última sílaba termina em ponto, garantindo que o cálculo de largura, o bbox e o texto realmente desenhado sejam sempre coerentes.

diff
diff --git a/seu_script.py b/seu_script.py @@ def _get_element_width(self, draw, text, font): """Obtém a largura de um elemento de texto (sílaba ou espaço).""" - if text == " ": - return self.space_width_ref + # Ajuste: se a sílaba termina com ponto e tem mais de 1 caractere, + # desconsideramos o ponto para cálculo de largura (ele não será desenhado). + if text.endswith('.') and len(text.strip()) > 1: + text = text.rstrip('.') + if text == " ": + return self.space_width_ref @@ for i, (start_time, end_time, element_text, _) in enumerate(elements_in_line): - element_width = self._get_element_width(draw_base, element_text, font_line) - - if not element_text.isspace(): - try: - draw_x = int(current_x) - draw_y = line_y_draw - element_text_to_draw = element_text - if element_text_to_draw.endswith('.') and len(element_text_to_draw.strip()) > 1: - element_text_to_draw = element_text_to_draw.rstrip('.') - draw_base.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=self.base_text_color) - draw_mask.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=255) - - final_bbox = draw_base.textbbox((draw_x, draw_y), element_text, font=font_line) + # --- INÍCIO DO PATCH ------------------------------- + # Texto que será efetivamente desenhado + element_text_to_draw = element_text + if element_text_to_draw.endswith('.') and len(element_text_to_draw.strip()) > 1: + element_text_to_draw = element_text_to_draw.rstrip('.') + + # Medimos a largura do texto REAL que será exibido + element_width = self._get_element_width(draw_base, element_text_to_draw, font_line) + + if not element_text_to_draw.isspace(): + try: + draw_x = int(current_x) + draw_y = line_y_draw + draw_base.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=self.base_text_color) + draw_mask.text((draw_x, draw_y), element_text_to_draw, font=font_line, fill=255) + + # BBox também sobre o texto real (sem o ponto) + final_bbox = draw_base.textbbox((draw_x, draw_y), 
element_text_to_draw, font=font_line) ... - all_syllable_render_info.append( - (start_time, end_time, bbox_left, bbox_top_final, syl_w_actual, syl_h_actual, current_global_line_idx) - ) + all_syllable_render_info.append( + (start_time, end_time, bbox_left, bbox_top_final, syl_w_actual, syl_h_actual, current_global_line_idx) + ) ... - current_x += element_width + # Avança o cursor horizontal respeitando a largura real + current_x += element_width + # --- FIM DO PATCH ----------------------------------

O que mudou

  1. _get_element_width

    • Ignora o ponto final ao medir a largura quando ele está colado à última sílaba, mantendo o espaçamento e o cálculo de centralização corretos.
  2. render_text_images

    • Usa sempre o texto efetivamente desenhado (element_text_to_draw) para:
      • medir a largura (element_width);
      • desenhar o texto e a máscara;
      • calcular o bbox (final_bbox);
      • avançar o cursor horizontal (current_x).

Com essas alterações, o syllable_width passado para o CUDAProcessor é exatamente o mesmo da máscara de texto, fazendo o gradiente preencher a sílaba inteira (sem congelar nos ~80 %) e avançar suavemente até o fim da frase.

Compartilhar esta Q&A