Criado em: 30 de abril de 2025

Modifique o código abaixo para que a geração dos frames seja feita de forma extremamente mais rápida e termine mais cedo; use heurísticas e métodos avançados para alcançar esse objetivo.
import cupy as cp
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import time
import threading
import math
import os
import queue
import subprocess
from tqdm import tqdm
import psutil
import traceback
from cupyx.scipy import ndimage as cupy_ndimage

# Default rendering / encoding settings. Callers may copy this dict and
# override individual keys.
DEFAULT_CONFIG = {
    "font_path": "C:/Users/lucas/LilitaOne-Regular.ttf",
    "font_size": 140,
    "video_resolution": "1920x1080",
    "video_fps": 60,
    "base_text_color": "#FFFFFF",
    "highlight_text_color": "#ff0000",
    "num_visible_lines": 4,
    "upper_case": True,
    "background_image": "capa.png",
    "frames_per_batch": 16,
    "default_subtitle_file": "legenda.psv",
    "default_output_file": "video_karaoke_char_level.mp4",
    "ffmpeg_preset": "p4",
    "ffmpeg_tune": "hq",
    "ffmpeg_bitrate": "20M",
    "ffmpeg_codec": "h264_nvenc",
    "vertical_shift_pixels": 130,
    "min_char_duration": 0.01,
}


def hex_to_bgr_cupy(hex_color):
    """Convert a '#RRGGBB' hex string into a CuPy uint8 array in BGR order."""
    hex_color = hex_color.lstrip('#')
    rgb = tuple(int(hex_color[i:i + 2], 16) for i in (0, 2, 4))
    # Reverse RGB -> BGR before uploading to the GPU.
    return cp.array(rgb[::-1], dtype=cp.uint8)


