Loopy Implementation Analysis
Created on: October 3, 2024
Answered using O1 Preview by Chat01
Below are the Loopy paper and the corresponding implementations of the audio_alignment and data loader modules. Please analyze the module code thoroughly, check whether any part of the implementation is unreasonable or incorrect, and provide the most correct and optimal implementation.
In this section, we will introduce our method, Loopy. First, we will provide an overview of the framework, including the input, output, and key components of Loopy. Second, we will focus on the design of the inter-/intra-temporal modules, including the temporal segment module. Third, we will detail the implementation of the audio condition module. Finally, we will describe the implementation details during the training and testing of Loopy.
Figure 2: The framework of Loopy. It removes the commonly used face locator and speed layer modules found in existing methods. Instead, it achieves flexible and natural motion generation through the proposed inter-/intra-clip temporal layers and audio-to-latents module.
Our method is built upon Stable Diffusion (SD) and uses its initialization weights. SD is a text-to-image diffusion model based on the Latent Diffusion Model (LDM) (Rombach et al., 2022). It employs a pretrained VQ-VAE (Kingma, 2013; Van Den Oord et al., 2017) E to transform images from pixel space to latent space. During training, images are first converted to latents, i.e., z0 = E(I).
Gaussian noise ϵ is then added to the latents in the latent space based on the Denoising Diffusion Probabilistic Model (DDPM) (Ho et al., 2020) for t steps, resulting in a noisy latent zt. The denoising net takes zt as input to predict ϵ. The training objective can be formulated as follows:
$L = \mathbb{E}_{z_t, c, \epsilon \sim \mathcal{N}(0,1), t}\left[\,\lVert \epsilon - \epsilon_\theta(z_t, t, c) \rVert_2^2\,\right]$, (1)
where ϵθ represents the denoising net, including condition-related attention modules, which is the main part Loopy aims to improve. c represents the text condition embedding in SD, and in Loopy, it includes audio, motion frames, and other additional information influencing the final generation.
During testing, the final image is obtained by sampling from Gaussian noise and removing the noise based on DDIM (Song et al., 2020) or DDPM.
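To make Eq. (1) and the noising step concrete, here is a minimal sketch of the ε-prediction training loss using a diffusers-style DDPMScheduler; `denoising_net` and the condition `c` are placeholders rather than Loopy's actual modules.

```python
import torch
import torch.nn.functional as F
from diffusers import DDPMScheduler

scheduler = DDPMScheduler(num_train_timesteps=1000)

def training_loss(denoising_net, z0, c):
    noise = torch.randn_like(z0)                                   # epsilon ~ N(0, I)
    t = torch.randint(0, scheduler.config.num_train_timesteps,
                      (z0.shape[0],), device=z0.device)            # random diffusion step
    zt = scheduler.add_noise(z0, noise, t)                         # forward diffusion to step t
    eps_pred = denoising_net(zt, t, c)                             # predict the added noise
    return F.mse_loss(eps_pred, noise)                             # Eq. (1)
```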
As demonstrated in Figure 2, in Loopy, the inputs to the denoising net include the noisy latents zt, i.e., the noised VQ-VAE-encoded image latents. Unlike the original SD, where the input is a single image, here it is a sequence of images representing a video clip. The inputs also include reference latents cref (reference image latents encoded via the VQ-VAE), the audio embedding caudio (audio features of the current clip), motion frames cmf (image latents of the M-frame sequence from the preceding clips), and the timestep t. During training, additional facial movement-related features are involved:
clmk (facial keypoint sequence of the current clip), cmov (head motion variance of the current clip), and cexp (expression variance of the current clip). The output is the predicted noise ϵ. The denoising network employs a Dual U-Net architecture (Hu, 2024; Zhu et al., 2023). This architecture includes an additional reference net module, which replicates the original SD U-Net structure but utilizes reference latents cref as input. The reference net operates concurrently with the denoising U-Net.
During the spatial attention layer computation in the denoising U-Net, the key and value features from corresponding positions in the reference net are concatenated with the denoising U-Net's features along the token dimension before proceeding with the attention module computation. This design enables the denoising U-Net to effectively incorporate reference image features from the reference net.
Additionally, the reference net also takes motion frames latents cmf as input for feature extraction, allowing these features to be utilized in subsequent temporal attention computations.
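As a rough sketch of this reference-feature injection (my own assumptions about shapes and module layout, not the authors' code), the keys and values can simply be computed over the concatenation of the denoising U-Net tokens and the reference-net tokens:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class RefSpatialAttention(nn.Module):
    """Self-attention where reference-net tokens are appended as extra keys/values."""
    def __init__(self, dim: int, heads: int = 8):
        super().__init__()
        self.heads = heads
        self.to_q = nn.Linear(dim, dim, bias=False)
        self.to_k = nn.Linear(dim, dim, bias=False)
        self.to_v = nn.Linear(dim, dim, bias=False)
        self.to_out = nn.Linear(dim, dim)

    def forward(self, x, x_ref):
        # x, x_ref: (B, N, C) tokens from the denoising U-Net / reference net
        q = self.to_q(x)
        kv = torch.cat([x, x_ref], dim=1)          # concatenate along the token dimension
        k, v = self.to_k(kv), self.to_v(kv)
        B, N, C = q.shape
        def split(t):                              # (B, T, C) -> (B, heads, T, C // heads)
            return t.view(B, -1, self.heads, C // self.heads).transpose(1, 2)
        out = F.scaled_dot_product_attention(split(q), split(k), split(v))
        out = out.transpose(1, 2).reshape(B, N, C)
        return self.to_out(out)
```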
Here, we introduce the design of the proposed inter-/intra-clip temporal modules. Unlike existing methods (Tian et al., 2024; Xu et al., 2024a; Chen et al., 2024; Wang et al., 2024) that process motion frame latents and noisy latents features simultaneously through a single temporal layer, Loopy employs two temporal attention layers: the inter-clip temporal layer and the intra-clip temporal layer.
The inter-clip temporal layer first handles the cross-clip temporal relationships between motion frame latents and noisy latents, while the intra-clip temporal layer focuses on the temporal relationships within the noisy latents of the current clip.
First, we introduce the inter-clip temporal layer, initially ignoring the temporal segment module in Figure 2. We first collect the M image latents from the preceding clips, referred to as motion frames latents cmf. Similar to cref, these latents are processed frame by frame through the reference network for feature extraction. Within each residual block, the features xcmf obtained from the reference network are concatenated with the features x from the denoising U-Net along the temporal dimension. To distinguish the two types of latents, we add learnable temporal embeddings. Subsequently, self-attention is computed on the concatenated tokens along the temporal dimension, referred to as temporal attention. The intra-clip temporal layer differs in that its input does not include features from the motion frames latents; it only processes features from the noisy latents of the current clip. By separating the dual temporal layers, the model can better handle the aggregation of different semantic temporal features in cross-clip temporal relationships.
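A minimal sketch of the inter-clip temporal layer under these assumptions (per-pixel temporal tokens, learnable source embeddings; not the released implementation):

```python
import torch
import torch.nn as nn

class InterClipTemporalLayer(nn.Module):
    def __init__(self, dim: int, num_motion: int, num_frames: int, heads: int = 8):
        super().__init__()
        self.motion_emb = nn.Parameter(torch.zeros(1, num_motion, dim))   # tags motion-frame tokens
        self.noisy_emb = nn.Parameter(torch.zeros(1, num_frames, dim))    # tags current-clip tokens
        self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)

    def forward(self, x, x_mf):
        # x:    (B*H*W, F, C) per-pixel temporal tokens of the current clip
        # x_mf: (B*H*W, M, C) per-pixel temporal tokens of the motion frames
        tokens = torch.cat([x_mf + self.motion_emb, x + self.noisy_emb], dim=1)
        out, _ = self.attn(tokens, tokens, tokens)        # temporal self-attention
        return out[:, x_mf.shape[1]:]                     # keep only the current-clip positions
```

The intra-clip variant would simply drop `x_mf` and attend over the current-clip tokens alone.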
Figure 3: The illustration of the temporal segment module and the inter-/intra-clip temporal layers. The former allows us to expand the motion frames to cover over 100 frames, while the latter enables the modeling of long-term motion dependency.
Due to the independent design of the inter-clip temporal layer, Loopy is better equipped to model the motion relationships among clips. To enhance this capability, we introduce the temporal segment module before cmf enters the reference network. This module not only extends the temporal range covered by the inter-clip temporal layer but also accounts for the variations in information due to the varying distances of different clips from the current clip, as illustrated in Figure 3. The temporal segment module divides the original motion frames into multiple segments and extracts representative motion frames from each segment to abstract it. Based on these abstracted motion frames, we recombine them to obtain the motion frame latents for subsequent computations. For the segmentation process, we define two hyperparameters: stride s and expand ratio r. The stride s represents the number of abstract motion frames in each segment, while the expand ratio r is used to calculate the length of the original motion frames in each segment. The number of frames in the i-th segment, ordered from closest to farthest from the current clip, is given by s × r^(i−1). For example, with a stride s = 4 and an expand ratio r = 2, the first segment contains 4 frames, the second segment 8 frames, and the third segment 16 frames. For the abstraction process after segmentation, we default to uniform sampling within each segment. In the experimental section, we investigate different segmentation parameters and abstraction methods. Since segmentation and abstraction directly affect the learning of long-term motion dependency, different methods have a significant impact on the results. The output of the temporal segment module, c^o_mf, can be defined as:

$c^{o}_{mf,i} = \operatorname{mean}\!\left(c_{mf}\left[\text{start\_index}_i : \text{end\_index}_i\right]\right)$,

where $\text{start\_index}_i = \sum_{j=0}^{k-1} r^{j} s$ and $\text{end\_index}_i = \text{start\_index}_i + r^{k} s$, with $k = \lfloor i/s \rfloor$. Here $c^{o}_{mf,i}$ denotes the i-th element of the output, computed as the mean of the corresponding span of the sequence cmf.
The temporal segment module rapidly expands the temporal coverage of the motion frames input to the inter-clip temporal layer while maintaining acceptable computational complexity. For closer frames, a lower expansion rate retains more details, while for distant frames, a higher expansion rate covers a longer duration. This approach helps the model better capture motion style from long-term information and generate temporally natural motion without spatial templates.
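To illustrate, here is a small sketch of the segmentation and the default uniform-sampling abstraction under my own assumptions (index 0 is taken as the motion frame closest to the current clip; the mean-based abstraction of the formula above would be a one-line swap):

```python
import torch

def temporal_segment(c_mf: torch.Tensor, stride: int = 4, expand: int = 2) -> torch.Tensor:
    # c_mf: (M, C, H, W) motion-frame latents, index 0 assumed closest to the current clip
    M = c_mf.shape[0]
    picked, start, seg = [], 0, 0
    while start < M:
        seg_len = stride * expand ** seg              # 4, 8, 16, ... for s=4, r=2
        end = min(start + seg_len, M)
        idx = torch.linspace(start, end - 1, steps=stride).round().long()  # uniform sampling
        picked.append(c_mf[idx])
        start, seg = end, seg + 1
    return torch.cat(picked, dim=0)                   # abstracted motion-frame latents

# e.g. 124 motion frames with s=4, r=2 -> segments of 4+8+16+32+64 = 124 frames,
# abstracted to 5*4 = 20 latents, matching the "compressed to 20" detail later in the paper.
```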
For the audio condition, we first use wav2vec (Baevski et al., 2020; Schneider et al., 2019) for audio feature extraction. Following the approach in EMO, we concatenate the hidden states from each layer of the wav2vec network to obtain multi-scale audio features. For each video frame, we concatenate the audio features of the two preceding and two succeeding frames, resulting in a 5-frame audio feature as audio embeddings caudio for the current frame. Initially, in each residual block, we use cross-attention with noisy latents as the query and audio embedding caudio as the key and value to compute an attended audio feature. This attended audio feature is then added to the noisy latents feature obtained from self-attention, resulting in a new noisy latents feature. This provides a preliminary audio condition.
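A minimal sketch (shapes assumed, not the authors' code) of this preliminary audio condition, where the post-self-attention features attend to the 5-frame audio embedding and the attended result is added back:

```python
import torch
import torch.nn as nn

class AudioCrossAttention(nn.Module):
    def __init__(self, dim: int, audio_dim: int, heads: int = 8):
        super().__init__()
        self.proj_audio = nn.Linear(audio_dim, dim)
        self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)

    def forward(self, x_self: torch.Tensor, c_audio: torch.Tensor) -> torch.Tensor:
        # x_self:  (B, N, C) noisy-latent features after self-attention
        # c_audio: (B, 5, audio_dim) audio features of the frame and its +/- 2 neighbors
        audio = self.proj_audio(c_audio)
        attended, _ = self.attn(query=x_self, key=audio, value=audio)
        return x_self + attended    # residual add of the attended audio feature
```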
Additionally, as illustrated in Figure 4, we introduce the Audio-to-Latents module. This module maps conditions that have both strong and weak correlations with portrait motion, such as audio, to a shared motion latents space. These mapped conditions serve as the final condition features, thereby enhancing the modeling of the relationship between audio and portrait movements based on motion latents. Specifically, we maintain a set of learnable embeddings. For each input condition, we map it to a query feature using a fully connected (FC) layer, while the learnable embeddings serve as key and value features for attention computation to obtain a new feature based on the learnable embeddings. These new features, referred to as motion latents, replace the input condition in subsequent computations. These motion latents are then dimensionally transformed via an FC layer and added to the timestep embedding features for subsequent network computation.
During training, we sample an input condition for the Audio-to-Latents module with equal probability from the audio embeddings and facial movement-related features such as landmarks, face absolute motion variance, and face expression motion variance. During testing, we only input audio to generate motion latents. By leveraging features strongly correlated with portrait movement, the model learns to use the learnable embeddings to control motion. Consequently, transforming audio embeddings into motion latents also allows a more direct influence on portrait motion.
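Below is a hedged sketch of the Audio-to-Latents module as described: the feature dimensions, the size of the learnable embedding bank, and the projection into the timestep-embedding width are my assumptions, not values from the paper.

```python
import torch
import torch.nn as nn

class AudioToLatents(nn.Module):
    def __init__(self, cond_dim: int, dim: int = 320, num_latents: int = 64, heads: int = 8):
        super().__init__()
        self.to_query = nn.Linear(cond_dim, dim)                                  # FC layer -> query
        self.latent_bank = nn.Parameter(torch.randn(1, num_latents, dim) * 0.02)  # learnable embeddings
        self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)
        self.to_timestep = nn.Linear(dim, dim * 4)    # project to the (assumed) timestep-embedding width

    def forward(self, cond: torch.Tensor, t_emb: torch.Tensor) -> torch.Tensor:
        # cond: (B, L, cond_dim) audio embedding or a facial-motion feature; t_emb: (B, dim*4)
        q = self.to_query(cond)
        kv = self.latent_bank.expand(cond.shape[0], -1, -1)
        motion_latents, _ = self.attn(q, kv, kv)              # attend over the learnable bank
        return t_emb + self.to_timestep(motion_latents.mean(dim=1))
```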
Figure 4: The audio-to-latents module.
Conditions Mask and Dropout. In the Loopy framework, various conditions are involved, including the reference image cref , audio features caudio, preceding frame motion frames cmf , and motion latents cml representing audio and facial motion conditions. Due to the overlapping nature of the information contained within these conditions, to better learn the unique information specific to each condition, we used distinct masking strategies for the conditions during the training process. During training, caudio and motion latents are masked to all-zero features with a 10% probability. For cref and cmf , we design specific dropout and mask strategies due to their conflicting relationship. cmf also provides appearance information and is closer to the current clip compared to cref , leading the model to heavily rely on motion frames rather than the reference image. This can cause color shifts and artifacts in long video sequences during inference. To address this, cref has a 15% probability of being dropped, meaning the denoising U-Net will not concatenate features from the reference network during self-attention computation. When cref is dropped, motion frames are also dropped, meaning the denoising U-Net will not concatenate features from the reference network during temporal attention computation. Additionally, motion frames have an independent 40% probability of being masked to all-zero features.
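The masking probabilities above can be sampled per training example roughly as follows (an illustrative sketch of the sampling logic only):

```python
import torch

def sample_condition_masks(batch_size: int, device):
    audio_mask = torch.rand(batch_size, device=device) < 0.10   # zero out c_audio / motion latents
    drop_ref   = torch.rand(batch_size, device=device) < 0.15   # skip reference-net concatenation
    mf_mask    = torch.rand(batch_size, device=device) < 0.40   # zero out motion frames
    mf_mask    = mf_mask | drop_ref                             # dropping c_ref also drops c_mf
    return audio_mask, drop_ref, mf_mask
```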
Multistage Training. Following AnimateAnyone (Hu, 2024) and EMO (Tian et al., 2024), we employ a 2-stage training process. In the first stage, the model is trained without the temporal layers and the audio condition module. The inputs to the model are the noisy latents of the target single-frame image and the reference image latents, focusing the model on an image-level pose variation task. After completing the first stage, we proceed to the second stage, where the model is initialized with the reference network and denoising U-Net from the first stage. We then add the inter-/intra-clip temporal layers and the audio condition module for full training to obtain the final model.
Inference. During inference, we perform classifier-free guidance (Ho & Salimans, 2022) using multiple conditions. Specifically, we conduct three inference runs, differing in whether certain conditions are dropped. The final noise e_final is computed as:
e_final = audio_ratio × (e_audio − e_ref) + ref_ratio × (e_ref − e_base) + e_base,
where e_audio is computed with all conditions (cref, caudio, cmf, cml), e_ref masks caudio to all-zero features, and e_base masks caudio to all-zero features and removes the concatenation of reference network features in the denoising U-Net's self-attention. This approach allows us to control the model's final output in terms of adherence to the reference image and alignment with the audio. The audio ratio is set to 5 and the reference ratio to 3. We use DDIM sampling with 25 denoising steps to complete the inference.
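Written out as code, the three-run guidance combination looks like the following sketch, where `denoise` is a placeholder for one denoising pass under a given condition set:

```python
def guided_noise(denoise, audio_ratio: float = 5.0, ref_ratio: float = 3.0):
    e_audio = denoise(ref=True,  audio=True)    # all conditions
    e_ref   = denoise(ref=True,  audio=False)   # audio masked to zeros
    e_base  = denoise(ref=False, audio=False)   # audio masked, reference features removed
    return audio_ratio * (e_audio - e_ref) + ref_ratio * (e_ref - e_base) + e_base
```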
Datasets. For training data, we collected talking head video data from the internet, excluding videos with low lip-sync scores, excessive head movement, extreme rotation, or incomplete head exposure.
This resulted in 160 hours of cleaned training data. Additionally, we supplemented the training data with public datasets such as HDTF (Zhang et al., 2021). For the test set, we randomly sampled 100 videos from CelebV-HQ (Zhu et al., 2022) (a public high-quality celebrity video dataset with mixed scenes) and RAVDESS (Kaggle) (a public high-definition indoor talking-scene dataset with rich emotions). To test the generalization ability of diffusion-based models, we also collected 20 portrait test images, including real people, anime, side faces, and humanoid crafts of different materials, along with 20 audio clips, including speeches, singing, rap, and emotionally rich speech. We refer to this test set as the open-set test set. Implementation Details. We trained our model using 24 Nvidia A100 GPUs with a batch size of 24, using the AdamW (Loshchilov & Hutter, 2017) optimizer with a learning rate of 1e-5. The generated video length was set to 12 frames, and the motion frame length was set to 124 frames, representing the 124 frames preceding the current 12-frame video. After temporal squeezing, this was compressed to 20 motion frame latents. During training, the reference image was randomly selected from a frame within the video clip. For the facial motion information required to train the audio-to-latents module, we used DWPose (Yang et al., 2023) to detect facial keypoints for the current 12 frames. The variance of the absolute position of the nose tip across these 12 frames was used as the absolute head motion variance. The variance of the displacement of the upper half of the facial keypoints (37 keypoints)
relative to the nose tip across these 12 frames was used as the expression variance. The training videos were uniformly processed at 25 fps and cropped to 512x512 portrait videos.
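A hedged sketch of these two motion statistics; the keypoint indices (`nose_idx`, `upper_idx`) are placeholders that depend on the DWPose keypoint layout.

```python
import numpy as np

def motion_variances(kps: np.ndarray, nose_idx: int, upper_idx: list):
    # kps: (T, K, 2) facial keypoints for the T=12 current frames
    nose = kps[:, nose_idx]                              # (T, 2) nose-tip trajectory
    head_var = nose.var(axis=0).mean()                   # absolute head-motion variance
    rel = kps[:, upper_idx] - nose[:, None, :]           # upper-face points relative to the nose tip
    exp_var = rel.var(axis=0).mean()                     # expression variance
    return float(head_var), float(exp_var)
```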
Metrics. We assess image quality with the IQA metric (Wu et al., 2023), video motion with VBench's smoothness metric (Huang et al., 2024), and audio-visual synchronization with SyncC and SyncD (Prajwal et al., 2020). For the CelebV-HQ and RAVDESS test sets, which have corresponding ground-truth videos, we also compute FVD (Unterthiner et al., 2019), E-FID (Tian et al., 2024),
and FID metrics for comparison. Additionally, to compare the global motion (denoted as Glo) and dynamic expressions (denoted as Exp) of the portrait, we calculated the variance values based on keypoints of the nose and the upper face, specifically excluding the mouth area. We also computed these values for the ground-truth video as a reference for comparison. For the open-set test set, which lacks ground-truth video references, we conducted subjective evaluations. Ten experienced users were invited to assess six key dimensions: identity consistency, video synthesis quality, audio-emotion
import os
import glob
import random
from typing import List, Tuple, Dict
import torch
from torch.utils.data import Dataset, DataLoader
import torchaudio
import cv2
import numpy as np
import mediapipe as mp
from diffusers import AutoencoderKL, DDPMScheduler
from audio_alignment import AudioAligner
def extract_facial_keypoints(image: np.ndarray) -> np.ndarray:
"""
使用 MediaPipe 模块提取面部关键点。
text参数: image (np.ndarray): 输入的图像,格式为 NumPy 数组。 返回: np.ndarray: 提取的面部关键点坐标。 """ mp_face_mesh = mp.solutions.face_mesh with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True) as face_mesh: results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) if not results.multi_face_landmarks: return np.zeros((68, 2)) face_landmarks = results.multi_face_landmarks[0] keypoints = [] for lm in face_landmarks.landmark[:68]: x = lm.x * image.shape[1] y = lm.y * image.shape[0] keypoints.append([x, y]) return np.array(keypoints)
class LoopyDataset(Dataset):
"""
Loopy 项目的自定义数据集类,用于加载和预处理视频和音频数据。
1. 使用VQ-VAE编码器将输入图像序列转换为潜在空间表示。
2. 在潜在空间中添加高斯噪声,生成有噪声的潜在表示。
"""
    def __init__(self,
                 video_paths: List[str],
                 transform=None,
                 audio_transform=None,
                 frame_rate: int = 25,
                 frame_size: Tuple[int, int] = (512, 512),
                 audio_sample_rate: int = 16000,
                 vqvae_config: Dict = None):
        """
        Initialize the dataset.

        Args:
            video_paths (List[str]): List of video file paths.
            transform (callable, optional): Transform applied to the video frames.
            audio_transform (callable, optional): Transform applied to the audio.
            frame_rate (int, optional): Target frame rate, default 25 fps.
            frame_size (Tuple[int, int], optional): Target frame size, default 512x512.
            audio_sample_rate (int, optional): Audio sample rate, default 16 kHz.
        """
        self.video_paths = video_paths
        self.transform = transform
        self.audio_transform = audio_transform
        self.frame_rate = frame_rate
        self.frame_size = frame_size
        self.audio_sample_rate = audio_sample_rate
        self.video_length = 12            # generated video length: 12 frames
        self.motion_frame_length = 124    # motion frame length: 124 frames
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.aligner = AudioAligner(device=self.device)  # {{ edit_4 }}
        # Initialize the VQ-VAE encoder (the VAE shipped with Stable Diffusion)
        self.vq_vae_encoder = self._init_vq_vae_encoder(vqvae_config).to(self.device)  # move the encoder to the device
        # Initialize the DDPM scheduler
        self.ddpm_scheduler = DDPMScheduler(
            num_train_timesteps=1000,   # number of training timesteps
            beta_start=0.0001,          # starting noise variance
            beta_end=0.02,              # final noise variance
            beta_schedule="linear",     # noise variance schedule
        )

    def __len__(self) -> int:
        """Return the number of samples in the dataset."""
        return len(self.video_paths)

    def __getitem__(self, idx: int) -> Dict:
        """
        Fetch one sample by index, including video frames, audio, and related features.

        Args:
            idx (int): Sample index.

        Returns:
            Dict: Dictionary with video frame latents, audio, facial keypoints, etc.
        """
        video_path = self.video_paths[idx]
        # Load video frames
        frames = self.load_video_frames(video_path)
        # Load audio
        audio = self.load_audio(video_path)
        # Make sure there are enough frames
        total_required_frames = self.motion_frame_length + self.video_length
        if len(frames) < total_required_frames:
            raise ValueError(f"Video {video_path} has too few frames; at least {total_required_frames} are required.")
        # Current 12-frame clip
        current_frames = frames[self.motion_frame_length:self.motion_frame_length + self.video_length]
        # Preceding 124 frames as motion frames
        motion_frames = frames[:self.motion_frame_length]
        # Randomly select the reference image
        reference_idx = random.randint(0, self.video_length - 1)
        reference_frame = current_frames[reference_idx]
        # Extract facial keypoints
        facial_keypoints = self.extract_keypoints_from_frames(current_frames)
        # Initialize the audio aligner
        # aligner = AudioAligner()  # {{ edit_5 }} removed
        frame_count = len(frames)  # number of video frames
        audio_embeddings, audio_segments = self.aligner.get_concatenated_audio_embeddings(
            audio_waveform=audio.numpy(),  # {{ edit_12 }} convert the tensor to a NumPy array
            frame_rate=self.frame_rate,
            sampling_rate=self.audio_sample_rate,
            num_frames=frame_count  # pass the number of video frames
        )
        # Audio embeddings c_audio for the current 12 frames
        c_audio_embeddings = audio_embeddings[self.motion_frame_length:self.motion_frame_length + self.video_length]
        # VQ-VAE encoding of the frames
        with torch.no_grad():
            z0 = self.vq_vae_encoder.encode(current_frames)                    # encode the current 12 frames
            cmf = self.vq_vae_encoder.encode(motion_frames)                    # encode the 124 motion frames
            cref = self.vq_vae_encoder.encode(reference_frame.unsqueeze(0))    # encode the reference frame (add batch dim)
        # Add noise: randomly sample timesteps
        timesteps = torch.randint(1, self.ddpm_scheduler.config.num_train_timesteps, (z0.size(0),)).long()
        zt, noise = self.add_noise(z0, timesteps)  # (B, 3, H', W')
        sample = {
            'z0': z0,                                # latents of the current 12 frames
            'zt': zt,                                # noisy latents
            'cmf': cmf,                              # latents of the 124 motion frames
            'cref': cref,                            # latents of the reference frame
            'caudio': c_audio_embeddings,            # audio features
            'sample_rate': self.audio_sample_rate,   # audio sample rate
            'facial_keypoints': facial_keypoints,    # facial keypoints
        }
        if self.transform:
            sample['frames'] = self.transform(sample['frames'])
        if self.audio_transform:
            sample['audio'] = self.audio_transform(sample['audio'])
        return sample

    def load_video_frames(self, video_path: str) -> torch.Tensor:
        """
        Extract frames from a video file.

        Args:
            video_path (str): Path to the video file.

        Returns:
            torch.Tensor: Processed frames of shape (num_frames, 3, H, W).
        """
        # Check that the video file exists
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file {video_path} does not exist.")
        # Extract video frames
        print("Extracting video frames...")
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            raise ValueError("Could not read the video frame rate.")
        frame_interval = max(int(round(fps / self.frame_rate)), 1)
        frames = []
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                cap.release()
                break
            if frame_count % frame_interval == 0:
                # Convert to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Resize
                frame_resized = cv2.resize(frame_rgb, self.frame_size)
                # Convert to a tensor and normalize to [0, 1]
                frame_tensor = torch.from_numpy(frame_resized).permute(2, 0, 1).float() / 255.0
                frames.append(frame_tensor)
            frame_count += 1
        cap.release()
        if frames:
            frames = torch.stack(frames)  # (num_frames, 3, H, W)
        else:
            frames = torch.empty((0, 3, self.frame_size[1], self.frame_size[0]))
        return frames

    def load_audio(self, video_path: str) -> torch.Tensor:
        """
        Extract and preprocess the audio track of a video file.

        Args:
            video_path (str): Path to the video file.

        Returns:
            torch.Tensor: Preprocessed audio waveform.
        """
        # Use torchaudio to extract the audio from the video
        print("Extracting audio signal...")
        waveform, sample_rate = torchaudio.load(video_path)
        # Convert stereo to mono
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        # Resample to 16 kHz
        if sample_rate != self.audio_sample_rate:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=self.audio_sample_rate)
            waveform = resampler(waveform)
        return waveform  # {{ edit_2 }}

    def extract_keypoints_from_frames(self, frames: torch.Tensor) -> torch.Tensor:
        """
        Extract facial keypoints from the video frames.

        Args:
            frames (torch.Tensor): Video frames of shape (num_frames, 3, H, W).

        Returns:
            torch.Tensor: Facial keypoints of shape (num_frames, num_keypoints, 2).
        """
        keypoints = []
        for frame in frames:
            # Convert the tensor to a NumPy array
            frame_np = (frame.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
            kp = extract_facial_keypoints(frame_np)  # assumed shape (68, 2)
            keypoints.append(kp)
        if keypoints:
            keypoints = np.array(keypoints)                  # {{ edit_10 }} convert to a NumPy array first
            keypoints = torch.from_numpy(keypoints).float()  # {{ edit_11 }} then convert to a tensor
        else:
            keypoints = torch.empty((0, 68, 2))
        return keypoints

    def _init_vq_vae_encoder(self, vqvae_config):
        """
        Initialize the VQ-VAE encoder.

        Args:
            vqvae_config (dict): VQ-VAE configuration, including the pretrained model path.

        Returns:
            nn.Module: The VQ-VAE encoder module.
        """
        # Use the VAE shipped with Stable Diffusion
        encoder = AutoencoderKL.from_pretrained(
            vqvae_config['pretrained_model_name'],
            subfolder="vae"
        )
        # Freeze the encoder parameters so they are not updated during training
        for param in encoder.parameters():
            param.requires_grad = False
        return encoder

    def add_noise(self, latents, timesteps):
        """
        Add Gaussian noise to the latents according to the DDPM schedule.

        Args:
            latents (torch.Tensor): Latents of shape (B, latent_dim, H', W').
            timesteps (torch.Tensor): Timesteps of shape (B,).

        Returns:
            tuple: The noisy latents and the noise that was actually added.
        """
        noise = torch.randn_like(latents).to(self.device)
        noisy_latents = self.ddpm_scheduler.add_noise(latents, noise, timesteps)
        return noisy_latents, noise
def get_video_paths(raw_data_dir: str) -> List[str]:
"""
获取原始数据目录下所有视频文件的路径。
text参数: raw_data_dir (str): 原始数据目录路径。 返回: List[str]: 视频文件路径列表。 """ video_extensions = ['**/*.mp4', '**/*.avi', '**/*.mov', '**/*.mkv'] # 修改为递归查找 video_paths = [] for ext in video_extensions: video_paths.extend(glob.glob(os.path.join(raw_data_dir, ext), recursive=True)) # 添加 recursive=True return video_paths
def create_dataloaders(config: Dict) -> Tuple[DataLoader, DataLoader]:
"""
根据配置创建训练和测试数据加载器。
text参数: config (Dict): 配置字典,包含数据目录、批次大小、转换函数等。 返回: Tuple[DataLoader, DataLoader]:训练和测试数据加载器。 """ raw_data_dir = config['raw_data_dir'] video_paths = get_video_paths(raw_data_dir) print(f"找到 {len(video_paths)} 个视频文件。") random.shuffle(video_paths) split_idx = int(len(video_paths) * config['train_split']) train_paths = video_paths[:split_idx] test_paths = video_paths[split_idx:] train_dataset = LoopyDataset( video_paths=train_paths, transform=config.get('train_transform', None), audio_transform=config.get('train_audio_transform', None), frame_rate=config.get('frame_rate',25), frame_size=config.get('frame_size', (512,512)), vqvae_config=config.get('vqvae_config', None), ) test_dataset = LoopyDataset( video_paths=test_paths, transform=config.get('test_transform', None), audio_transform=config.get('test_audio_transform', None), frame_rate=config.get('frame_rate',25), frame_size=config.get('frame_size', (512,512)), vqvae_config=config.get('vqvae_config', None), ) print("创建数据加载器...") print(f"训练数据集长度: {len(train_dataset)}") train_loader = DataLoader( train_dataset, batch_size=1, # {{ edit_6 }} 将批次大小从 2 减少到 1 以节省更多内存 shuffle=True, num_workers=0, # {{ edit_7 }} 将 worker 数量设置为 0 pin_memory=True ) test_loader = DataLoader( test_dataset, batch_size=1, # {{ edit_6 }} 将批次大小从 2 减少到 1 以节省更多内存 shuffle=False, num_workers=0, # {{ edit_7 }} 将 worker 数量设置为 0 pin_memory=True ) return train_loader, test_loader
if __name__ == "__main__":
    import torch.multiprocessing as torch_mp        # {{ edit_8 }}
    torch_mp.set_start_method('spawn', force=True)  # {{ edit_9 }}

    # Quick test of the data loading module
    config = {
        'raw_data_dir': './data/raw',
        'train_split': 0.8,
        'batch_size': 1,        # {{ edit_6 }} reduced from 2 to 1 to save memory
        'num_workers': 0,       # {{ edit_7 }} set the number of workers to 0
        'frame_rate': 25,
        'frame_size': (512, 512),
        'audio_sample_rate': 16000,
        'train_transform': None,        # data augmentation could be added here
        'train_audio_transform': None,
        'test_transform': None,
        'test_audio_transform': None,
        'vqvae_config': {
            'pretrained_model_name': 'runwayml/stable-diffusion-v1-5',
        },
    }
    train_loader, test_loader = create_dataloaders(config)

    # Check that video_paths is not empty
    if len(train_loader.dataset) == 0:
        raise ValueError("The training dataset is empty; check the video file paths and directory structure.")

    # Fetch one batch for a smoke test
    for batch in train_loader:
        print("Training batch:")
        print("z0 shape:", batch['z0'].shape)            # latents of the current 12 frames
        print("zt shape:", batch['zt'].shape)            # noisy latents
        print("cmf shape:", batch['cmf'].shape)          # latents of the 124 motion frames
        print("cref shape:", batch['cref'].shape)        # latents of the reference frame
        print("caudio shape:", batch['caudio'].shape)    # audio features
        print("sample rate:", batch['sample_rate'])      # audio sample rate
        print("facial keypoints shape:", batch['facial_keypoints'].shape)
        break

    for batch in test_loader:
        print("Test batch:")
        print("z0 shape:", batch['z0'].shape)
        print("zt shape:", batch['zt'].shape)
        print("cmf shape:", batch['cmf'].shape)
        print("cref shape:", batch['cref'].shape)
        print("caudio shape:", batch['caudio'].shape)
        print("sample rate:", batch['sample_rate'])
        print("facial keypoints shape:", batch['facial_keypoints'].shape)
        break
import numpy as np
import torch
from typing import List, Tuple, Any, Iterator
from transformers import Wav2Vec2Model, Wav2Vec2Processor, Wav2Vec2FeatureExtractor
import traceback
import psutil  # {{ edit_7 }} psutil for memory monitoring
import os
import math  # {{ edit_18 }} math for numeric calculations
def log_memory_usage(step: str):
"""
记录当前内存使用情况。
text参数: step (str): 当前步骤的描述。 """ process = psutil.Process(os.getpid()) mem_info = process.memory_info() print(f"[Memory] 步骤: {step} - RSS: {mem_info.rss / (1024 ** 2):.2f} MB, VMS: {mem_info.vms / (1024 ** 2):.2f} MB")
class AudioAligner:
"""
音频对齐工具类
用于将音频信号与视频帧对齐,并提取拼接后的音频嵌入。
"""
    def __init__(self, model_name: str = 'facebook/wav2vec2-base-960h', device: str = 'cpu'):  # {{ edit_1 }} default device set to CPU
        """
        Initialize the audio alignment utility.

        Args:
            model_name (str): Name of the pretrained Wav2Vec2 model.
            device (str): Device to run on, 'cuda' or 'cpu'.
        """
        self.device = device
        self.model = Wav2Vec2Model.from_pretrained(model_name).to(self.device)  # {{ edit_6 }} removed output_hidden_states argument
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)

    def extract_audio_features(self, waveform: torch.Tensor) -> torch.Tensor:
        """
        Extract audio features.

        Args:
            waveform (torch.Tensor): Audio waveform of shape (1, num_samples).

        Returns:
            torch.Tensor: Extracted features.
        """
        assert waveform.shape[0] == 1, "The input waveform must be mono."
        # Run the feature extractor and return tensors
        inputs = self.feature_extractor(waveform.numpy().flatten(), sampling_rate=16000, return_tensors="pt")  # flatten the array so it is 1-D
        input_values = inputs.input_values.to(self.device)  # move the inputs to the device
        with torch.no_grad():  # no gradients, to save memory
            outputs = self.model(input_values)
        hidden_state = outputs.last_hidden_state  # only the last hidden state
        return hidden_state

    def get_concatenated_audio_embeddings(
            self,
            audio_waveform: np.ndarray,
            frame_rate: int = 25,
            sampling_rate: int = 16000,
            num_frames: int = None  # new parameter
    ) -> Iterator[Tuple[torch.Tensor, torch.Tensor]]:
        """
        Yield the concatenated audio embedding and the aligned per-frame audio feature for each video frame.

        Args:
            audio_waveform (np.ndarray): Raw audio waveform.
            frame_rate (int): Video frame rate, default 25 fps.
            sampling_rate (int): Sampling rate, default 16000 Hz.
            num_frames (int): Number of video frames, used to keep the number of audio embeddings
                consistent with the number of video frames.

        Returns:
            Iterator[Tuple[torch.Tensor, torch.Tensor]]: Iterator over (concatenated embedding, per-frame feature).
        """
        # {{ edit_22 }} channel-count assertions removed
        # assert audio_waveform.ndim in [1, 2], "audio_waveform must be a 1-D or 2-D array."
        # if audio_waveform.ndim == 2:
        #     assert audio_waveform.shape[1] in [1, 2], "Only mono or stereo audio is supported."
        frame_duration = 1.0 / frame_rate
        samples_per_frame = int(sampling_rate * frame_duration)
        if num_frames is None:
            num_frames = math.ceil(len(audio_waveform) / samples_per_frame)  # {{ edit_18 }} ceil so all samples are covered
        # Initialize the buffer of previous-frame features
        prev_features = [torch.zeros(self.model.config.hidden_size, device=self.device) for _ in range(2)]
        log_memory_usage("buffer initialization")  # {{ edit_8 }} log memory usage after initializing the buffers
        # {{ edit_15 }} batch the audio feature extraction for performance and to fix a runtime error
        segments = []
        for t in range(num_frames):
            start = t * samples_per_frame
            end = start + samples_per_frame
            if start >= len(audio_waveform):
                segment = np.zeros(samples_per_frame, dtype=np.float32)
                print(f"[Debug] Frame {t}: start {start} >= len(audio_waveform), using zero padding.")
            else:
                segment = audio_waveform[start:end]
                if segment.ndim > 1 and segment.shape[1] > 1:
                    segment = segment.mean(axis=1)
                    print(f"[Debug] Frame {t}: Averaging across channels.")
                else:
                    segment = segment.flatten()
                if len(segment) < samples_per_frame:
                    segment = np.pad(segment, (0, samples_per_frame - len(segment)), 'constant')
                    print(f"[Debug] Frame {t}: Padded with {samples_per_frame - len(segment)} zeros.")
                elif len(segment) > samples_per_frame:
                    segment = segment[:samples_per_frame]
                    print(f"[Debug] Frame {t}: Truncated to {samples_per_frame} samples.")
            segment = segment.astype(np.float32)
            segments.append(segment)
        # Check that all segments have the same length
        for i, seg in enumerate(segments):
            if len(seg) != samples_per_frame:
                raise ValueError(f"Segment {i} has incorrect length {len(seg)} (expected {samples_per_frame})")  # {{ edit_20 }}
        # Convert the list to a NumPy array before building the tensor, to avoid a performance warning
        segments_array = np.stack(segments)  # {{ edit_21 }} stack requires equal-length segments
        waveform_batch = torch.from_numpy(segments_array).to(self.device)
        with torch.no_grad():
            inputs = self.feature_extractor(waveform_batch.cpu().numpy(), sampling_rate=16000,
                                            return_tensors="pt", padding=True)
            # {{ edit_16 }} move input_values directly to the device, avoiding unnecessary conversions
            input_values = inputs.input_values.to(self.device)
            outputs = self.model(input_values)
            hidden_states = outputs.last_hidden_state  # shape: (batch, sequence_length, hidden_size)
        for t in range(num_frames):
            if t >= hidden_states.shape[0]:
                feature = torch.zeros(self.model.config.hidden_size, device=self.device)
            else:
                # {{ edit_17 }} average over sequence_length to get a 1-D feature vector
                feature = hidden_states[t].mean(dim=0)  # shape: (hidden_size,)
            # Features of the two following frames
            feature_next1 = hidden_states[t + 1].mean(dim=0) if t + 1 < hidden_states.shape[0] else torch.zeros(self.model.config.hidden_size, device=self.device)
            feature_next2 = hidden_states[t + 2].mean(dim=0) if t + 2 < hidden_states.shape[0] else torch.zeros(self.model.config.hidden_size, device=self.device)
            caudio = torch.cat([prev_features[0], prev_features[1], feature, feature_next1, feature_next2], dim=-1)
            prev_features[0] = prev_features[1]
            prev_features[1] = feature
            yield caudio, feature
Reviewing and understanding the code
I'm carefully going through the code of the data_loader.py and audio_alignment.py modules to make sure the implementations contain nothing unreasonable or incorrect, and to suggest better implementations.
Evaluating the code
I'm working through the Python code of the data_loader.py module step by step, making sure it complies with OpenAI policy and is valid. This process is surfacing a few possible improvements.
Importing third-party libraries
Various third-party libraries are imported, including Stable Diffusion, Wav2Vec2, and the VQ-VAE. Next, I'll look at how MediaPipe is used to extract facial keypoints from images, and at a class called LoopyDataset.
Loading and processing data
In __getitem__, I load the video frames and audio, check the frame count, extract facial keypoints, initialize the audio aligner, encode with the VQ-VAE and add noise, and finally return a sample dictionary containing this information.
Detecting code errors
I'm reviewing the data loader module and finding some potential errors. For example, 'device' is used but never defined, which will cause problems.
Setting the default device
In audio_alignment.py, the AudioAligner class sets the default device to CPU, which ensures compatibility and stability across devices.
Focusing on the data-loading configuration
In the data loader's __init__ method, device is used but never defined. device could be added as a parameter or passed in from the main code. Make sure device is set correctly to avoid problems.
Checking code stability
In data_loader.py, define device for the audio-loading code, and make sure load_audio and load_video_frames handle the video and audio correctly. Set num_workers=0 to avoid multiprocessing issues.
Defining the device variable
I notice that the __init__ method in data_loader.py never defines the 'device' variable. To fix this, we can add a device parameter to __init__, or define it with torch.device("cuda" if torch.cuda.is_available() else "cpu").
Keeping the code clear
First, resolve the undefined device variable in data_loader.py; then improve torchaudio.load so that the audio is reliably loaded from the video file; finally, go through the functions and methods and tidy up the code structure.
After a thorough analysis of the provided data_loader.py and audio_alignment.py code, I found several places where the implementation is unreasonable or incorrect. Below is a detailed analysis of these issues, along with suggestions for the most correct and optimal implementation.

Analysis of the data_loader.py module

The device variable is undefined. In the __init__ method of LoopyDataset, there is the following code:

```python
self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
```

Here the device variable is not defined in the method's parameter list, so a NameError will be raised during initialization. A device parameter should be added to __init__ with a default value, for example:

```python
def __init__(self,
             video_paths: List[str],
             transform=None,
             audio_transform=None,
             frame_rate: int = 25,
             frame_size: Tuple[int, int] = (512, 512),
             audio_sample_rate: int = 16000,
             vqvae_config: Dict = None,
             device: str = 'cuda' if torch.cuda.is_available() else 'cpu'):
```

Then use the passed-in device inside the method:

```python
self.device = torch.device(device)
```
In the load_audio method, torchaudio.load(video_path) is used to load the audio:

```python
waveform, sample_rate = torchaudio.load(video_path)
```

torchaudio.load is normally used to load audio files (such as .wav), not video files. Although some versions of torchaudio can extract the audio track from a video file, compatibility and stability are not guaranteed. For robustness, it is recommended to extract the audio from the video with a library such as ffmpeg or moviepy.

Here is an implementation that extracts the audio from the video using moviepy:

```python
from moviepy.editor import VideoFileClip

def load_audio(self, video_path: str) -> torch.Tensor:
    video_clip = VideoFileClip(video_path)
    audio_clip = video_clip.audio
    # Convert the audio to the desired sampling rate
    if audio_clip.fps != self.audio_sample_rate:
        audio_clip = audio_clip.set_fps(self.audio_sample_rate)
    audio_array = audio_clip.to_soundarray()
    # Convert stereo to mono
    if audio_array.ndim > 1:
        audio_array = np.mean(audio_array, axis=1)
    waveform = torch.from_numpy(audio_array).unsqueeze(0)  # add a channel dimension
    return waveform
```
Improving the extract_keypoints_from_frames method. In this method, each frame is converted from a tensor to a NumPy array before the keypoints are extracted. In the current implementation, if no keypoints are detected, the returned keypoints can end up with inconsistent lengths. It is recommended to handle the case where no keypoints are detected, so that the number of returned keypoints stays consistent.

Modified code:

```python
def extract_keypoints_from_frames(self, frames: torch.Tensor) -> torch.Tensor:
    keypoints = []
    for frame in frames:
        frame_np = (frame.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
        kp = extract_facial_keypoints(frame_np)
        if kp.shape[0] != 68:
            kp = np.zeros((68, 2))  # fill with zeros if no keypoints were detected
        keypoints.append(kp)
    keypoints = np.array(keypoints)
    keypoints = torch.from_numpy(keypoints).float()
    return keypoints
```
Issues in the __getitem__ method. In __getitem__, reference_frame is passed to the VQ-VAE encoder for encoding:

```python
cref = self.vq_vae_encoder.encode(reference_frame.unsqueeze(0))
```

However, reference_frame is already a single frame, and unsqueeze(0) adds a dimension in front, giving shape (1, 3, H, W). This may be correct, but you need to make sure the VQ-VAE encoder's encode method can handle such an input.

Additionally, when encoding current_frames and motion_frames, their shapes should be checked to ensure they match the encoder's input requirements.
When using the DataLoader, num_workers is set to 0 to avoid potential multiprocessing issues. However, the main block also sets:

```python
import torch.multiprocessing as torch_mp
torch_mp.set_start_method('spawn', force=True)
```

This setting can introduce unnecessary complexity. Since num_workers is already set to 0, these two lines can be removed to simplify the program.
Analysis of the audio_alignment.py module

In the __init__ method of AudioAligner, the default device is set to 'cpu':

```python
def __init__(self, model_name: str = 'facebook/wav2vec2-base-960h', device: str = 'cpu'):
```

This can prevent GPU acceleration from being used even when a GPU is available. It is recommended to change it to:

```python
def __init__(self, model_name: str = 'facebook/wav2vec2-base-960h', device: str = None):
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    self.device = device
```
In the get_concatenated_audio_embeddings method, features are extracted for each per-frame audio segment in a loop, which can be inefficient. Consider processing all audio segments as a batch to improve computational efficiency.

Modified code:

```python
def get_concatenated_audio_embeddings(
        self,
        audio_waveform: np.ndarray,
        frame_rate: int = 25,
        sampling_rate: int = 16000,
        num_frames: int = None
) -> Tuple[torch.Tensor, torch.Tensor]:
    frame_duration = 1.0 / frame_rate
    samples_per_frame = int(sampling_rate * frame_duration)
    if num_frames is None:
        num_frames = int(np.ceil(len(audio_waveform) / samples_per_frame))
    segments = []
    for t in range(num_frames):
        start = t * samples_per_frame
        end = start + samples_per_frame
        segment = audio_waveform[start:end]
        if len(segment) < samples_per_frame:
            segment = np.pad(segment, (0, samples_per_frame - len(segment)), 'constant')
        segments.append(segment)
    segments_array = np.stack(segments)
    waveform_batch = torch.from_numpy(segments_array).to(self.device)
    with torch.no_grad():
        inputs = self.feature_extractor(waveform_batch, sampling_rate=sampling_rate,
                                        return_tensors="pt", padding=True)
        input_values = inputs.input_values.to(self.device)
        outputs = self.model(input_values)
        hidden_states = outputs.last_hidden_state  # (batch_size, sequence_length, hidden_size)
    # further processing ...
```
During audio alignment, the number of processed audio segments must match the number of video frames. The audio and video lengths therefore need to be strictly aligned and checked, to avoid errors caused by length mismatches.
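For example, a small hypothetical helper (not part of the original code) could make this check explicit:

```python
def check_alignment(num_audio_embeddings: int, num_frames: int):
    # Fail fast if the per-frame audio embeddings do not line up with the video frames.
    if num_audio_embeddings != num_frames:
        raise ValueError(
            f"Audio embeddings ({num_audio_embeddings}) do not match video frames ({num_frames}); "
            "re-check the frame rate and the per-frame segmentation of the waveform."
        )
```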
Memory-monitoring code is used in several places, for example:

```python
import psutil

def log_memory_usage(step: str):
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    print(f"[Memory] step: {step} - RSS: {mem_info.rss / (1024 ** 2):.2f} MB")
```

This code is mainly useful for debugging and can be removed for actual deployment or training to reduce unnecessary overhead.
Based on the analysis above, here are the recommendations for the optimal implementation:

1. Unify device handling: wherever a device is needed, pass device in consistently as a parameter, and select it automatically at initialization based on torch.cuda.is_available().
2. Audio and video loading: use a dedicated library (such as moviepy) to extract the audio from video files, ensuring compatibility and robustness.
3. Batch processing: where possible, use batch processing (for example, batched audio feature extraction) to improve efficiency.
4. Exception handling: add exception handling to all file operations and data-processing steps to catch possible errors and keep the program robust.
5. Code cleanup: remove debugging code and unnecessary comments to keep the code concise and readable.
6. Comments and documentation: add detailed comments and docstrings to the key functions and methods so that others can understand and maintain the code.

Below are partial code examples revised according to the recommendations above.
Revised portion of data_loader.py:

```python
class LoopyDataset(Dataset):
    def __init__(self,
                 video_paths: List[str],
                 transform=None,
                 audio_transform=None,
                 frame_rate: int = 25,
                 frame_size: Tuple[int, int] = (512, 512),
                 audio_sample_rate: int = 16000,
                 vqvae_config: Dict = None,
                 device: str = 'cuda' if torch.cuda.is_available() else 'cpu'):
        self.video_paths = video_paths
        self.transform = transform
        self.audio_transform = audio_transform
        self.frame_rate = frame_rate
        self.frame_size = frame_size
        self.audio_sample_rate = audio_sample_rate
        self.video_length = 12
        self.motion_frame_length = 124
        self.device = torch.device(device)
        self.aligner = AudioAligner(device=self.device)
        self.vq_vae_encoder = self._init_vq_vae_encoder(vqvae_config).to(self.device)
        self.ddpm_scheduler = DDPMScheduler(
            num_train_timesteps=1000,
            beta_start=0.0001,
            beta_end=0.02,
            beta_schedule="linear",
        )
```
Revised load_audio method:

```python
from moviepy.editor import VideoFileClip

def load_audio(self, video_path: str) -> torch.Tensor:
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        if audio_clip.fps != self.audio_sample_rate:
            audio_clip = audio_clip.set_fps(self.audio_sample_rate)
        audio_array = audio_clip.to_soundarray()
        if audio_array.ndim > 1:
            audio_array = np.mean(audio_array, axis=1)
        waveform = torch.from_numpy(audio_array).unsqueeze(0).float()
        return waveform.to(self.device)
    except Exception as e:
        print(f"Failed to load audio: {e}")
        return torch.zeros(1, int(self.audio_sample_rate * video_clip.duration)).to(self.device)
```
Revised portion of audio_alignment.py:

```python
class AudioAligner:
    def __init__(self, model_name: str = 'facebook/wav2vec2-base-960h', device: str = None):
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = device
        self.model = Wav2Vec2Model.from_pretrained(model_name).to(self.device)
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
```
By analyzing and optimizing the code in this way, we have corrected the errors and unreasonable parts of the implementation and improved its robustness and efficiency. The main improvements include unified device handling, a more reliable audio loading approach, the use of batch processing, and more complete exception handling. In practice, the entire codebase should be tested end to end to make sure that every module works correctly.