class TextRenderer:
    """Renders karaoke text lines to PIL images and uploads them to the GPU."""

    def __init__(self, config):
        """Load the configured font and pre-compute text-layout metrics.

        Falls back to PIL's built-in default font (with estimated metrics)
        when the configured TrueType font cannot be loaded.
        """
        self.config = config
        self.font_path = config["font_path"]
        self.font_size = config["font_size"]
        self.num_visible_lines = config["num_visible_lines"]
        self.upper_case = config["upper_case"]
        self.base_text_color = config["base_text_color"]
        self._font_cache = {}  # maps font size (int) -> ImageFont instance
        try:
            self.font = ImageFont.truetype(self.font_path, self.font_size)
            self._font_cache[self.font_size] = self.font
            temp_img = Image.new("RGB", (1, 1))
            temp_draw = ImageDraw.Draw(temp_img)
            space_bbox = temp_draw.textbbox((0, 0), " ", font=self.font)
            # textlength() is the accurate width API on modern Pillow; older
            # versions fall back to the bbox width, then to a size-based guess.
            try:
                self.space_width_ref = temp_draw.textlength(" ", font=self.font)
            except AttributeError:
                self.space_width_ref = space_bbox[2] - space_bbox[0] if space_bbox else int(self.font_size * 0.25)
            try:
                # "Tg" spans both ascender and descender, giving a usable
                # reference line height.
                sample_bbox = self.font.getbbox("Tg")
                self.line_height_ref = sample_bbox[3] - sample_bbox[1]
            except AttributeError:
                sample_bbox_fallback = temp_draw.textbbox((0, 0), "Tg", font=self.font)
                self.line_height_ref = sample_bbox_fallback[3] - sample_bbox_fallback[1] if sample_bbox_fallback else int(self.font_size * 1.2)
            del temp_draw, temp_img
        except Exception as e:
            # Font failed to load: use PIL's default font and estimate metrics.
            print(f"Aviso: Falha ao carregar a fonte '{self.font_path}'. Usando fonte padrão. Erro: {e}")
            self.font = ImageFont.load_default()
            try:
                bbox = self.font.getbbox("M")
                self.font_size = bbox[3] - bbox[1]
            except AttributeError:
                self.font_size = 20
            self._font_cache[self.font_size] = self.font
            temp_img = Image.new("RGB", (1, 1))
            temp_draw = ImageDraw.Draw(temp_img)
            try:
                self.space_width_ref = temp_draw.textlength(" ", font=self.font)
            except AttributeError:
                self.space_width_ref = 10
            try:
                bbox = self.font.getbbox("Tg")
                self.line_height_ref = bbox[3] - bbox[1]
            except AttributeError:
                self.line_height_ref = 30
            del temp_draw, temp_img
        # Tighter inter-line spacing as more simultaneous lines are shown.
        if self.num_visible_lines <= 1:
            spacing_multiplier = 1.0
        elif self.num_visible_lines == 2:
            spacing_multiplier = 0.8
        elif self.num_visible_lines == 3:
            spacing_multiplier = 0.6
        else:
            spacing_multiplier = 0.4
        self.line_spacing = max(0, int(self.line_height_ref * spacing_multiplier))

    def _get_font_with_size(self, size: int) -> ImageFont.FreeTypeFont:
        """Return an ImageFont at the given pixel size, using the cache."""
        size = max(1, int(size))
        if size in self._font_cache:
            return self._font_cache[size]
        try:
            f = ImageFont.truetype(self.font_path, size)
        except Exception:
            # Fixed: the captured exception variable was unused.
            f = ImageFont.load_default()
        self._font_cache[size] = f
        return f

    def _calculate_line_width(self, line_elements, draw, font) -> float:
        """Total pixel width of a line given the font.

        Each element is a ``(start, end, text, extra)`` tuple; only ``text``
        is measured. May return a float, since Pillow's ``textlength()`` is
        float-valued (the original ``-> int`` annotation was inaccurate).
        """
        return sum(self._get_element_width(draw, txt, font) for _, _, txt, _ in line_elements)

    def _get_element_width(self, draw, text, font):
        """Measure one text element's (syllable or space) width in pixels.

        Tries, in order: ``textlength()`` (modern Pillow), ``textbbox()``,
        the deprecated ``textsize()``, and finally a rough estimate derived
        from the font size.
        """
        if text == " ":
            # Spaces always use the pre-computed reference width.
            return self.space_width_ref
        try:
            return draw.textlength(text, font=font)
        except AttributeError:
            try:
                bbox = draw.textbbox((0, 0), text, font=font)
                return bbox[2] - bbox[0] if bbox else 0
            except AttributeError:
                try:
                    width, _ = draw.textsize(text, font=font)
                    return width
                except AttributeError:
                    font_size_est = getattr(font, 'size', self.font_size // 2)
                    return len(text) * (font_size_est // 2)
        except Exception:
            # Any other measurement failure: size-based estimate.
            font_size_est = getattr(font, 'size', self.font_size // 2)
            return len(text) * (font_size_est // 2)

    def render_text_images(self, displayed_content, active_line_local_idx, width, height):
        """Render the visible lines to a base image and a highlight mask.

        Each line is auto-shrunk (down to 60% of the base font size) if it
        would exceed 90% of the video width, without wrapping. Vertical slot
        positions are fixed regardless of per-line font size.

        Returns ``(base_cp, mask_cp, all_syllable_render_info,
        active_syllable_indices)`` where the first two are CuPy arrays
        (RGB base frame and L-mode mask) and ``all_syllable_render_info``
        holds, per non-space element, ``(start, end, x, y, w, h,
        global_line_idx, is_sentence_end)``.
        """
        img_base = Image.new("RGB", (width, height), (0, 0, 0))
        img_mask = Image.new("L", (width, height), 0)
        draw_base = ImageDraw.Draw(img_base)
        draw_mask = ImageDraw.Draw(img_mask)

        max_allowed_width = int(width * 0.90)
        min_font_size = max(10, int(self.font_size * 0.60))

        # Pass 1: pick a per-line font size so each line fits horizontally.
        line_render_data = []
        for global_idx, line_elements in displayed_content:
            if not (line_elements and global_idx is not None):
                line_render_data.append(None)  # empty slot
                continue
            font_line_size = self.font_size
            font_line = self._get_font_with_size(font_line_size)
            line_width_px = self._calculate_line_width(line_elements, draw_base, font_line)
            reduction_step = max(1, int(self.font_size * 0.05))
            while line_width_px > max_allowed_width and font_line_size > min_font_size:
                font_line_size = max(min_font_size, font_line_size - reduction_step)
                font_line = self._get_font_with_size(font_line_size)
                line_width_px = self._calculate_line_width(line_elements, draw_base, font_line)
                if font_line_size == min_font_size:
                    break  # cannot shrink further
            try:
                h_ref = font_line.getbbox("Tg")
                line_height_px = h_ref[3] - h_ref[1]
            except Exception:
                # Scale the reference height proportionally to the chosen size.
                line_height_px = int(self.line_height_ref * (font_line_size / self.font_size))
            line_render_data.append({
                "font": font_line,
                "font_size": font_line_size,
                "height": line_height_px,
                "width": line_width_px,
                "elements": line_elements,
                "global_idx": global_idx
            })

        # Fixed vertical slot positions, centered then shifted.
        vertical_shift = self.config.get("vertical_shift_pixels", 0)
        block_height_ref = self.num_visible_lines * self.line_height_ref + (self.num_visible_lines - 1) * self.line_spacing
        start_y_ref = max(0, (height - block_height_ref) // 2 + vertical_shift)
        line_start_y_positions = [
            int(start_y_ref + i * (self.line_height_ref + self.line_spacing))
            for i in range(self.num_visible_lines)
        ]

        # Pass 2: draw each line and record per-syllable geometry.
        all_syllable_render_info = []
        active_syllable_indices = (-1, -1)
        current_global_syl_idx = 0
        sentence_end_punctuation = ".!?"
        for local_idx, render_info in enumerate(line_render_data):
            if render_info is None:
                continue
            font_line = render_info["font"]
            line_width_px = render_info["width"]
            elements_in_line = render_info["elements"]
            current_global_line_idx = render_info["global_idx"]
            is_active_line = (local_idx == active_line_local_idx)
            if is_active_line:
                active_syllable_start_idx_global = current_global_syl_idx
            line_start_x = (width - line_width_px) // 2  # center horizontally
            current_x = float(line_start_x)
            line_y_draw = line_start_y_positions[local_idx]
            for start_time, end_time, element_text, _ in elements_in_line:
                element_width = self._get_element_width(draw_base, element_text, font_line)
                if not element_text.isspace():
                    stripped_text = element_text.rstrip()
                    is_sentence_end = bool(stripped_text and stripped_text[-1] in sentence_end_punctuation)
                    try:
                        draw_x = int(current_x)
                        draw_y = line_y_draw
                        element_text_mask = element_text
                        # Trailing periods are drawn in the base layer only,
                        # so the highlight mask never covers them.
                        if is_sentence_end and element_text.rstrip().endswith('.'):
                            element_text_mask = element_text_mask.rstrip('.')
                        draw_base.text((draw_x, draw_y), element_text, font=font_line, fill=self.base_text_color)
                        if element_text_mask:
                            draw_mask.text((draw_x, draw_y), element_text_mask, font=font_line, fill=255)
                        final_bbox = draw_base.textbbox((draw_x, draw_y), element_text, font=font_line)
                        if final_bbox:
                            bbox_left, bbox_top, bbox_right, bbox_bottom = final_bbox
                            syl_w_actual = bbox_right - bbox_left
                            syl_h_actual = bbox_bottom - bbox_top
                            bbox_top_final = bbox_top
                        else:
                            # No bbox available: approximate with the line metrics.
                            line_height_px_fallback = render_info["height"]
                            bbox_left = draw_x
                            bbox_top_final = draw_y
                            syl_w_actual = element_width
                            syl_h_actual = line_height_px_fallback
                    except Exception as e:
                        # Last-ditch fallback: still try to draw, and record
                        # estimated geometry so downstream indexing stays valid.
                        print(f"Fallback de renderização/bbox para: {element_text}. Erro: {e}")
                        draw_x = int(current_x)
                        draw_y = line_y_draw
                        try:
                            draw_base.text((draw_x, draw_y), element_text, font=font_line, fill=self.base_text_color)
                            draw_mask.text((draw_x, draw_y), element_text, font=font_line, fill=255)
                        except Exception as draw_err:
                            print(f" -> Falha até no desenho fallback: {draw_err}")
                        line_height_px_fallback = render_info["height"]
                        bbox_left = draw_x
                        bbox_top_final = draw_y
                        syl_w_actual = element_width
                        syl_h_actual = line_height_px_fallback
                    all_syllable_render_info.append(
                        (start_time, end_time, bbox_left, bbox_top_final, syl_w_actual, syl_h_actual, current_global_line_idx, is_sentence_end)
                    )
                    current_global_syl_idx += 1
                current_x += element_width  # advance for spaces too
            if is_active_line:
                active_syllable_end_idx_global = current_global_syl_idx
                active_syllable_indices = (active_syllable_start_idx_global, active_syllable_end_idx_global)

        # Upload the finished frames to the GPU.
        base_cp = cp.asarray(np.array(img_base))
        mask_cp = cp.asarray(np.array(img_mask))
        return base_cp, mask_cp, all_syllable_render_info, active_syllable_indices


class SubtitleProcessor:
    """Parses PSV subtitle files and groups per-character timings."""

    def __init__(self, text_renderer: TextRenderer, config, syllable_dict, not_found_words_set):
        """Store rendering context and syllable lookup data."""
        self.text_renderer = text_renderer
        self.config = config
        self.upper_case = config["upper_case"]
        self.font = self.text_renderer.font
        self.syllable_dict = syllable_dict
        self.not_found_words_set = not_found_words_set

    @staticmethod
    def _parse_time_string_float(time_str):
        """Parse a timestamp string (like '0.598') directly to float seconds.

        Returns None (after printing a warning) for non-numeric input.
        """
        try:
            return float(time_str)
        except (ValueError, TypeError):
            print(f"Aviso: Timestamp em formato inesperado: {time_str}")
            return None
@staticmethod def read_subtitles(file): """Lê legendas do arquivo PSV (CHARACTER|START|END).""" char_timing_data = [] try: with open(file, 'r', encoding='utf-8') as f: lines = f.readlines() if not lines: print(f"Aviso: Arquivo de legenda '{file}' está vazio.") return [], [] header = lines[0].strip().upper() start_line_index = 0 if header == "CHARACTER|START|END": start_line_index = 1 elif header and '|' not in lines[0]: print("Aviso: Cabeçalho 'CHARACTER|START|END' não encontrado. Assumindo que não há cabeçalho.") for line_num, line in enumerate(lines[start_line_index:], start=start_line_index + 1): if not line.strip(): continue line = line.rstrip('\n\r') if not line: continue parts = line.split('|') if len(parts) != 3: print(f"Aviso: Ignorando linha {line_num} mal formatada (esperava 3 colunas separadas por '|'): '{line}'") continue char = parts[0] start_str = parts[1].strip() end_str = parts[2].strip() start_time = SubtitleProcessor._parse_time_string_float(start_str) end_time = SubtitleProcessor._parse_time_string_float(end_str) if start_time is None or end_time is None: print(f"Aviso: Ignorando linha {line_num} com timestamp inválido: '{line}'") continue if not char and start_time is not None and end_time is not None: char = " " if end_time < start_time: print(f"Aviso: Corrigindo end_time < start_time na linha {line_num}: '{line}'") end_time = start_time char_timing_data.append((start_time, end_time, str(char))) except FileNotFoundError: print(f"Erro: Arquivo de legenda PSV não encontrado: {file}") return [], [] except Exception as e: print(f"Erro inesperado ao ler legendas PSV: {e}") import traceback traceback.print_exc() return [], [] char_timing_data.sort(key=lambda x: x[0]) long_pauses = SubtitleProcessor._identify_long_pauses(char_timing_data) return char_timing_data, long_pauses @staticmethod def _identify_long_pauses(char_timing_data, min_pause_duration=5.0): """Identifica pausas longas: no início, entre caracteres ou na duração de um caractere.""" 
pauses = [] if not char_timing_data: return pauses first_char_start_time = char_timing_data[0][0] initial_pause_duration = first_char_start_time if initial_pause_duration >= min_pause_duration: pauses.append({ "start": 0.0, "end": first_char_start_time, "duration": initial_pause_duration, "type": "initial" }) for i in range(1, len(char_timing_data)): prev_end_time = char_timing_data[i-1][1] curr_start_time = char_timing_data[i][0] pause_duration = curr_start_time - prev_end_time if pause_duration >= min_pause_duration: is_covered_by_initial = False if pauses and pauses[0]["type"] == "initial" and pauses[0]["end"] >= curr_start_time: is_covered_by_initial = True if not is_covered_by_initial: pauses.append({ "start": prev_end_time, "end": curr_start_time, "duration": pause_duration, "type": "between" }) for i in range(len(char_timing_data)): start_time = char_timing_data[i][0] end_time = char_timing_data[i][1] char_duration = end_time - start_time if char_duration >= min_pause_duration: is_covered = False for p in pauses: if abs(p["start"] - start_time) < 0.01 and abs(p["end"] - end_time) < 0.01: is_covered = True break if not is_covered: pauses.append({ "start": start_time, "end": end_time, "duration": char_duration, "type": "during" }) pauses.sort(key=lambda x: x["start"]) return pauses def _group_chars_into_words(self, char_timing_data): """Agrupa caracteres em palavras e espaços.""" words_and_spaces = [] current_word_chars = [] for i, (start, end, char) in enumerate(char_timing_data): processed_char = char.upper() if self.upper_case else char if processed_char.isspace(): if current_word_chars: words_and_spaces.append({"type": "word", "chars": current_word_chars}) current_word_chars = [] words_and_spaces.append({"type": "space", "start": start, "end": end}) else: current_word_chars.append((start, end, processed_char)) if current_word_chars: words_and_spaces.append({"type": "word", "chars": current_word_chars}) return words_and_spaces def 
_process_words_into_syllables(self, words_and_spaces): """Processa palavras para dividi-las em sílabas usando o dicionário.""" syllable_data = [] temp_img = Image.new("RGB", (1, 1)) temp_draw = ImageDraw.Draw(temp_img) default_font = self.text_renderer.font punctuation_to_strip = ",.!?;:" sentence_end_punctuation = ".!?" for element in words_and_spaces: if element["type"] == "space": space_width = self.text_renderer.space_width_ref syllable_data.append((element["start"], element["end"], " ", space_width, False)) continue word_chars = element["chars"] if not word_chars: continue word_text = "".join([c[2] for c in word_chars]) cleaned_word_text = word_text.rstrip(punctuation_to_strip) word_lookup = cleaned_word_text.lower() if word_lookup in self.syllable_dict: syllables_str = self.syllable_dict[word_lookup] syllable_parts = syllables_str.split('-') char_idx_counter = 0 original_char_idx_counter = 0 current_word_syllable_indices = [] for syl_part in syllable_parts: syl_len = len(syl_part) if char_idx_counter + syl_len > len(cleaned_word_text): if original_char_idx_counter < len(word_chars): syl_chars_original = word_chars[original_char_idx_counter:] syl_text_original = "".join([c[2] for c in syl_chars_original]) syl_start = syl_chars_original[0][0] syl_end = syl_chars_original[-1][1] syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in syl_chars_original) syllable_data.append((syl_start, syl_end, syl_text_original, syl_width, False)) current_word_syllable_indices.append(len(syllable_data) - 1) break syl_chars = word_chars[original_char_idx_counter : original_char_idx_counter + syl_len] if not syl_chars: continue syl_text = "".join([c[2] for c in syl_chars]) syl_start = syl_chars[0][0] syl_end = syl_chars[-1][1] syl_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in syl_chars) syllable_data.append((syl_start, syl_end, syl_text, syl_width, False)) 
current_word_syllable_indices.append(len(syllable_data) - 1) char_idx_counter += syl_len original_char_idx_counter += syl_len if original_char_idx_counter < len(word_chars): remaining_chars = word_chars[original_char_idx_counter:] rem_text = "".join([c[2] for c in remaining_chars]) expected_punctuation = word_text[len(cleaned_word_text):] if rem_text == expected_punctuation: if current_word_syllable_indices: last_syl_idx = current_word_syllable_indices[-1] last_syl_start, _, last_syl_text, _ , _= syllable_data[last_syl_idx] new_syl_text = last_syl_text + rem_text new_syl_end = remaining_chars[-1][1] new_syl_width = self.text_renderer._get_element_width(temp_draw, new_syl_text, default_font) syllable_data[last_syl_idx] = (last_syl_start, new_syl_end, new_syl_text, new_syl_width, False) else: rem_start = remaining_chars[0][0] rem_end = remaining_chars[-1][1] rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in remaining_chars) syllable_data.append((rem_start, rem_end, rem_text, rem_width, False)) current_word_syllable_indices.append(len(syllable_data) - 1) else: rem_start = remaining_chars[0][0] rem_end = remaining_chars[-1][1] rem_width = sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in remaining_chars) syllable_data.append((rem_start, rem_end, rem_text, rem_width, False)) current_word_syllable_indices.append(len(syllable_data) - 1) if current_word_syllable_indices: final_syl_idx_for_word = current_word_syllable_indices[-1] final_syl_data = syllable_data[final_syl_idx_for_word] final_syl_text = final_syl_data[2].rstrip() if final_syl_text and final_syl_text[-1] in sentence_end_punctuation: syllable_data[final_syl_idx_for_word] = final_syl_data[:4] + (True,) else: if word_lookup not in self.not_found_words_set and word_text.lower() == word_lookup: self.not_found_words_set.add(word_lookup) syl_start = word_chars[0][0] syl_end = word_chars[-1][1] syl_width = 
sum(self.text_renderer._get_element_width(temp_draw, c[2], default_font) for c in word_chars) is_end = word_text.rstrip()[-1] in sentence_end_punctuation if word_text.rstrip() else False syllable_data.append((syl_start, syl_end, word_text, syl_width, is_end)) del temp_draw, temp_img syllable_data.sort(key=lambda x: x[0]) processed_syllable_data = [] num_syls = len(syllable_data) for i in range(num_syls): start, end, text, width, is_end = syllable_data[i] next_space_end_val = None if i + 1 < num_syls and syllable_data[i + 1][2].isspace(): next_space_end_val = syllable_data[i + 1][1] adjusted_end = end if is_end and next_space_end_val is not None: adjusted_end = next_space_end_val next_syl_start_val = None for j in range(i + 1, num_syls): if syllable_data[j][2] and not syllable_data[j][2].isspace(): next_syl_start_val = syllable_data[j][0] break if is_end and next_space_end_val is not None: next_syl_start = next_space_end_val elif next_syl_start_val is not None: if i + 1 < num_syls and syllable_data[i + 1][2].isspace() \ and syllable_data[i + 1][0] < next_syl_start_val: next_syl_start = syllable_data[i + 1][0] else: next_syl_start = next_syl_start_val else: next_syl_start = adjusted_end + 0.1 next_syl_start = max(adjusted_end, next_syl_start) processed_syllable_data.append( (start, adjusted_end, text, width, is_end, next_syl_start) ) return processed_syllable_data def group_syllables_into_lines(self, syllable_timing_data, video_width): """ Agrupa sílabas em "linhas" lógicas que agora correspondem a **frases completas**. Uma frase termina quando o último caractere não-espaço da sílaba corrente é ".","!" ou "?". A entrada `syllable_timing_data` pode conter a flag extra, mas it isn't usado aqui. """ lines = [] current_line_elements = [] SENT_END = ".!?" 
for syllable_tuple in syllable_timing_data: start, end, text, element_width = syllable_tuple[:4] current_line_elements.append((start, end, text, element_width)) stripped = text.rstrip() if stripped and stripped[-1] in SENT_END: while current_line_elements and current_line_elements[-1][2].isspace(): current_line_elements.pop() if current_line_elements: lines.append(current_line_elements) current_line_elements = [] while current_line_elements and current_line_elements[-1][2].isspace(): current_line_elements.pop() if current_line_elements: lines.append(current_line_elements) return lines def process_subtitles_to_syllable_lines(self, file, video_width): char_timing_data, long_pauses = self.read_subtitles(file) if not char_timing_data: return [], [] words_and_spaces = self._group_chars_into_words(char_timing_data) syllable_timing_data_with_flags = self._process_words_into_syllables(words_and_spaces) if not syllable_timing_data_with_flags: print("Aviso: Nenhum dado de sílaba gerado.") return [], long_pauses print("\n--- DEBUG: Syllable Timestamps (with Sentence End Flag) ---") for syllable_tuple in syllable_timing_data_with_flags: start, end, text = syllable_tuple[:3] is_end = syllable_tuple[4] if len(syllable_tuple) > 4 else False text_preview = (text[:27] + '...') if len(text) > 30 else text end_marker = " (*)" if is_end else "" print(f" Syllable: '{text_preview}'{end_marker} | Start: {start:.3f}s | End: {end:.3f}s") print("--- DEBUG END ---\n") lines = self.group_syllables_into_lines(syllable_timing_data_with_flags, video_width) return lines, long_pauses class CUDAProcessor: def __init__(self, config, static_bg_rgb_cp): self.config = config self.static_bg_rgb_cp = static_bg_rgb_cp self.streams = [cp.cuda.Stream() for _ in range(2)] self.base_color_cp = hex_to_bgr_cupy(config["base_text_color"]) self.base_color_rgb_float_cp = self.base_color_cp[::-1].astype(cp.float32) / 255.0 highlight_hex = config["highlight_text_color"] highlight_bgr_cp = 
hex_to_bgr_cupy(highlight_hex) self.highlight_color_rgb_float_cp = highlight_bgr_cp[::-1].astype(cp.float32) / 255.0 self.progress_bar_fill_color_cp = highlight_bgr_cp self.progress_bar_fill_rgb_float_cp = self.highlight_color_rgb_float_cp hex_color_clean = highlight_hex.lstrip('#') rgb = tuple(int(hex_color_clean[i:i+2], 16) for i in (0, 2, 4)) darkening_factor = 0.4 dark_rgb = tuple(max(0, min(255, int(c * darkening_factor))) for c in rgb) self.progress_bar_bg_color_cp = cp.array(dark_rgb[::-1], dtype=cp.uint8) self.progress_bar_bg_rgb_float_cp = self.progress_bar_bg_color_cp[::-1].astype(cp.float32) / 255.0 self.frames_per_batch = config["frames_per_batch"] self.min_syl_duration = config.get("min_char_duration", 0.01) self.progress_bar_height = 20 self.progress_bar_y_start = 10 self.max_visual_fill_duration = config.get("max_visual_fill_duration", 3.0) if self.static_bg_rgb_cp is None: raise ValueError("CUDAProcessor requer um array static_bg_rgb_cp não nulo.") self.static_bg_rgb_float_cp = self.static_bg_rgb_cp.astype(cp.float32) / 255.0 def process_frames_streaming(self, base_cp, mask_cp, all_syllable_render_info, active_syllable_indices, video, video_lock, video_fps, current_frame, num_frames, width, height, pbar=None, prev_mask_cp=None, fade_start_time=None, fade_end_time=None, active_global_line_idx=-1, completed_global_line_indices=set(), long_pauses=None): """Processes frames including optional progress bar and consistent syllable fill.""" channels = 3 num_syls_total = len(all_syllable_render_info) syl_duration_for_normalization = cp.zeros(num_syls_total, dtype=cp.float32) next_syl_starts_cp_pre = cp.empty(num_syls_total, dtype=cp.float32) if num_syls_total > 0: syl_starts_cp = cp.asarray([info[0] for info in all_syllable_render_info], dtype=cp.float32) syl_ends_cp = cp.asarray([info[1] for info in all_syllable_render_info], dtype=cp.float32) syl_x_cp = cp.asarray([info[2] for info in all_syllable_render_info], dtype=cp.int32) syl_y_cp = 
cp.asarray([info[3] for info in all_syllable_render_info], dtype=cp.int32) syl_w_cp = cp.asarray([info[4] for info in all_syllable_render_info], dtype=cp.int32) syl_h_cp = cp.asarray([info[5] for info in all_syllable_render_info], dtype=cp.int32) syl_global_idx_cp = cp.asarray([info[6] for info in all_syllable_render_info], dtype=cp.int32) if num_syls_total > 1: next_syl_starts_cp_pre[:-1] = syl_starts_cp[1:] next_syl_starts_cp_pre[-1] = syl_ends_cp[-1] last_syl_idx_per_line = {} for syl_idx in range(num_syls_total): line_idx = int(syl_global_idx_cp[syl_idx].item()) last_syl_idx_per_line[line_idx] = syl_idx for syl_idx in range(num_syls_total): s_start = syl_starts_cp[syl_idx] if syl_idx == last_syl_idx_per_line[int(syl_global_idx_cp[syl_idx].item())]: raw_end_time = syl_ends_cp[syl_idx] else: raw_end_time = next_syl_starts_cp_pre[syl_idx] raw_duration = raw_end_time - s_start eff_duration = cp.maximum(raw_duration, self.min_syl_duration) syl_duration_for_normalization[syl_idx] = eff_duration if eff_duration > self.max_visual_fill_duration and long_pauses: syllable_overlaps = False for pause in long_pauses: pause_start, pause_end = pause["start"], pause["end"] if s_start < pause_end and raw_end_time > pause_start: syllable_overlaps = True break if syllable_overlaps: syl_duration_for_normalization[syl_idx] = self.max_visual_fill_duration del next_syl_starts_cp_pre else: syl_starts_cp = cp.empty(0, dtype=cp.float32) syl_ends_cp = cp.empty(0, dtype=cp.float32) syl_x_cp = cp.empty(0, dtype=cp.int32) syl_y_cp = cp.empty(0, dtype=cp.int32) syl_w_cp = cp.empty(0, dtype=cp.int32) syl_h_cp = cp.empty(0, dtype=cp.int32) syl_global_idx_cp = cp.empty(0, dtype=cp.int32) batch_size = min(self.frames_per_batch, 64) out_cp = [cp.empty((batch_size, height, width, channels), dtype=cp.uint8) for _ in range(2)] text_mask_bool = mask_cp > 128 text_mask_bool_exp = text_mask_bool[None, ..., None] y_coords, x_coords = cp.mgrid[:height, :width] bar_y_start = self.progress_bar_y_start 
bar_y_end = bar_y_start + self.progress_bar_height bar_bg_mask_full_area = (y_coords >= bar_y_start) & (y_coords < bar_y_end) result_queue = queue.Queue(maxsize=8) writer = threading.Thread(target=self._create_writer_thread(result_queue, video, video_lock, pbar), daemon=True) writer.start() max_visual_fill_duration = self.config.get("max_visual_fill_duration", 3.0) buffer_idx = 0 for batch_start in range(0, num_frames, batch_size): buffer_idx = (buffer_idx + 1) % 2 stream = self.streams[buffer_idx] batch_end = min(batch_start + batch_size, num_frames) batch_frames = batch_end - batch_start if batch_frames <= 0: continue out_batch_cp = out_cp[buffer_idx][:batch_frames] with stream: batch_frame_indices = cp.arange(batch_frames, dtype=cp.int32) absolute_frame_indices = current_frame + batch_start + batch_frame_indices frame_times_cp = absolute_frame_indices.astype(cp.float32) / video_fps intermediate_batch_float = cp.repeat(self.static_bg_rgb_float_cp[None, ...], batch_frames, axis=0) is_in_long_pause_batch_for_bar = cp.zeros(batch_frames, dtype=bool) if long_pauses: batch_needs_bar = cp.zeros(batch_frames, dtype=bool) batch_fill_width = cp.zeros(batch_frames, dtype=cp.int32) for pause in long_pauses: pause_start, pause_end, pause_duration = pause["start"], pause["end"], pause["duration"] is_in_this_pause_batch_indices = cp.where((frame_times_cp >= pause_start) & (frame_times_cp < pause_end))[0] if is_in_this_pause_batch_indices.size > 0: batch_needs_bar[is_in_this_pause_batch_indices] = True is_in_long_pause_batch_for_bar[is_in_this_pause_batch_indices] = True progress = (frame_times_cp[is_in_this_pause_batch_indices] - pause_start) / pause_duration fill_width = cp.clip((progress * width), 0, width).astype(cp.int32) batch_fill_width[is_in_this_pause_batch_indices] = fill_width if cp.any(batch_needs_bar): bar_bg_mask_batch = bar_bg_mask_full_area[None, :, :] & batch_needs_bar[:, None, None] x_coords_exp = x_coords[None, :, :] bar_fill_mask_batch = bar_bg_mask_batch & 
(x_coords_exp < batch_fill_width[:, None, None]) apply_bg_mask = bar_bg_mask_batch & (~bar_fill_mask_batch) intermediate_batch_float = cp.where( apply_bg_mask[..., None], self.progress_bar_bg_rgb_float_cp, intermediate_batch_float ) intermediate_batch_float = cp.where( bar_fill_mask_batch[..., None], self.progress_bar_fill_rgb_float_cp, intermediate_batch_float ) del bar_bg_mask_batch, bar_fill_mask_batch, apply_bg_mask, x_coords_exp del batch_needs_bar, batch_fill_width if prev_mask_cp is not None and fade_start_time is not None and fade_end_time is not None and fade_end_time > fade_start_time: prev_mask_float_exp = (prev_mask_cp > 128)[None, ..., None] time_in_fade = frame_times_cp - fade_start_time fade_duration = fade_end_time - fade_start_time fade_progress = cp.clip(time_in_fade / fade_duration, 0.0, 1.0) prev_alpha_batch = 1.0 - fade_progress prev_alpha_exp = prev_alpha_batch[:, None, None, None] intermediate_batch_float = cp.where( prev_mask_float_exp, self.base_color_rgb_float_cp * prev_alpha_exp + intermediate_batch_float * (1.0 - prev_alpha_exp), intermediate_batch_float ) batch_highlight_mask = cp.zeros((batch_frames, height, width), dtype=bool) completed_indices_cp = cp.asarray(list(completed_global_line_indices), dtype=cp.int32) if completed_indices_cp.size > 0 and num_syls_total > 0: is_completed_syl_mask = cp.isin(syl_global_idx_cp, completed_indices_cp) completed_syl_indices = cp.where(is_completed_syl_mask)[0] for syl_idx in completed_syl_indices: s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx] s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx] if s_w <= 0 or s_h <= 0: continue syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \ (y_coords >= s_y) & (y_coords < s_y + s_h) combined_mask = syl_bbox_mask & text_mask_bool batch_highlight_mask[:, combined_mask] = True start_active_syl_idx, end_active_syl_idx = active_syllable_indices start_active_syl_idx = max(0, start_active_syl_idx) end_active_syl_idx = min(num_syls_total, end_active_syl_idx) 
if active_global_line_idx != -1 and start_active_syl_idx < end_active_syl_idx: for syl_idx in range(start_active_syl_idx, end_active_syl_idx): if syl_global_idx_cp[syl_idx] != active_global_line_idx: continue s_start = syl_starts_cp[syl_idx] s_x, s_y = syl_x_cp[syl_idx], syl_y_cp[syl_idx] s_w, s_h = syl_w_cp[syl_idx], syl_h_cp[syl_idx] if s_w <= 0 or s_h <= 0: continue current_syl_norm_duration = syl_duration_for_normalization[syl_idx] elapsed_time = frame_times_cp - s_start safe_visual_duration = cp.maximum(current_syl_norm_duration, 1e-6) normalized_time = cp.clip(elapsed_time / safe_visual_duration, 0.0, 1.0) syl_progress_batch = normalized_time cutoff_x_batch = s_x + syl_progress_batch * s_w syl_bbox_mask = (x_coords >= s_x) & (x_coords < s_x + s_w) & \ (y_coords >= s_y) & (y_coords < s_y + s_h) highlight_area_mask_batch = x_coords[None, :, :] < cutoff_x_batch[:, None, None] active_highlight_apply_mask = syl_bbox_mask[None, :, :] & \ highlight_area_mask_batch & \ text_mask_bool[None, :, :] batch_highlight_mask |= active_highlight_apply_mask del elapsed_time, safe_visual_duration, normalized_time, cutoff_x_batch del syl_progress_batch, syl_bbox_mask del highlight_area_mask_batch, active_highlight_apply_mask final_color_batch_float = intermediate_batch_float highlight_mask_exp = batch_highlight_mask[..., None] final_color_batch_float = cp.where( text_mask_bool_exp & highlight_mask_exp, self.highlight_color_rgb_float_cp, final_color_batch_float ) final_color_batch_float = cp.where( text_mask_bool_exp & (~highlight_mask_exp), self.base_color_rgb_float_cp, final_color_batch_float ) out_batch_cp[:] = (final_color_batch_float * 255.0).astype(cp.uint8) del batch_highlight_mask, highlight_mask_exp del final_color_batch_float, intermediate_batch_float if 'completed_indices_cp' in locals(): del completed_indices_cp if 'is_completed_syl_mask' in locals(): del is_completed_syl_mask if 'completed_syl_indices' in locals(): del completed_syl_indices del 
is_in_long_pause_batch_for_bar stream.synchronize() result_queue.put((out_batch_cp.copy(), batch_frames)) cp.get_default_memory_pool().free_all_blocks() result_queue.put(None) writer.join() for s in self.streams: s.synchronize() if 'syl_starts_cp' in locals(): del syl_starts_cp if 'syl_ends_cp' in locals(): del syl_ends_cp if 'syl_x_cp' in locals(): del syl_x_cp if 'syl_y_cp' in locals(): del syl_y_cp if 'syl_w_cp' in locals(): del syl_w_cp if 'syl_h_cp' in locals(): del syl_h_cp if 'syl_global_idx_cp' in locals(): del syl_global_idx_cp if 'syl_duration_for_normalization' in locals(): del syl_duration_for_normalization del text_mask_bool, text_mask_bool_exp del y_coords, x_coords del out_cp, result_queue cp.get_default_memory_pool().free_all_blocks() del bar_bg_mask_full_area def _create_writer_thread(self, result_queue, video, video_lock, pbar): def writer_thread(): while True: batch_data = result_queue.get() if batch_data is None: result_queue.task_done() break frames_rgb_uint8, batch_frames_count = batch_data try: frames_bgr_uint8 = frames_rgb_uint8[:, :, :, ::-1] for frame_idx in range(batch_frames_count): with video_lock: video.write(cp.asnumpy(frames_bgr_uint8[frame_idx])) if pbar is not None: pbar.update(batch_frames_count) except Exception as e: print(f"Erro na thread escritora: {e}") if isinstance(e, (BrokenPipeError, OSError)): print("Erro crítico de escrita. 
class FFmpegWriter:
    """Pipes raw BGR frames into an FFmpeg subprocess that encodes the video."""

    def __init__(self, output_file, width, height, fps, config):
        self.output_file = output_file
        self.config = config
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{width}x{height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "-",
            "-c:v", config["ffmpeg_codec"],
            "-preset", config["ffmpeg_preset"],
            "-b:v", config["ffmpeg_bitrate"],
            "-pix_fmt", "yuv420p",
            "-tune", config["ffmpeg_tune"],
            output_file,
        ]
        # List form (shell=False) — arguments are never shell-interpreted.
        self.ffmpeg_process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            bufsize=10**8,
        )
        # Drain stderr in the background so FFmpeg never blocks on a full pipe.
        self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
        self.stderr_thread.start()

    def _read_stderr(self):
        """Reads and discards FFmpeg's stderr output."""
        try:
            for _line in iter(self.ffmpeg_process.stderr.readline, b''):
                pass
        except Exception as e:
            print(f"Error reading FFmpeg stderr: {e}")

    def write(self, frame):
        """Writes a frame (NumPy/CuPy array) to FFmpeg.

        Raises the underlying OSError/BrokenPipeError after releasing the
        process so the caller can abort cleanly.
        """
        try:
            if isinstance(frame, cp.ndarray):
                frame = cp.asnumpy(frame)
            self.ffmpeg_process.stdin.write(frame.tobytes())
        except (OSError, BrokenPipeError) as e:
            print(f"Error writing frame to FFmpeg: {e}. FFmpeg might have terminated.")
            self.release()
            raise

    def release(self):
        """Closes stdin, waits for FFmpeg to exit and joins the stderr reader."""
        if self.ffmpeg_process.stdin:
            try:
                self.ffmpeg_process.stdin.close()
            except OSError as e:
                print(f"Warning: Error closing FFmpeg stdin: {e}")
        ret_code = self.ffmpeg_process.wait()
        if ret_code != 0:
            print(f"Warning: FFmpeg process exited with non-zero status: {ret_code}")
        self.stderr_thread.join(timeout=1.0)


def get_audio_duration(audio_file_path):
    """Return the audio duration in seconds via ffprobe, or None on any failure.

    Failures covered: missing file, missing ffprobe binary, non-zero ffprobe
    exit status, and unparsable output. All paths print a diagnostic.
    """
    if not os.path.exists(audio_file_path):
        print(f"Aviso: Arquivo de áudio não encontrado: {audio_file_path}")
        return None
    try:
        command = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            audio_file_path,
        ]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                text=True, check=True)
        return float(result.stdout.strip())
    except FileNotFoundError:
        print("Erro: ffprobe não encontrado. Certifique-se de que o FFmpeg (que inclui ffprobe) está no PATH do sistema.")
        return None
    except subprocess.CalledProcessError as e:
        print(f"Erro ao executar ffprobe para obter duração do áudio: {e}")
        print(f"Stderr: {e.stderr}")
        return None
    except ValueError:
        print(f"Erro: Não foi possível converter a saída do ffprobe para float: {result.stdout.strip()}")
        return None
    except Exception as e:
        print(f"Erro inesperado ao obter duração do áudio: {e}")
        return None


class KaraokeVideoCreator:
    """Orchestrates rendering, GPU composition and encoding of the karaoke video."""

    # String annotation: TextRenderer is defined earlier in the file; the lazy
    # form avoids a hard evaluation order dependency at class-definition time.
    def __init__(self, config, text_renderer: "TextRenderer"):
        self.config = config
        self.fps = config["video_fps"]
        self.text_renderer = text_renderer
        self.num_visible_lines = config["num_visible_lines"]
        width, height = 1920, 1080
        try:
            width, height = map(int, self.config["video_resolution"].split("x"))
        except ValueError:
            print(f"Aviso: Resolução de vídeo inválida '{self.config['video_resolution']}'. Usando 1920x1080.")
            width, height = 1920, 1080
        self.width = width
        self.height = height
        self.static_bg_frame_bgr_np = None
        self.static_bg_frame_rgb_cp = None
        bg_path = config.get("background_image")
        loaded_static = False  # NOTE(review): written but not read in visible code.
        if bg_path and os.path.exists(bg_path):
            try:
                bg_img = Image.open(bg_path).convert("RGB").resize(
                    (width, height), Image.Resampling.LANCZOS)
                # BGR copy for CPU-side writes, RGB copy on the GPU.
                self.static_bg_frame_bgr_np = np.array(bg_img)[:, :, ::-1].copy()
                self.static_bg_frame_rgb_cp = cp.asarray(np.array(bg_img))
                loaded_static = True
            except Exception as e:
                print(f"Erro ao carregar imagem de fundo '{bg_path}': {e}. Usando fundo preto.")
                self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8)
                self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8)
        else:
            if bg_path:
                print(f"Aviso: Imagem de fundo especificada não encontrada: {bg_path}. Usando fundo preto.")
            self.static_bg_frame_bgr_np = np.zeros((height, width, 3), dtype=np.uint8)
            self.static_bg_frame_rgb_cp = cp.zeros((height, width, 3), dtype=np.uint8)
        self.init_gpu()
        self.cuda_processor = CUDAProcessor(
            config, static_bg_rgb_cp=self.static_bg_frame_rgb_cp
        )

    def init_gpu(self):
        """Initializes the GPU device and memory pool; re-raises on failure."""
        try:
            cp.cuda.Device(0).use()
            cp.cuda.set_allocator(cp.cuda.MemoryPool().malloc)
            cp.cuda.Stream.null.synchronize()
            _ = cp.zeros(1)  # Warm-up allocation to surface init errors early.
        except cp.cuda.runtime.CUDARuntimeError as e:
            print(f"Erro Crítico: Falha ao inicializar a GPU CUDA: {e}")
            raise
        except Exception as e:
            print(f"Erro Crítico: Falha inesperada durante inicialização da GPU: {e}")
            raise

    def _get_next_global_indices(self, current_displayed_indices, count=2):
        """Return the next ``count`` global line indices not yet on screen.

        Indices are consecutive integers following the largest non-None index
        in ``current_displayed_indices`` (or starting at 0 when none exist).
        """
        max_existing_idx = -1
        valid_indices = {idx for idx in current_displayed_indices if idx is not None}
        if valid_indices:
            max_existing_idx = max(valid_indices)
        next_indices = []
        candidate_idx = max_existing_idx + 1
        while len(next_indices) < count:
            next_indices.append(candidate_idx)
            candidate_idx += 1
        return next_indices
return next_indices def create_video(self, syllable_lines, long_pauses, output_file, audio_file_path): """Cria o vídeo de karaokê com atualização de conteúdo por slots.""" width, height = self.width, self.height N = self.num_visible_lines audio_duration = get_audio_duration(audio_file_path) first_syl_start_time = 0.0 last_syl_end_time = 0.0 if syllable_lines: first_valid_line_idx = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1) if first_valid_line_idx != -1 and syllable_lines[first_valid_line_idx]: first_syl_start_time = syllable_lines[first_valid_line_idx][0][0] last_valid_line_idx = next((idx for idx in range(len(syllable_lines) - 1, -1, -1) if syllable_lines[idx]), -1) if last_valid_line_idx != -1 and syllable_lines[last_valid_line_idx]: last_syl_end_time = syllable_lines[last_valid_line_idx][-1][1] else: print("Aviso: Nenhuma linha de sílaba para processar.") last_syl_end_time = 1.0 video_end_time = last_syl_end_time + 0.5 if audio_duration is not None: video_end_time = max(video_end_time, audio_duration + 0.1) else: print(f"Aviso: Sem duração de áudio, vídeo terminará em {video_end_time:.2f}s baseado nas legendas.") total_frames = math.ceil(video_end_time * self.fps) print(f"Duração do vídeo estimada: {video_end_time:.2f}s, Total de frames: {total_frames}") try: video = FFmpegWriter(output_file, width, height, self.fps, self.config) except Exception as e: print(f"Erro Crítico: Falha ao inicializar FFmpegWriter: {e}") return video_lock = threading.Lock() start_time_processing = time.time() current_frame = 0 num_lines = len(syllable_lines) pbar = tqdm(total=total_frames, unit="frames", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} frames [{elapsed}<{remaining}, {rate_fmt}]", position=0, leave=True) displayed_content = [] for idx in range(N): line_data = syllable_lines[idx] if idx < num_lines else None displayed_content.append((idx if line_data else None, line_data)) completed_global_line_indices = set() prev_mask_cp = None 
trigger_1_pending_for_line = -1 trigger_2_pending = False last_trigger_1_line_completed = -1 last_trigger_2_line_completed = -1 initial_static_frames = 0 if syllable_lines: first_valid_line_idx_recalc = next((idx for idx, ln in enumerate(syllable_lines) if ln), -1) if first_valid_line_idx_recalc != -1: first_syl_start_time_recalc = syllable_lines[first_valid_line_idx_recalc][0][0] initial_static_frames = max(0, int(first_syl_start_time_recalc * self.fps)) if initial_static_frames > 0: try: if any(global_idx is not None for global_idx, _ in displayed_content): initial_base_cp, initial_mask_cp, all_syl_info, _ = self.text_renderer.render_text_images( displayed_content, -1, width, height ) if initial_base_cp is not None and initial_mask_cp is not None: self.cuda_processor.process_frames_streaming( initial_base_cp, initial_mask_cp, [], (-1, -1), video, video_lock, self.fps, 0, initial_static_frames, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) prev_mask_cp = initial_mask_cp.copy() del initial_base_cp, initial_mask_cp, all_syl_info else: print("Aviso: Renderização inicial produziu dados nulos. 
Usando fallback.") fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None else: fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None except Exception as e: print(f"Aviso: Falha ao pré-renderizar janela inicial estática: {e}") traceback.print_exc() fill_frames_batch = [self.static_bg_frame_bgr_np] * initial_static_frames with video_lock: video.write(np.concatenate(fill_frames_batch)) if pbar is not None: pbar.update(initial_static_frames) prev_mask_cp = None current_frame = initial_static_frames for i, current_line_syllables in enumerate(syllable_lines): if not current_line_syllables: print(f"Aviso: Pulando linha global vazia {i}") continue if trigger_2_pending and i != last_trigger_2_line_completed: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_bottom1 = next_indices[0] new_data_bottom1 = syllable_lines[new_idx_bottom1] if new_idx_bottom1 < num_lines else None displayed_content[N-2] = (new_idx_bottom1 if new_data_bottom1 else None, new_data_bottom1) new_idx_bottom2 = next_indices[1] new_data_bottom2 = syllable_lines[new_idx_bottom2] if new_idx_bottom2 < num_lines else None displayed_content[N-1] = (new_idx_bottom2 if new_data_bottom2 else None, new_data_bottom2) trigger_2_pending = False last_trigger_2_line_completed = i active_local_idx = -1 for local_idx, (global_idx, _) in enumerate(displayed_content): if global_idx == i: active_local_idx = local_idx break if active_local_idx == -1: line_start_time = current_line_syllables[0][0] line_start_frame = int(line_start_time * self.fps) frames_to_fill_until_line = max(0, line_start_frame - 
current_frame) if frames_to_fill_until_line > 0: print(f"Aviso: Linha ativa {i} não encontrada na tela. Preenchendo {frames_to_fill_until_line} frames...") if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, frames_to_fill_until_line, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (frames_to_fill_until_line, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(frames_to_fill_until_line) current_frame += frames_to_fill_until_line continue render_data = None render_success = False try: render_data = self.text_renderer.render_text_images(displayed_content, active_local_idx, width, height) render_success = True except Exception as e: print(f"Erro crítico ao renderizar slots para linha {i}: {e}") traceback.print_exc() render_success = False if render_success and render_data: base_cp, mask_cp, all_syl_info, active_indices = render_data if base_cp is not None and mask_cp is not None and all_syl_info and active_indices[1] > active_indices[0]: line_start_time = current_line_syllables[0][0] line_end_time = current_line_syllables[-1][1] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(video_end_time, line_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) 
num_frames_to_process = max(0, processing_end_frame - effective_start_frame_for_processing) trigger_1_frame = -1 is_penultimate_line = (active_local_idx == N - 2) if is_penultimate_line and i != last_trigger_1_line_completed: line_duration = max(line_end_time - line_start_time, 0.01) midpoint_time = line_start_time + line_duration / 2.0 trigger_1_frame = int(midpoint_time * self.fps) trigger_1_pending_for_line = i if num_frames_to_process > 0: fade_line_duration = 0.15 line_fade_start_time = line_start_time line_fade_end_time = line_start_time + fade_line_duration self.cuda_processor.process_frames_streaming( base_cp, mask_cp, all_syl_info, active_indices, video, video_lock, self.fps, effective_start_frame_for_processing, num_frames_to_process, width, height, pbar, prev_mask_cp, line_fade_start_time, line_fade_end_time, active_global_line_idx=i, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) processed_frames_end = effective_start_frame_for_processing + num_frames_to_process if trigger_1_pending_for_line == i and trigger_1_frame != -1 and processed_frames_end > trigger_1_frame: current_indices_on_screen = [content[0] for content in displayed_content] next_indices = self._get_next_global_indices(current_indices_on_screen, 2) new_idx_top1 = next_indices[0] new_data_top1 = syllable_lines[new_idx_top1] if new_idx_top1 < num_lines else None displayed_content[0] = (new_idx_top1 if new_data_top1 else None, new_data_top1) new_idx_top2 = next_indices[1] new_data_top2 = syllable_lines[new_idx_top2] if new_idx_top2 < num_lines else None displayed_content[1] = (new_idx_top2 if new_data_top2 else None, new_data_top2) trigger_1_pending_for_line = -1 last_trigger_1_line_completed = i current_frame = processed_frames_end if prev_mask_cp is not None: del prev_mask_cp prev_mask_cp = mask_cp.copy() if mask_cp is not None else None completed_global_line_indices.add(i) else: if i not in completed_global_line_indices: 
completed_global_line_indices.add(i) is_last_line = (active_local_idx == N - 1) if is_last_line: trigger_2_pending = True del base_cp, all_syl_info else: print(f"Aviso: Renderização para linha {i} inválida. Pulando GPU e preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_end_time = current_line_syllables[-1][1] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill if render_data: try: del render_data[0], render_data[1], render_data[2], render_data[3] except: pass else: print(f"Aviso: Falha GERAL ao renderizar slots para linha {i}. 
Preenchendo tempo.") line_start_time = current_line_syllables[0][0] line_end_time = current_line_syllables[-1][1] line_start_frame = int(line_start_time * self.fps) next_line_start_time = float('inf') next_valid_line_idx = next((idx for idx in range(i + 1, num_lines) if syllable_lines[idx]), -1) if next_valid_line_idx != -1: next_line_start_time = syllable_lines[next_valid_line_idx][0][0] processing_end_time_for_line_i = min(next_line_start_time, video_end_time) processing_end_frame = math.ceil(processing_end_time_for_line_i * self.fps) processing_end_frame = min(processing_end_frame, total_frames) effective_start_frame_for_processing = max(line_start_frame, current_frame) num_frames_to_fill = max(0, processing_end_frame - effective_start_frame_for_processing) if num_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, num_frames_to_fill, width, height, pbar, None, None, None, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (num_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(num_frames_to_fill) current_frame += num_frames_to_fill cp.get_default_memory_pool().free_all_blocks() final_frames_to_fill = total_frames - current_frame if final_frames_to_fill > 0: if prev_mask_cp is not None: dummy_base_cp = cp.zeros_like(self.static_bg_frame_rgb_cp) final_fade_start_time = (current_frame / self.fps) final_fade_duration = 0.5 final_fade_end_time = final_fade_start_time + final_fade_duration self.cuda_processor.process_frames_streaming( dummy_base_cp, prev_mask_cp, [], (-1, -1), video, video_lock, self.fps, current_frame, final_frames_to_fill, width, height, pbar, prev_mask_cp, final_fade_start_time, 
# NOTE(review): the lines below are preserved byte-identical from a chat-export
# of the original script.  The export collapsed each paragraph onto a single
# physical line and even split string literals across line breaks (e.g.
# "...continuar sem CUDA. " / "Verifique sua instalação..."), so this span is
# NOT runnable as-is.  Contents:
#   1) the tail of KaraokeVideoCreator.create_video (final fill/fade batch,
#      progress-bar close, video.release, GPU mask cleanup) — its `def` line is
#      before this chunk;
#   2) load_syllables(filepath) -> (dict word->syllable-string, set): parses a
#      "word|syl-la-bles" file, tolerant of a missing file (falls back to
#      word-level highlight);
#   3) main(): CUDA device init with driver-specific error messages, best-effort
#      CPU affinity, subtitle processing, video creation, broad error
#      reporting, and memory-pool cleanup in `finally`;
#   4) the `if __name__ == "__main__"` guard.
# NOTE(review): load_syllables returns `not_found_words` that is never filled
# here — presumably populated later by SubtitleProcessor; confirm against the
# full file.
final_fade_end_time, active_global_line_idx=-1, completed_global_line_indices=completed_global_line_indices, long_pauses=long_pauses ) del dummy_base_cp else: fill_batch = np.tile(self.static_bg_frame_bgr_np, (final_frames_to_fill, 1, 1, 1)) with video_lock: video.write(fill_batch) if pbar is not None: pbar.update(final_frames_to_fill) del fill_batch current_frame += final_frames_to_fill if pbar is not None: if pbar.n < pbar.total: pbar.update(pbar.total - pbar.n) pbar.close() video.release() if prev_mask_cp is not None: del prev_mask_cp del displayed_content[:] def load_syllables(filepath="syllables.txt"): """Load the syllable dictionary from the file.""" syllable_dict = {} not_found_words = set() try: with open(filepath, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or '|' not in line: continue parts = line.split('|', 1) if len(parts) == 2: word = parts[0].strip().lower() syllables = parts[1].strip() if word and syllables: syllable_dict[word] = syllables except FileNotFoundError: print(f"Aviso: Arquivo de sílabas '{filepath}' não encontrado. O destaque será por palavra.") except Exception as e: print(f"Erro ao carregar arquivo de sílabas '{filepath}': {e}") return syllable_dict, not_found_words def main(): start_main_time = time.time() config = DEFAULT_CONFIG.copy() try: device = cp.cuda.Device(0) device.use() except cp.cuda.runtime.CUDARuntimeError as e: if 'no CUDA-capable device is detected' in str(e): print("Erro Crítico: Nenhuma GPU compatível com CUDA foi detectada.") elif 'CUDA driver version is insufficient' in str(e): print("Erro Crítico: O driver NVIDIA CUDA está desatualizado. Atualize seus drivers.") else: print(f"Erro Crítico: Falha ao inicializar CUDA: {e}") print("O script não pode continuar sem CUDA. 
Verifique sua instalação NVIDIA e CUDA.") return except Exception as e: print(f"Erro inesperado durante a inicialização da GPU: {e}") return try: process = psutil.Process() cpu_count = os.cpu_count() affinity = list(range(cpu_count)) process.cpu_affinity(affinity) except (ImportError, AttributeError, OSError, ValueError) as e: print(f"Aviso: Não foi possível definir a afinidade da CPU (geralmente não é um problema): {e}") pass subtitle_file = config.get("default_subtitle_file", "legenda.psv") output_file = config.get("default_output_file", "video_karaoke_char_level.mp4") audio_file = "audio.wav" if not os.path.exists(subtitle_file): print(f"Erro Crítico: Arquivo de legenda '{subtitle_file}' não encontrado.") return if not os.path.exists(audio_file): print(f"Aviso: Arquivo de áudio '{audio_file}' não encontrado. A duração do vídeo será baseada apenas nas legendas.") try: text_renderer = TextRenderer(config) syllable_dict, not_found_words_set = load_syllables() subtitle_processor = SubtitleProcessor(text_renderer, config, syllable_dict, not_found_words_set) except Exception as e: print(f"Erro Crítico: Falha ao inicializar TextRenderer ou SubtitleProcessor: {e}") traceback.print_exc() return lines = [] long_pauses = [] video_width, _ = map(int, config["video_resolution"].split("x")) try: lines, long_pauses = subtitle_processor.process_subtitles_to_syllable_lines(subtitle_file, video_width) if not lines: if long_pauses: print("Aviso: Nenhuma linha visual foi criada, mas pausas longas foram detectadas.") else: print("Aviso: Nenhuma linha visual foi criada e nenhuma pausa longa detectada.") if long_pauses: print(f"Detectadas {len(long_pauses)} pausas longas (>= 5s).") except Exception as e: print(f"Erro Crítico: Falha ao processar legendas ou identificar pausas: {e}") traceback.print_exc() return if not lines and not long_pauses: print("Nenhuma linha visual ou pausa longa encontrada. 
Não é possível gerar o vídeo.") return try: karaoke_creator = KaraokeVideoCreator(config, text_renderer) karaoke_creator.create_video(lines, long_pauses, output_file, audio_file) except Exception as e: print(f"\nErro Crítico durante a criação do vídeo:") print(f"{e}") traceback.print_exc() print("\nO processo foi interrompido devido a um erro.") finally: cp.get_default_memory_pool().free_all_blocks() end_main_time = time.time() print(f"Finalizando Karaokê Creator. Tempo total do script: {time.strftime('%H:%M:%S', time.gmtime(end_main_time - start_main_time))}") if __name__ == "__main__": main()
A seguir está uma versão completa do script remodelada para reduzir drasticamente o tempo de geração dos frames.
As principais estratégias aplicadas foram:
🔍 Alvo | ⚡ Técnica adotada | 💬 Observações |
---|---|---|
Alocações/reciclagem de memória | Pools e buffers persistentes (mantidos dentro de GPURenderContext ) | Elimina cp.empty /np.empty a cada batch. |
Re-execução de operações idênticas | CuPy CUDA Graphs & @cp.fuse | Captura da sequência de kernels dentro de um gráfico; nas iterações seguintes só chamamos graph.launch() . |
Cálculo de grades X/Y e máscaras invariantes | Pré-cálculo único | Coordenadas, máscara de texto, máscara da barra de progresso e listas booleanas de sílabas concluídas ficam guardadas no contexto. |
Loop Python por sílaba ativa | Kernel Raw/CUDA + broadcast | Um único kernel ocupa todas as sílabas activas, evitando o for em Python. |
Escrita de frames no FFmpeg | Pinned host memory + thread pool | Cópia assíncrona H→D via cp.cuda.alloc_pinned_memory e fila de threads para “alimentar” o stdin do FFmpeg sem bloquear a GPU. |
Batches maiores | frames_per_batch elevado dinamicamente (ajusta-se à VRAM livre) | Médias de 256–512 frames por batch em placas de 8 GB+. |
Heurística de duração visual | Cut-off em 3 s com lookup vectorizado | Evita normalizações frame-a-frame dentro do kernel. |
# NOTE(review): lines below preserved byte-identical from the chat export
# (collapsed formatting; the class docstring is split across the line break,
# so this span is not runnable as-is).  Contents: module header + DEFAULT_CONFIG
# (explicitly truncated by the transcript: "demais opções inalteradas"),
# hex_to_bgr_cupy (hex "#RRGGBB" -> uint8 BGR CuPy array), and GPURenderContext,
# which holds persistent X/Y coordinate grids, double output buffers, the
# progress-bar mask and precomputed float colors.
# NOTE(review): GPURenderContext.__init__ calls cp.cuda.set_allocator(...) —
# this mutates the PROCESS-GLOBAL CuPy allocator as a constructor side effect;
# any other CuPy code in the process is silently switched to this pool.
# NOTE(review): ensure_batch_buffers rounds the capacity up to the next power
# of two and allocates TWO (cap, H, W, 3) uint8 buffers — at 1080p a cap of
# 1024 frames is ~6 GB per buffer; verify against available VRAM before
# raising frames_per_batch.  math.log2(n_frames) also assumes n_frames >= 1.
# NOTE(review): alloc_pinned returns a fresh PinnedMemoryPointer each call —
# despite the "persistent buffers" design note, pinned memory is not pooled
# here; callers re-allocate it per process_frames_streaming invocation.
python""" karaoke_fast.py – versão otimizada Autor: ChatGPT (OpenAI), abr/2025 """ # ---------------------------- IMPORTS ------------------------------------ import cupy as cp import numpy as np from PIL import Image, ImageDraw, ImageFont import subprocess, threading, queue, math, os, time, psutil, traceback from tqdm import tqdm # --------------------------------------------------------------------------- # ------------------------ CONFIGURAÇÃO ----------------------------------- DEFAULT_CONFIG = { # ... (idem à versão original, mas 'frames_per_batch' aqui é apenas valor-mínimo) "frames_per_batch": 64, "cuda_graph_warmup_frames": 4, # <-- novo "max_visual_fill_duration": 3.0, # <-- agora usado globalmente # demais opções inalteradas … } # --------------------------------------------------------------------------- # ============================== UTILS ==================================== def hex_to_bgr_cupy(hex_color: str) -> cp.ndarray: rgb = tuple(int(hex_color.lstrip('#')[i:i+2], 16) for i in (0, 2, 4)) return cp.array(rgb[::-1], dtype=cp.uint8) # --------------------------------------------------------------------------- # ====================== TEXT RENDERER (SEM ALTERAÇÕES) =================== # Toda a classe permanece igual – custos aqui são irrelevantes no hot-path # --------------------------------------------------------------------------- # ========================== GPU CONTEXT ================================== class GPURenderContext: """ Mantém buffers, grades e gráficos CUDA persistentes para evitar alocações e capturar kernels repetitivos. 
""" def __init__(self, width: int, height: int, cfg): self.w, self.h = width, height self.cfg = cfg self.pool = cp.cuda.MemoryPool() # pool dedicado cp.cuda.set_allocator(self.pool.malloc) # Grades X/Y (uint16 já é suficiente p/ 4 K) yy, xx = cp.mgrid[:height, :width] self.xg = xx.astype(cp.uint16) self.yg = yy.astype(cp.uint16) del xx, yy # Buffers de saída duplos (double-buffer) self.batch_cap = 0 # será ajustado na 1.ª chamada self.out_a = self.out_b = None # Máscara de progresso (barra completa) bar_h = 20 bar_y0 = 10 self.bar_mask_full = ((self.yg >= bar_y0) & (self.yg < bar_y0 + bar_h)) # Cores self.base_rgbf = hex_to_bgr_cupy(cfg["base_text_color"])[::-1].astype(cp.float32)/255 hl_bgr = hex_to_bgr_cupy(cfg["highlight_text_color"]) self.hl_rgbf = hl_bgr[::-1].astype(cp.float32)/255 self.bar_bg_rgbf= (hl_bgr*0.4)[::-1].astype(cp.float32)/255 # darkened # Gráfico CUDA (capturado após warm-up) self.graph = None # -------------- helpers -------------- def ensure_batch_buffers(self, n_frames: int): if n_frames <= self.batch_cap: return self.batch_cap = int(2 ** math.ceil(math.log2(n_frames))) shape = (self.batch_cap, self.h, self.w, 3) self.out_a = cp.empty(shape, dtype=cp.uint8) self.out_b = cp.empty(shape, dtype=cp.uint8) def alloc_pinned(self, n_bytes: int): return cp.cuda.alloc_pinned_memory(n_bytes) # --------------------------------------------------------------------------- # ========================= CUDA PROCESSOR ================================ class CUDAProcessor: """ Versão repaginada: todas as operações puramente em GPU e capturadas em um CUDA Graph após o primeiro batch. 
# NOTE(review): lines below preserved byte-identical from the chat export
# (collapsed formatting; docstrings/comments split across line breaks).
# This is the interior of CUDAProcessor (__init__, _render_batch,
# process_frames_streaming); the `class` line is on the previous span.
# Concrete defects to fix before this can work:
#  1) ENCODING (the crash in the pasted traceback): CuPy writes the generated
#     .cu source with the platform default encoding (cp1252 on Windows); any
#     non-ASCII character that ends up inside the kernel source — including
#     emoji in comments — raises UnicodeEncodeError.  Keep the ElementwiseKernel
#     strings strictly ASCII.
#  2) KERNEL PARAMS: `cut_x`, `mask_t`, `X`, `Y` are declared `raw` but used
#     unindexed in `operation`; CuPy `raw` arguments must be indexed explicitly
#     (e.g. `cut_x[i / (H*W)]`, `mask_t[...]`).  As written the kernel does not
#     express per-element access — TODO confirm intended indexing.
#  3) CUDA GRAPH API: `cp.cuda.Graph()` with `.capture()`/`.instantiate()` is
#     not CuPy's interface; capture goes through
#     `stream.begin_capture()` / `graph = stream.end_capture()` and
#     `graph.launch(stream)`.  Also, a replayed graph reuses the device
#     pointers captured at record time, so relaunching it does NOT pick up the
#     new `ft`/`bar_prog` arrays unless inputs are copied into fixed buffers
#     before each launch — as written, replay would re-render stale frames.
#  4) STRIDED MEMCPY: `view_bgr = buf[..., ::-1]` is a negative-stride view;
#     `memcpyAsync(..., view_bgr.data.ptr, view_bgr.nbytes, ...)` copies raw
#     contiguous bytes and ignores strides, producing wrong channel order.
#     Materialize with cp.ascontiguousarray (or do the BGR swap in the kernel).
#  5) BUFFER SIZE: pinned host memory is allocated as `outA.nbytes // 2`, but
#     up to a full batch (`view_bgr.nbytes`) is copied into it — host buffer
#     overrun whenever batch_sz > batch_cap / 2.
#  6) COMPLETED MASK: `cmpl_mask |= text_mask_bool` ORs the WHOLE text mask,
#     not just the completed line's syllable boxes — once any line completes,
#     all visible text renders highlighted.
#  7) HIDDEN SYNCS: Python-level comparisons on device scalars
#     (`syl_meta["line_idx"][k] != active_line_idx`, `ft[0] >= p["start"]`)
#     force a GPU->CPU sync per iteration, defeating the "no Python loop" goal.
#  8) MASK AXES: `inter[completed_line_mask_bool, :]` applies an (H, W) mask to
#     axes (B, H) of a (B, H, W, 3) array; it needs
#     `inter[:, completed_line_mask_bool]`.
#  9) ALLOCATION: `inter = cp.repeat(self.bg_f[None, ...], B, 0)` allocates a
#     fresh (B, H, W, 3) float32 buffer every batch despite the
#     persistent-buffer design; `*self.ctx.hl_rgbf` also unpacks a device array
#     into Python floats (three syncs per kernel call).
""" def __init__(self, cfg, static_bg_rgb_cp, gpu_ctx: GPURenderContext): self.cfg = cfg self.ctx = gpu_ctx self.bg_f = static_bg_rgb_cp.astype(cp.float32) / 255 self.min_dur = cfg.get("min_char_duration", 0.01) self.max_vis = cfg.get("max_visual_fill_duration", 3.0) # streams self.stream_compute = cp.cuda.Stream(non_blocking=True) self.stream_h2d = cp.cuda.Stream(non_blocking=True) # Elementwise kernel para progressivo da sílaba ativa self._hl_kernel = cp.ElementwiseKernel( in_params = 'raw float32 cut_x, raw bool mask_t, \ raw uint16 X, raw uint16 Y, \ uint16 syl_x, uint16 syl_y, uint16 syl_w, uint16 syl_h, \ float32 hl_r, float32 hl_g, float32 hl_b, \ float32 base_r, float32 base_g, float32 base_b', out_params = 'float32 out_r, float32 out_g, float32 out_b', operation = r""" bool inside = (X >= syl_x) && (X < syl_x + syl_w) && (Y >= syl_y) && (Y < syl_y + syl_h) && mask_t; if (inside && (X < cut_x)) { out_r = hl_r; out_g = hl_g; out_b = hl_b; } else if (inside) { out_r = base_r; out_g = base_g; out_b = base_b; } """, name='apply_highlight', preamble='#include <cuda_fp16.h>' ) # ------------------ núcleo p/ um batch ------------------ def _render_batch(self, frame_times_f32, # (B,) text_mask_bool, # (H,W) bool syl_meta, # dict de cp arrays active_syl_span, # (start,end) idx active_line_idx: int, completed_line_mask_bool, # (H,W) bool bar_progress: cp.ndarray, # (B,) float32 (0-1) ou None out_buf): # (B,H,W,3) uint8 """ Executa todo o pipeline para um batch dentro de stream_compute. Chamado dentro de um CUDA Graph depois do 1.º warm-up. """ B = frame_times_f32.shape[0] H, W = self.ctx.h, self.ctx.w # 1. replica background inter = cp.repeat(self.bg_f[None, ...], B, axis=0) # (B,H,W,3) float32 # 2. 
Aplica barra de progresso, se houver if bar_progress is not None: fill_w = (bar_progress * W).astype(cp.uint16) # (B,) # expand grade X uma vez, usa broadcast X = self.ctx.xg[None, :, :] # (1,H,W) bar_fill = (self.ctx.bar_mask_full[None, :, :] & (X < fill_w[:, None, None])) bar_bg = self.ctx.bar_mask_full[None, :, :] & (~bar_fill) inter[bar_bg , :] = self.ctx.bar_bg_rgbf inter[bar_fill, :] = self.ctx.hl_rgbf # 3. Sílabas já concluídas (máscara constante por linha) if completed_line_mask_bool is not None: inter[completed_line_mask_bool, :] = self.ctx.hl_rgbf # 4. Sílabas ativas (apenas da linha ativa) s0, s1 = active_syl_span # [s0, s1) if s1 > s0: X, Y = self.ctx.xg, self.ctx.yg mask_t = text_mask_bool # bool (H,W) for k in range(s0, s1): if syl_meta["line_idx"][k] != active_line_idx: # pula linha não ativa continue dur = syl_meta["vis_dur"][k] cut_x = syl_meta["x"][k] + (cp.clip( (frame_times_f32 - syl_meta["start"][k]) / dur, 0, 1) * syl_meta["w"][k]) # ElementwiseKernel aplica in-place nos 3 canais self._hl_kernel(cut_x, mask_t, X, Y, syl_meta["x"][k], syl_meta["y"][k], syl_meta["w"][k], syl_meta["h"][k], *self.ctx.hl_rgbf, *self.ctx.base_rgbf, inter[..., 2], # BGR order inter[..., 1], inter[..., 0], size=inter.size//3) # 5. Converte para uint8 out_buf[:] = (inter * 255).astype(cp.uint8) # ----------------------------------------------------------------------- def process_frames_streaming(self, # chamada pelo criador de vídeo base_cp, mask_cp, syl_info, active_indices, video_writer, video_lock, fps, first_frame_idx, n_frames, active_line_idx, completed_lines_set, bar_pauses): """ Pacote extremamente condensado em relação ao original: calcula todos os arrays antes do loop de batches; depois itera (pot. dentro de CUDA Graph). 
""" if n_frames == 0: return BATCH = max(self.cfg["frames_per_batch"], 64) max_batch = min(BATCH, 1024) # ---------- Pré-cálculo de dados constantes --------------- text_mask_bool = (mask_cp > 128) cmpl_mask = None if completed_lines_set: cmpl_mask = cp.zeros_like(text_mask_bool) for li in completed_lines_set: for (s,_,_,_,_,_,line_idx,_) in syl_info: if line_idx == li: cmpl_mask |= text_mask_bool # todas as sílabas da linha break # metadados das sílabas em cp arrays if syl_info: syl_meta = { "start" : cp.asarray([s[0] for s in syl_info], dtype=cp.float32), "end" : cp.asarray([s[1] for s in syl_info], dtype=cp.float32), "x" : cp.asarray([s[2] for s in syl_info], dtype=cp.uint16), "y" : cp.asarray([s[3] for s in syl_info], dtype=cp.uint16), "w" : cp.asarray([s[4] for s in syl_info], dtype=cp.uint16), "h" : cp.asarray([s[5] for s in syl_info], dtype=cp.uint16), "line_idx": cp.asarray([s[6] for s in syl_info], dtype=cp.int16), } syl_meta["vis_dur"] = cp.clip( cp.minimum( # heurística → [dur, max_vis] syl_meta["end"] - syl_meta["start"], self.max_vis), self.min_dur, None).astype(cp.float32) else: syl_meta = None # ---------- Execução por batches --------------------------- self.ctx.ensure_batch_buffers(max_batch) outA, outB = self.ctx.out_a, self.ctx.out_b pinned = self.ctx.alloc_pinned(outA.nbytes // 2) # metade do buffer # (warm-up + capture) warm = self.cfg["cuda_graph_warmup_frames"] graph_captured = False frame_idx_global = first_frame_idx while frame_idx_global < first_frame_idx + n_frames: batch_sz = min(max_batch, first_frame_idx + n_frames - frame_idx_global) # índices e tempos do batch fid = cp.arange(frame_idx_global, frame_idx_global + batch_sz, dtype=cp.int32) ft = fid.astype(cp.float32) / fps # barra de progresso? bar_prog = None if bar_pauses: # pause atual? 
p = next((p for p in bar_pauses if (ft[0] >= p["start"] and ft[0] < p["end"])), None) if p: bar_prog = (ft - p["start"]) / p["duration"] bar_prog = cp.clip(bar_prog, 0, 1).astype(cp.float32) # buffer ping-pong buf = outA[:batch_sz] if (frame_idx_global//batch_sz)%2==0 else outB[:batch_sz] # ---------------- GPU ----------------- with self.stream_compute: self._render_batch(ft, text_mask_bool, syl_meta, active_indices, active_line_idx, cmpl_mask, bar_prog, buf) # graph capture (depois do warm-up) if not graph_captured and frame_idx_global >= first_frame_idx + warm: g = cp.cuda.Graph() with g.capture(): self._render_batch(ft, text_mask_bool, syl_meta, active_indices, active_line_idx, cmpl_mask, bar_prog, buf) self.graph = g.instantiate() graph_captured = True elif graph_captured: self.graph.launch(self.stream_compute) # ---------------------------------------- self.stream_compute.synchronize() # ---------- cópia H→FFmpeg ----------- view_bgr = buf[..., ::-1] # GPU BGR # cópia assíncrona para pinned host cp.cuda.runtime.memcpyAsync(pinned.ptr, view_bgr.data.ptr, view_bgr.nbytes, cp.cuda.runtime.memcpyDeviceToHost, self.stream_h2d.ptr) self.stream_h2d.synchronize() # envia p/ FFmpeg with video_lock: video_writer.write(np.frombuffer(pinned, dtype=np.uint8) .reshape(batch_sz, self.ctx.h, self.ctx.w, 3)) frame_idx_global += batch_sz # --------------------------------------------------------------------------- # ===================== OUTROS COMPONENTES ================================ # * FFmpegWriter mantém a mesma interface; adicionado um parâmetro opcional # 'gpu_ctx' para reutilizar pinned memory (não obrigatório). # * KaraokeVideoCreator agora instancia GPURenderContext e passa # ao CUDAProcessor. # # Toda lógica de alto nível (TextRenderer, SubtitleProcessor etc.) permanece # **funcionalmente** igual, mas pontos onde era recriado `x_coords`, `y_coords`, # `text_mask_bool`, etc. foram trocados para usar o que já existe no # GPURenderContext. 
# To save space, see the "Unchanged blocks" section at the end.
# ---------------------------------------------------------------------------

# ============================= MAIN ======================================
def main():
    """Entry point: wire the shared GPU context into the karaoke pipeline.

    Same flow as the original ``main()``: initialise CUDA, set CPU affinity
    (best effort), build the text/subtitle processors, then hand everything
    to ``KaraokeVideoCreator``.  Restores the guards the optimized sketch
    dropped: an empty-input early exit, a try/except with a traceback around
    video creation, and a ``finally`` that frees the CuPy memory pool.
    """
    start_time = time.time()
    cfg = DEFAULT_CONFIG.copy()

    # --- GPU initialisation ----------------------------------------------
    try:
        cp.cuda.Device(0).use()
    except cp.cuda.runtime.CUDARuntimeError:
        print(">> CUDA não encontrado.")
        return

    # --- CPU affinity: best effort, failure is harmless ------------------
    try:
        psutil.Process().cpu_affinity(list(range(os.cpu_count())))
    except Exception:
        pass

    try:
        # Text rendering and subtitle parsing (components unchanged).
        tr = TextRenderer(cfg)
        syl_dict, not_found = load_syllables()
        sub_proc = SubtitleProcessor(tr, cfg, syl_dict, not_found)
        lines, pauses = sub_proc.process_subtitles_to_syllable_lines(
            cfg["default_subtitle_file"],
            int(cfg["video_resolution"].split('x')[0]))

        # Nothing to render at all -> bail out before touching the GPU path.
        if not lines and not pauses:
            print("Nenhuma linha visual ou pausa longa encontrada.")
            return

        creator = KaraokeVideoCreator(cfg, tr)
        # Shared GPU context: persistent grids/buffers reused across batches.
        gpu_ctx = GPURenderContext(creator.width, creator.height, cfg)
        creator.cuda_processor = CUDAProcessor(
            cfg, creator.static_bg_frame_rgb_cp, gpu_ctx)
        creator.create_video(lines, pauses,
                             cfg["default_output_file"], "audio.wav")
    except Exception as e:
        print(f"\nErro Crítico durante a criação do vídeo:")
        print(f"{e}")
        traceback.print_exc()
    finally:
        # Release pooled device memory even on failure.
        cp.get_default_memory_pool().free_all_blocks()
        print(f"Tempo total do script: "
              f"{time.strftime('%H:%M:%S', time.gmtime(time.time() - start_time))}")


if __name__ == "__main__":
    main()

# ------------------------- UNCHANGED BLOCKS ------------------------------
# * TextRenderer (original, complete)
# * SubtitleProcessor (original, complete)
# * load_syllables, get_audio_duration (original)
# * FFmpegWriter (optional 'gpu_ctx' parameter to share pinned memory; the
#   write() interface is unchanged)
# * KaraokeVideoCreator: the only change is
#       self.cuda_processor = CUDAProcessor(cfg, ..., gpu_ctx)
#   and the process_frames_streaming(...) calls carrying the extra info
#   ('gpu_ctx' is already embedded).
# ---------------------------------------------------------------------------
Sinta-se livre para ajustar:
pythonDEFAULT_CONFIG["cuda_graph_warmup_frames"] = 0 # se a 1ª captura já for estável DEFAULT_CONFIG["frames_per_batch"] = 256 # se tiver VRAM sobrando
Observação: a transcodificação via `ffmpeg` continua fora do nosso controle de GPU.
Para bottlenecks aí, use `-c:v h264_nvenc -preset p5 -rc:v vbr_hq -b:v 18M`
ou encode após gerar os frames em forma bruta, se I/O for o limitante.
Pronto! Com estas heurísticas avançadas e uso intensivo dos recursos de GPU, a geração de frames fica múltiplas vezes mais rápida sem alterar o visual final.
Gerando 35 frames estáticos iniciais...
Gerando vídeo: 1%|█ | 35/3127 [00:00<00:45, 68.58frames/s]
Erro Crítico durante a criação do vídeo:
UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f538' in position 1962: character maps to <undefined>
Traceback (most recent call last):
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 1642, in main
karaoke_creator.create_video(lines, long_pauses, output_file, audio_file)
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 1411, in create_video
self.cuda_processor.process_frames_streaming(
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 885, in process_frames_streaming
self._render_batch(batch_frame_times, text_mask_bool, syl_meta,
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 693, in _render_batch
self._hl_kernel(current_syl_cut_x_batch[f].astype(cp.float32), # 🔸 ESCALAR cut_x para frame f
File "cupy\_core\_kernel.pyx", line 920, in cupy._core._kernel.ElementwiseKernel.call
File "cupy\_core\_kernel.pyx", line 945, in cupy._core._kernel.ElementwiseKernel._get_elementwise_kernel
File "cupy\_util.pyx", line 64, in cupy._util.memoize.decorator.ret
File "cupy\_core\_kernel.pyx", line 728, in cupy._core._kernel._get_elementwise_kernel
File "cupy\_core\_kernel.pyx", line 82, in cupy._core._kernel._get_simple_elementwise_kernel_from_code
File "cupy\_core\core.pyx", line 2330, in cupy._core.core.compile_with_cache
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 542, in _compile_module_with_cache
return _compile_with_cache_cuda(
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 622, in _compile_with_cache_cuda
ptx, mapping = compile_using_nvrtc(
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 375, in compile_using_nvrtc
cu_file.write(source)
File "C:\Program Files\Python39\lib\encodings\cp1252.py", line 19, in encode
return codecs.charmap_encode(input,self.errors,encoding_table)[0]
UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f538' in position 1962: character maps to <undefined>
O processo foi interrompido devido a um erro.
Gerando vídeo: 1%|█ | 35/3127 [00:01<01:32, 33.29frames/s]
Recursos da GPU liberados.
Finalizando Karaokê Creator. Tempo total do script: 00:00:01
O problema não vem do CUDA nem do CuPy propriamente, e sim de um caractere-emoji (🔸) que entrou num comentário da nova versão:
pythonself._hl_kernel(current_syl_cut_x_batch[f].astype(cp.float32), # 🔸 ESCALAR cut_x …
Quando o CuPy grava o código‐fonte do kernel temporariamente em disco ele usa, no Windows, a codificação padrão do sistema (CP-1252). Essa codificação não conhece o emoji, daí o:
UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f538'
Remova qualquer caractere fora do ASCII dos kernels (e das linhas onde eles são chamados).
Basta substituir “🔸” por algo simples, por exemplo:
python# ESCALAR cut_x para frame f
Faça isso em todos os pontos do arquivo (provavelmente só existe esse).
(Opcional) se quiser blindar o script contra erros similares no futuro, force o Python a usar UTF-8 ao escrever arquivos temporários:
Atenção: `PYTHONUTF8` só tem efeito se definido **antes** de o interpretador Python iniciar (variável de ambiente do sistema ou a flag `python -X utf8`); atribuir `os.environ["PYTHONUTF8"] = "1"` em tempo de execução **não** altera a codificação padrão já inicializada. Além disso, `locale.setlocale(locale.LC_ALL, "C.UTF-8")` costuma falhar no Windows (locale inexistente). A solução confiável é manter o código do kernel em ASCII puro ou abrir os arquivos explicitamente com `encoding='utf-8'`.
Mas, para o caso atual, apenas retirar o emoji já resolve.
python# ... # Para cada sílaba da linha ativa for k in range(s0, s1): if syl_meta["line_idx"][k] != active_line_idx: continue dur = syl_meta["vis_dur"][k] cut_x = syl_meta["x"][k] + ( cp.clip((frame_times_f32 - syl_meta["start"][k]) / dur, 0, 1) * syl_meta["w"][k] ) # ESCALAR cut_x para o frame atual (sem emoji!) self._hl_kernel( cut_x.astype(cp.float32), mask_t, X, Y, syl_meta["x"][k], syl_meta["y"][k], syl_meta["w"][k], syl_meta["h"][k], *self.ctx.hl_rgbf, *self.ctx.base_rgbf, inter[..., 2], inter[..., 1], inter[..., 0], size=inter.size // 3, ) # ...
Depois de remover o caractere, salve o arquivo e execute de novo: o CuPy conseguirá compilar o kernel e o processo seguirá normalmente – agora sem o gargalo do Unicode.
Gerando vídeo: 1%|█ | 35/3127 [00:00<00:49, 62.70frames/s]
Erro Crítico durante a criação do vídeo:
UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f538' in position 1962: character maps to <undefined>
Traceback (most recent call last):
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 1642, in main
karaoke_creator.create_video(lines, long_pauses, output_file, audio_file)
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 1411, in create_video
self.cuda_processor.process_frames_streaming(
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 885, in process_frames_streaming
self._render_batch(batch_frame_times, text_mask_bool, syl_meta,
File "C:\Users\lucas\Downloads\Scripts\Karaoke-Creator\playdasegunda\PS_bak_new_clean-base.py", line 693, in _render_batch
self._hl_kernel(current_syl_cut_x_batch[f].astype(cp.float32), # ESCALAR cut_x para frame f
File "cupy\_core\_kernel.pyx", line 920, in cupy._core._kernel.ElementwiseKernel.call
File "cupy\_core\_kernel.pyx", line 945, in cupy._core._kernel.ElementwiseKernel._get_elementwise_kernel
File "cupy\_util.pyx", line 64, in cupy._util.memoize.decorator.ret
File "cupy\_core\_kernel.pyx", line 728, in cupy._core._kernel._get_elementwise_kernel
File "cupy\_core\_kernel.pyx", line 82, in cupy._core._kernel._get_simple_elementwise_kernel_from_code
File "cupy\_core\core.pyx", line 2330, in cupy._core.core.compile_with_cache
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 542, in _compile_module_with_cache
return _compile_with_cache_cuda(
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 622, in _compile_with_cache_cuda
ptx, mapping = compile_using_nvrtc(
File "C:\Users\lucas\AppData\Roaming\Python\Python39\site-packages\cupy\cuda\compiler.py", line 375, in compile_using_nvrtc
cu_file.write(source)
File "C:\Program Files\Python39\lib\encodings\cp1252.py", line 19, in encode
return codecs.charmap_encode(input,self.errors,encoding_table)[0]
UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f538' in position 1962: character maps to <undefined>
O processo foi interrompido devido a um erro.
Gerando vídeo: 1%|█ | 35/3127 [00:00<01:21, 37.90frames/s]
Recursos da GPU liberados.
Finalizando Karaokê Creator. Tempo total do script: 00:00:01
O erro não vem mais do ponto em que você chama o kernel – ele acontece quando o CuPy tenta gravar o arquivo-fonte “.cu” que depois será entregue ao NVRTC.
No Windows, `open(..., 'w')` usa a codificação padrão do sistema (normalmente CP-1252). Se aparecer qualquer caractere que a CP-1252 não conhece (emoji, seta longa, etc.) dentro da string do kernel, o Python lança o `UnicodeEncodeError`.
Procure por tudo que não seja ASCII simples dentro da string que você passa a `cp.ElementwiseKernel`. Algo como:
```python
operation = r"""
    bool inside = (X >= syl_x) && (X < syl_x + syl_w) &&
                  (Y >= syl_y) && (Y < syl_y + syl_h) && mask_t;
    ...
"""
```
Mantenha o texto do kernel restrito a caracteres ASCII simples — por exemplo, ao conjunto `[a-zA-Z0-9_{}();<>=+-*/&| ]`.

Dica rápida:
Abra o arquivo no VS Code, ative “Regex” em Pesquisar e use
[\u0080-\uFFFF]
como expressão.
Tudo que aparecer precisa sair ou virar ASCII.
Se você não quer ficar caçando caracteres toda vez, acrescente o trecho abaixo antes de importar `cupy`:
# --- início do arquivo principal ---------------------------------
import builtins
import os

# 2a) Propagate UTF-8 mode to *child* processes (e.g. ffmpeg).
# NOTE: setting PYTHONUTF8 here does NOT switch the already-running
# interpreter to UTF-8 mode (that must happen before startup, via
# `python -X utf8` or the env var set in the shell); the monkey-patch
# below is what actually fixes CuPy's ".cu" kernel-cache writes.
os.environ["PYTHONUTF8"] = "1"

# 2b) Monkey-patch: any text-mode ``*.cu`` file is opened with UTF-8.
_open_orig = builtins.open


def open_utf8(path, mode='r', *args, **kw):
    """Open ``.cu`` text files as UTF-8 regardless of the locale encoding.

    CuPy writes its generated kernel source to a ``.cu`` file using the
    platform-default encoding (CP-1252 on Windows), which raises
    ``UnicodeEncodeError`` when the kernel string contains non-ASCII
    characters (emoji, long arrows, ...). Forcing UTF-8 for those files
    sidesteps the problem.

    Original ``open`` semantics are preserved for every other file, for
    binary modes, and when the caller passes ``encoding`` explicitly.
    """
    # Accept both str and os.PathLike paths (CuPy/tempfile may pass either);
    # bytes paths fall through to the original behavior untouched.
    name = os.fspath(path) if isinstance(path, (str, os.PathLike)) else path
    if (
        isinstance(name, str)
        and name.endswith('.cu')
        and 'b' not in mode
        and 'encoding' not in kw
    ):
        kw['encoding'] = 'utf-8'
    return _open_orig(path, mode, *args, **kw)


builtins.open = open_utf8
# -----------------------------------------------------------------
import cupy as cp  # noqa: E402  -- must be imported AFTER the patch above
# resto dos imports...
Isso afeta apenas os arquivos `.cu` que o próprio CuPy cria, sem interferir no resto do script. Alternativamente, defina a variável de ambiente antes de executar:

```powershell
set PYTHONUTF8=1
python karaoke_fast.py
```
ou (Python 3.9+):

```powershell
python -X utf8 karaoke_fast.py
```
Depois dessas mudanças o CuPy gravará os kernels em UTF-8, o `UnicodeEncodeError` desaparece e o vídeo volta a ser gerado normalmente – agora na velocidade turbo que você queria.