Created: 8 January 2026
Answered with GPT-5.2 Pro via Chat01
I am using the anomalib library for obstacle detection on suspended maglev trains, and this is the paper I wrote; its results are quite good. The paper's detailed results are in the first image, and the results after I enriched the dataset are in the second image. I am still running the original code from the paper. The config file is as follows:

```yaml
dataset:
  name: mvtec
  format: folder
  path: /media/jd/4997BB1603CFE2C4/lw/liwei23/anomalib/dataset
  normal_dir: normal # subfolder of normal samples in the custom dataset
  abnormal_dir: abnormal0 # subfolder of abnormal samples in the custom dataset
  mask_dir: label_mask0 # path to binary masks; custom datasets usually have none, in which case use null
  category: bottle
  normal_test_dir: normal_test0 # name of the folder containing normal test images.
  task: segmentation
  extensions: null
  train_batch_size: 32
  eval_batch_size: 32
  num_workers: 8
  image_size: 256 # dimensions to which images are resized (mandatory)
  center_crop: null # dimensions to which images are center-cropped after resizing (optional)
  normalization: imagenet # data distribution to which the images will be normalized: [none, imagenet]
  transform_config:
    train: null
    eval: null
  test_split_mode: from_dir # options: [from_dir, synthetic]
  test_split_ratio: 0.2 # fraction of train images held out for testing (usage depends on test_split_mode)
  val_split_mode: same_as_test # options: [same_as_test, from_test, synthetic]
  val_split_ratio: 0.5 # fraction of train/test images held out for validation (usage depends on val_split_mode)
  tiling:
    apply: false
    tile_size: null
    stride: null
    remove_border_count: 0
    use_random_tiling: False
    random_tile_count: 16

model:
  name: fastflow
  backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
  pre_trained: true
  flow_steps: 9 # options: [8, 8, 20, 20] - for each supported backbone
  hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
  conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: pixel_AUROC
    mode: max
  normalization_method: min_max # options: [null, min_max, cdf]

metrics:
  image:
    - F1Score
    - AUROC
  pixel:
    - F1Score
    - AUROC
    - AUPRO
  threshold:
    method: adaptive # options: [adaptive, manual]
    manual_image: null
    manual_pixel: null

visualization:
  show_images: False # show images on the screen
  save_images: True # save images to the file system
  log_images: True # log images to the available loggers (if any)
  image_save_path: ./results1/1217fastflow/fastflow/mvtec/run/images # path to which images will be saved
  mode: simple # options: ["full", "simple"]

project:
  seed: 42
  path: ./results1/1217fastflow

logging:
  logger: [tensorboard, csv]
  log_graph: true

optimization:
  export_mode: null # options: torch, onnx, openvino

trainer:
  enable_checkpointing: true
  default_root_dir: null
  gradient_clip_val: 0
  gradient_clip_algorithm: norm
  num_nodes: 1
  devices: 1
  enable_progress_bar: true
  overfit_batches: 0.0
  track_grad_norm: -1
  check_val_every_n_epoch: 1 # Don't validate before extracting features.
  fast_dev_run: false
  accumulate_grad_batches: 1
  max_epochs: 500
  min_epochs: null
  max_steps: -1
  min_steps: null
  max_time: null
  limit_train_batches: 1.0
  limit_val_batches: 1.0
  limit_test_batches: 1.0
  limit_predict_batches: 1.0
  val_check_interval: 1.0 # Don't validate before extracting features.
  log_every_n_steps: 50
  accelerator: auto # <"cpu", "gpu", "tpu", "ipu", "hpu", "auto">
  strategy: null
  sync_batchnorm: false
  precision: 32
  enable_model_summary: true
  num_sanity_val_steps: 0
  profiler: null
  benchmark: false
  deterministic: false
  reload_dataloaders_every_n_epochs: 0
  auto_lr_find: false
  replace_sampler_ddp: true
  detect_anomaly: false
  auto_scale_batch_size: false
  plugins: null
  move_metrics_to_cpu: false
  multiple_trainloader_mode: max_size_cycle
```
The anomaly map code, anomaly_map.py:

```python
"""FastFlow Anomaly Map Generator Implementation."""
from __future__ import annotations
import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn
class AnomalyMapGenerator(nn.Module):
"""Generate Anomaly Heatmap."""
textdef __init__(self, input_size: ListConfig | tuple) -> None: super().__init__() self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size) def forward(self, hidden_variables: list[Tensor]) -> Tensor: """Generate Anomaly Heatmap. This implementation generates the heatmap based on the flow maps computed from the normalizing flow (NF) FastFlow blocks. Each block yields a flow map, which overall is stacked and averaged to an anomaly map. Args: hidden_variables (list[Tensor]): List of hidden variables from each NF FastFlow block. Returns: Tensor: Anomaly Map. """ flow_maps: list[Tensor] = [] for hidden_variable in hidden_variables: log_prob = -torch.mean(hidden_variable**2, dim=1, keepdim=True) * 0.5 prob = torch.exp(log_prob) flow_map = F.interpolate( input=-prob, size=self.input_size, mode="bilinear", align_corners=False, ) flow_maps.append(flow_map) flow_maps = torch.stack(flow_maps, dim=-1) # anomaly_map = torch.mean(flow_maps, dim=-1) # 1. 平均值聚合 (作为第一步) anomaly_map_averaged = torch.mean(flow_maps, dim=-1) # 得到平均值聚合的异常图 # 2. 空间邻域最大池化 (在平均值聚合的异常图上进行) kernel_size = 3 # 可以尝试不同的 kernel_size,例如 3x3, 5x5 padding = (kernel_size - 1) // 2 # 保持输出尺寸与输入尺寸相同 ("same" padding) anomaly_map = F.max_pool2d( anomaly_map_averaged, kernel_size=kernel_size, stride=1, # 步长通常设置为 1,进行密集池化 padding=padding # 使用 padding 保持尺寸 ) return anomaly_map
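For reference, a minimal sketch (my addition, not from the original files) of how the generator is exercised; the dummy hidden-variable shapes assume the three SwinV2 stages at a 256×256 input, and the max pooling only dilates local maxima without changing the map size:

```python
import torch

from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator  # the module above

generator = AnomalyMapGenerator(input_size=(256, 256))
# dummy hidden variables matching the three Swin stage resolutions at input 256
hidden_variables = [
    torch.randn(1, 96, 64, 64),
    torch.randn(1, 192, 32, 32),
    torch.randn(1, 384, 16, 16),
]
anomaly_map = generator(hidden_variables)
print(anomaly_map.shape)  # torch.Size([1, 1, 256, 256])
```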
The contents of torch_model.py:

```python
"""FastFlow Torch Model Implementation."""
from __future__ import annotations
from typing import Callable, List, Tuple, Union
import timm
import torch
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.vision_transformer import VisionTransformer
from timm.models.swin_transformer_v2 import SwinTransformerV2
from torch import Tensor, nn
import torch.nn.functional as F
from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator
def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
"""Subnet Convolutional Function.
textCallable class or function ``f``, called as ``f(channels_in, channels_out)`` and should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`. Args: kernel_size (int): Kernel Size hidden_ratio (float): Hidden ratio to compute number of hidden channels. Returns: Callable: Sequential for the subnet constructor. """ def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required. # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv
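# Note (added): a quick check of the manual-padding formula above:
# kernel_size=3 gives padding = (1, 1, 1, 1) and kernel_size=1 gives (0, 0, 0, 0),
# so the ZeroPad2d + Conv2d pair reproduces padding="same" for the odd kernel sizes used here.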
def create_fast_flow_block(
input_dimensions: list[int],
conv3x3_only: bool,
hidden_ratio: float,
flow_steps: int,
clamp: float = 2.0,
) -> SequenceINN:
"""Create NF Fast Flow Block.
textThis is to create Normalizing Flow (NF) Fast Flow model block based on Figure 2 and Section 3.3 in the paper. Args: input_dimensions (list[int]): Input dimensions (Channel, Height, Width) conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1. hidden_ratio (float): Ratio for the hidden layer channels. flow_steps (int): Flow steps. clamp (float, optional): Clamp. Defaults to 2.0. Returns: SequenceINN: FastFlow Block. """ nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes
class FastflowModel(nn.Module):
"""FastFlow.
textUnsupervised Anomaly Detection and Localization via 2D Normalizing Flows. Args: input_size (tuple[int, int]): Model input size. backbone (str): Backbone CNN network pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone. flow_steps (int, optional): Flow steps. conv3x3_only (bool, optinoal): Use only conv3x3 in fast_flow model. Defaults to False. hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0. Raises: ValueError: When the backbone is not supported. """ def __init__( self, input_size: tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() # for transformers, use their pretrained norm w/o grad # for resnets, self.norms are trainable LayerNorm self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ("swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256","swinv2_base_window16_256" ): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, ) # 手动定义 channels 和 scales channels = [96,192,384] scales = [4,8,16] self.channels = channels self.scales = scales # 保存 scales 以供 _get_swimt_features 使用 self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError( f"Backbone {backbone} is not supported. List of available backbones are " "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]." 
) for parameter in self.feature_extractor.parameters(): parameter.requires_grad = False self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3] # 对应 layer1, layer2, layer3 ) for param in self.aux_extractor.parameters(): param.requires_grad = False # resnet18 每层输出通道数 resnet_channels = [64, 128, 256] # 与 out_indices=[1,2,3] 对应 # 定义辅助分支的对齐层:将 CNN 分支输出变为对应 Swin 层的通道数(自适应) self.resnet_align = nn.ModuleList([ nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3) ]) self.fast_flow_blocks = nn.ModuleList() for i in range(3): fused_ch = self.channels[i] self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[fused_ch, int(input_size[0] / self.scales[i]), int(input_size[1] / self.scales[i])], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # self.fast_flow_blocks = nn.ModuleList() # for channel, scale in zip(channels, scales): # self.fast_flow_blocks.append( # create_fast_flow_block( # input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)], # conv3x3_only=conv3x3_only, # hidden_ratio=hidden_ratio, # flow_steps=flow_steps, # ) # ) self.align_convs_global = nn.ModuleList() fused_ch_global = 96 # 设定全局通道对齐后单个尺度多少通道 for ch in channels: self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True)) # 拼接后总通道 concatenated_channel = fused_ch_global H_global = input_size[0] // scales[-1] # 最小 H W_global = input_size[1] // scales[-1] # 最小 W self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1), nn.ReLU(), nn.Conv2d(concatenated_channel // 4, len(channels), 1), nn.Softmax(dim=1) ) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1) ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) # def forward(self, input_tensor: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]: # """Forward-Pass the input to the FastFlow Model. # # Args: # input_tensor (Tensor): Input tensor. # # Returns: # Tensor | list[Tensor] | tuple[list[Tensor]]: During training, return # (hidden_variables, log-of-the-jacobian-determinants). # During the validation/test, return the anomaly map. # """ # # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(input_tensor) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(input_tensor) # else: # features = self._get_cnn_features(input_tensor) # # # Compute the hidden variable f: X -> Z and log-likelihood of the jacobian # # (See Section 3.3 in the paper.) # # NOTE: output variable has z, and jacobian tuple for each fast-flow blocks. 
# hidden_variables: list[Tensor] = [] # log_jacobians: list[Tensor] = [] # for fast_flow_block, feature in zip(self.fast_flow_blocks, features): # hidden_variable, log_jacobian = fast_flow_block(feature) # hidden_variables.append(hidden_variable) # log_jacobians.append(log_jacobian) # # return_val = (hidden_variables, log_jacobians) # # if not self.training: # return_val = self.anomaly_map_generator(hidden_variables) # # return return_val # 第1 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2+3 def forward(self, image: torch.Tensor) -> Union[ torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" """Return anomaly map.""" return_val: Tensor | list[Tensor] | tuple[list[Tensor]] self.feature_extractor.eval() # backbone 前向用 eval if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: # ResNet / WideResNet features = self.feature_extractor(image) features = [self.norms[i](feat) for i, feat in enumerate(features)] # ------------------- 提取辅助分支(ResNet18)特征 ------------------- self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # ------------------- Local Path:逐层融合 + Flow ------------------- local_hidden_variables = [] 
local_log_jacobians = [] for i in range(len(features)): # 不对 Swin 特征做通道对齐,直接保留 # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # 直接拼接:通道对齐 fused_feature = features[i] + aux_feat_aligned hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) local_hidden_variables.append(hidden_variable) local_log_jacobians.append(log_jacobian) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====================== Global Path ====================== aligned_features = [] target_size = features[-1].shape[2:] for i, feat in enumerate(features): feat = self.align_convs_global[i](feat) down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(down_feat) weighted_feats = sum(aligned_features) C = weighted_feats.size(1) A, B = torch.split(weighted_feats, C // 2, dim=1) st = self.subnet(A) s, t = torch.chunk(st, 2, dim=1) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====================== Combine Anomaly Maps ====================== anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # ====================== Combine Hidden Variables (保持合并) ====================== hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) if not self.training: # 测试/评估模式 return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = 
sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # # global_anomaly_map = 
self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2+3 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # 
return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]: """Get CNN-based features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ features = self.feature_extractor(input_tensor) features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Class-Attention-Image-Transformers (CaiT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Vision Transformers (ViT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2 ** i) W = grid_size[1] // (2 ** i) if H * W != L: raise ValueError( f"无法将形状 [B, L, C] = [{B}, {L}, {C}] 重塑为 [B, C, H, W] = [{B}, {C}, {H}, {W}],因为 H * W != L。" ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
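A quick smoke test of the model above (a sketch under assumptions: `flow_steps=8` as in your better-performing run, and timm able to load the swinv2_tiny_window16_256 weights; it assumes it runs inside torch_model.py, where `FastflowModel` is defined):

```python
import torch

model = FastflowModel(
    input_size=(256, 256),
    backbone="swinv2_tiny_window16_256",
    pre_trained=True,
    flow_steps=8,
    conv3x3_only=True,
    hidden_ratio=1.0,
)
model.eval()
with torch.no_grad():
    anomaly_map = model(torch.randn(1, 3, 256, 256))
print(anomaly_map.shape)  # expected torch.Size([1, 1, 256, 256]) in eval mode
```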
But likewise, after running this code directly: in the original paper, flow steps = 2 worked best, yet on the enriched dataset steps = 2 now gives 59.8, 59.32, 54.64, 93.2, 37.84, while steps = 8 is somewhat better: 61.47, 58.89, 46.69, 88.86, 38.55. What should I do? I kept lightning_model.py and loss.py exactly as in the original. Although the paper describes a self-learned α, I found only the fixed 9/1 weighting works best, so I keep 9/1 fixed; that parameter does not need changing. Everything structural from my paper is already implemented, so you do not need to reproduce the paper's functionality: the paper is the elaborate version, while this code is the simplest, abridged version, and it is what produced the paper's results. On the new dataset the baselines give:

- FastFlow (original): 65.42, 60.61, 38.58, 82.26, 10.7
- PaDiM: 64.71, 61.14, 52.89, 91.35, 8.53
- CFA: 70.63, 64.06, 39.72, 89.72, 16.45
- DSR: 35.75, 59.98, 30.72, 78.29, 7.81
- CS-Flow: 67.8, 61.6, 22.04, 82.03, 11.77
- CFlow: 36.81, 57.2, 37.06, 86.98, 19.82

Do not modify the code according to my paper; the code behind the paper is exactly what I uploaded, so only tell me how to adjust the code itself. The paper applied some academic polishing over the code, so improve based on the current problem rather than reproducing the paper. Also note: raising kernel_size brings no gain; both 5 and 7 make the results worse. When my code becomes the following version:

```python
"""FastFlow Torch Model Implementation."""
from __future__ import annotations
from typing import Callable, List, Tuple, Union
import timm
import torch
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.vision_transformer import VisionTransformer
from timm.models.swin_transformer_v2 import SwinTransformerV2
from torch import Tensor, nn
import torch.nn.functional as F
from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator
# subnet_conv_func and create_fast_flow_block are identical to the version above.
class FastflowModel(nn.Module):
"""FastFlow.
textUnsupervised Anomaly Detection and Localization via 2D Normalizing Flows. Args: input_size (tuple[int, int]): Model input size. backbone (str): Backbone CNN network pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone. flow_steps (int, optional): Flow steps. conv3x3_only (bool, optinoal): Use only conv3x3 in fast_flow model. Defaults to False. hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0. Raises: ValueError: When the backbone is not supported. """ def __init__( self, input_size: tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size # self.aux_gates = nn.Parameter(torch.zeros(3)) # sigmoid后初始=0.5 self.global_scale_logits = nn.Parameter(torch.zeros(3)) # softmax权重 # 原来可能是 nn.Parameter(torch.zeros(3)) self.aux_gates = nn.Parameter(torch.full((3,), -3.5)) # sigmoid≈0.029 if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() # for transformers, use their pretrained norm w/o grad # for resnets, self.norms are trainable LayerNorm self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ("swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256","swinv2_base_window16_256" ): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, ) # 手动定义 channels 和 scales channels = [96,192,384] scales = [4,8,16] self.channels = channels self.scales = scales # 保存 scales 以供 _get_swimt_features 使用 self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError( f"Backbone {backbone} is not supported. List of available backbones are " "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]." 
) for parameter in self.feature_extractor.parameters(): parameter.requires_grad = False self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3] # 对应 layer1, layer2, layer3 ) for param in self.aux_extractor.parameters(): param.requires_grad = False # resnet18 每层输出通道数 resnet_channels = [64, 128, 256] # 与 out_indices=[1,2,3] 对应 # 定义辅助分支的对齐层:将 CNN 分支输出变为对应 Swin 层的通道数(自适应) self.resnet_align = nn.ModuleList([ nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3) ]) self.fast_flow_blocks = nn.ModuleList() for i in range(3): fused_ch = self.channels[i] self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[fused_ch, int(input_size[0] / self.scales[i]), int(input_size[1] / self.scales[i])], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # self.fast_flow_blocks = nn.ModuleList() # for channel, scale in zip(channels, scales): # self.fast_flow_blocks.append( # create_fast_flow_block( # input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)], # conv3x3_only=conv3x3_only, # hidden_ratio=hidden_ratio, # flow_steps=flow_steps, # ) # ) self.align_convs_global = nn.ModuleList() fused_ch_global = 96 # 设定全局通道对齐后单个尺度多少通道 for ch in channels: self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True)) # 拼接后总通道 concatenated_channel = fused_ch_global H_global = input_size[0] // scales[-1] # 最小 H W_global = input_size[1] // scales[-1] # 最小 W # self.global_flow_block = create_fast_flow_block( # input_dimensions=[fused_ch_global, H_global, W_global], # conv3x3_only=conv3x3_only, # hidden_ratio=hidden_ratio, # flow_steps=flow_steps, # ) local_steps = flow_steps # 12 global_steps = max(16, flow_steps) # 16 起步,必要时可试 20 self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=global_steps, ) self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1), nn.ReLU(), nn.Conv2d(concatenated_channel // 4, len(channels), 1), nn.Softmax(dim=1) ) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1) ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) # def forward(self, input_tensor: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]: # """Forward-Pass the input to the FastFlow Model. # # Args: # input_tensor (Tensor): Input tensor. # # Returns: # Tensor | list[Tensor] | tuple[list[Tensor]]: During training, return # (hidden_variables, log-of-the-jacobian-determinants). # During the validation/test, return the anomaly map. # """ # # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(input_tensor) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(input_tensor) # else: # features = self._get_cnn_features(input_tensor) # # # Compute the hidden variable f: X -> Z and log-likelihood of the jacobian # # (See Section 3.3 in the paper.) # # NOTE: output variable has z, and jacobian tuple for each fast-flow blocks. 
# hidden_variables: list[Tensor] = [] # log_jacobians: list[Tensor] = [] # for fast_flow_block, feature in zip(self.fast_flow_blocks, features): # hidden_variable, log_jacobian = fast_flow_block(feature) # hidden_variables.append(hidden_variable) # log_jacobians.append(log_jacobian) # # return_val = (hidden_variables, log_jacobians) # # if not self.training: # return_val = self.anomaly_map_generator(hidden_variables) # # return return_val # 第1 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2+3 def forward(self, image: torch.Tensor) -> Union[ torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" """Return anomaly map.""" return_val: Tensor | list[Tensor] | tuple[list[Tensor]] self.feature_extractor.eval() # backbone 前向用 eval if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: # ResNet / WideResNet features = self.feature_extractor(image) features = [self.norms[i](feat) for i, feat in enumerate(features)] # ------------------- 提取辅助分支(ResNet18)特征 ------------------- self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # ------------------- Local Path:逐层融合 + Flow ------------------- local_hidden_variables = [] 
local_log_jacobians = [] for i in range(len(features)): # 不对 Swin 特征做通道对齐,直接保留 # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned aux_feat_aligned = self.resnet_align[i](aux_feats[i]) if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]: aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) gate = torch.sigmoid(self.aux_gates[i]) # 0~1 fused_feature = features[i] + gate * aux_feat_aligned hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) local_hidden_variables.append(hidden_variable) local_log_jacobians.append(log_jacobian) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====================== Global Path ====================== aligned_features = [] target_size = features[-1].shape[2:] for i, feat in enumerate(features): feat = self.align_convs_global[i](feat) down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(down_feat) # weighted_feats = sum(aligned_features) w = torch.softmax(self.global_scale_logits, dim=0) weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2] C = weighted_feats.size(1) A, B = torch.split(weighted_feats, C // 2, dim=1) st = self.subnet(A) s, t = torch.chunk(st, 2, dim=1) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====================== Combine Anomaly Maps ====================== anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # ====================== Combine Hidden Variables (保持合并) ====================== hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) if not self.training: # 测试/评估模式 return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # 
local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, 
mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2+3 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # 
====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]: """Get CNN-based features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ features = self.feature_extractor(input_tensor) features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Class-Attention-Image-Transformers (CaiT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Vision Transformers (ViT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2 ** i) W = grid_size[1] // (2 ** i) if H * W != L: raise ValueError( f"无法将形状 [B, L, C] = [{B}, {L}, {C}] 重塑为 [B, C, H, W] = [{B}, {C}, {H}, {W}],因为 H * W != L。" ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
The settings and results are as follows (five metrics per run):

- Only the gate initialisation changed to -3.5 (everything else unchanged): 59.43, 58.27, 41.21, 87.23, 36.65
- gate=-3.5 + global_steps=16: 61.72, 58.90, 50.81, 89.77, 35.47
- Setting 2 plus a sweep to w_global=0.2: 61.77, 59.02, 50.51, 89.67, 35.50
- Setting 2 plus a sweep to w_global=0.3: 61.72, 58.85, 50.81, 89.77, 35.47
- global_steps=20 + w_global=0.1: 60.39, 59.27, 46.78, 89.55, 39.48

Based on these results, please revise my code to improve it further. Remember the paper is only there to give you the general idea; just look at the algorithm's results above and modify the code accordingly.
What you are seeing is actually quite typical: once the dataset is enlarged (especially when the normal-sample distribution becomes wider, with more varied scenes, lighting, and backgrounds), a flow capacity that was "just enough" on the small dataset (n/steps=2) will clearly underfit. So it is reasonable that step=2 now drops across the board while step=8 does better; and the fact that AUPRO/F1 climb back up once you push global_steps to 16/20 further confirms that insufficient capacity, not kernel_size, is the main cause. Treating n=2 as the uniform setting in your paper was essentially an experiment/compute trade-off at the time, not something that must also be optimal for the new dataset (your paper's experimental setup also states the uniform settings n=2, lr=0.001, etc.). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
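To make "capacity" concrete, here is a minimal sketch, assuming the `create_fast_flow_block` helper from the `torch_model.py` discussed in this thread is importable; it counts the learnable parameters of one flow block at the Swin stage-1 resolution for `flow_steps` 2 vs 8. Each extra step stacks another `AllInOneBlock`, so parameters (and expressiveness) grow roughly linearly with the step count:

```python
# Minimal capacity check (assumes create_fast_flow_block from this thread's torch_model.py).
from torch_model import create_fast_flow_block

for steps in (2, 8):
    block = create_fast_flow_block(
        input_dimensions=[96, 64, 64],  # Swin stage-1: 96 channels at 256/4 resolution
        conv3x3_only=True,
        hidden_ratio=1.0,
        flow_steps=steps,
    )
    n_params = sum(p.numel() for p in block.parameters() if p.requires_grad)
    print(f"flow_steps={steps}: {n_params / 1e6:.2f} M parameters")
```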
Below I am not reproducing your paper. Based only on the change directions that are currently giving you gains (gate initialisation, global_steps, w_global), here is a set of code-level fixes that are more likely to keep raising the scores, focused on the three things in your current version most likely to hold it back:
In your `forward`, this snippet:
```python
st = self.subnet(A)
s, t = torch.chunk(st, 2, dim=1)
B_transformed = torch.exp(s) * B + t
```
becomes (the key points: tanh+clamp, plus one extra GroupNorm to stabilise the distribution):
```python
# In __init__, add:
self.pre_affine_clamp = 2.0
self.global_norm = nn.GroupNorm(1, fused_ch_global)  # fused_ch_global = 96

# In forward, change:
st = self.subnet(A)
s, t = torch.chunk(st, 2, dim=1)

# Key: bound the range of s to keep exp(s) from blowing up
s = torch.tanh(s) * self.pre_affine_clamp
B_transformed = torch.exp(s) * B + t

conditioned_feature = torch.cat([A, B_transformed], dim=1)
# Key: one more stabilising normalisation on the global path
conditioned_feature = self.global_norm(conditioned_feature)
```
Why this step is likely to raise the scores directly: with global_steps=16/20 the global branch has more capacity, but it is also more prone to learning extreme values of s, so the anomaly map develops spikes even on normal images (hurting image AUROC / pixel FPR). Clamping usually suppresses exactly those spikes.
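A two-line, self-contained demonstration of what the clamp does to the affine scale (plain PyTorch; the s values are made up for illustration):

```python
import torch

s = torch.tensor([-8.0, -1.0, 0.0, 1.0, 8.0])
print(torch.exp(s))                    # up to e^8 ≈ 2981: the scale is unbounded
print(torch.exp(torch.tanh(s) * 2.0))  # bounded to (e^-2, e^2) ≈ (0.135, 7.39)
```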
Your local loop computes fused_feature, but the Global Path still consumes features.
Change the local loop so that it also collects fused_features:
```python
fused_features = []
for i in range(len(features)):
    aux_feat_aligned = self.resnet_align[i](aux_feats[i])
    if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
        aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:],
                                         mode="bilinear", align_corners=False)
    gate = torch.sigmoid(self.aux_gates[i])
    fused_feature = features[i] + gate * aux_feat_aligned
    fused_features.append(fused_feature)

    hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
    ...
```
Then this Global Path snippet:
```python
target_size = features[-1].shape[2:]
for i, feat in enumerate(features):
    ...
```
becomes:
```python
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features):
    ...
```
Why this matters: the final anomaly_map is a linear combination of the local and global maps. If the two paths see inconsistently distributed inputs, the global map often "corrects" in the wrong direction, which shows up as pixel metrics rising while image AUROC drops, or AUPRO and F1 pulling against each other.
What you have now:
```python
gate = torch.sigmoid(self.aux_gates[i])  # scalar
fused_feature = features[i] + gate * aux_feat_aligned
```
Suggested change: scalar gate × gating-convolution gate_map (spatially/channel-wise adaptive) × normalised aux features.
```python
# Normalise the aligned aux features (more stable than BN; no dependence on batch statistics)
self.aux_norms = nn.ModuleList([nn.GroupNorm(1, ch) for ch in self.channels])

# Spatially/channel-wise adaptive gate: input concat([feat, aux]) -> gate_map in [0, 1]
self.aux_gate_convs = nn.ModuleList()
for ch in self.channels:
    gate_conv = nn.Conv2d(ch * 2, ch, kernel_size=1, bias=True)
    nn.init.zeros_(gate_conv.weight)
    nn.init.constant_(gate_conv.bias, -3.5)  # initial sigmoid ≈ 0.029, same idea as your gate=-3.5
    self.aux_gate_convs.append(nn.Sequential(gate_conv, nn.Sigmoid()))
```
```python
aux_feat_aligned = self.resnet_align[i](aux_feats[i])
if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
    aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:],
                                     mode="bilinear", align_corners=False)
aux_feat_aligned = self.aux_norms[i](aux_feat_aligned)

gate_scalar = torch.sigmoid(self.aux_gates[i])  # you verified -3.5 works well; keep it
gate_map = self.aux_gate_convs[i](torch.cat([features[i], aux_feat_aligned], dim=1))
fused_feature = features[i] + gate_scalar * gate_map * aux_feat_aligned
```
Why this is more likely to gain over a scalar-only gate: the scalar applies one mixing ratio to the whole image, while the gate_map lets the aux branch contribute only at the positions and channels where it actually helps, and stay near zero elsewhere.
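As a quick sanity check on the initialisation (plain PyTorch, nothing assumed beyond the values above):

```python
import torch

# sigmoid(-3.5) ≈ 0.0293: both the scalar gate and the zero-weight / -3.5-bias gate conv
# start by letting roughly 3% of the aux signal through, so training begins very close
# to the plain single-branch FastFlow and only opens the gates where it pays off.
print(torch.sigmoid(torch.tensor(-3.5)))  # tensor(0.0293)
```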
Your current global weighting is:
```python
w = torch.softmax(self.global_scale_logits, dim=0)
weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2]
```
These are global constant weights, which is not flexible enough for your new dataset with its larger scene variation.
You have in fact already written a channel_weighting_module, but you defined it with concatenated_channel = fused_ch_global; if you want to "concatenate first, then compute weights", the input channel count should be 96*3 = 288.
Replace:
```python
concatenated_channel = fused_ch_global
self.channel_weighting_module = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
    nn.ReLU(),
    nn.Conv2d(concatenated_channel // 4, len(channels), 1),
    nn.Softmax(dim=1)
)
```
with (output logits rather than an immediate softmax, so that global_scale_logits can be added on top as a prior):
```python
concatenated_channel = fused_ch_global * len(channels)  # 96 * 3
self.channel_weighting_module = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
    nn.ReLU(inplace=True),
    nn.Conv2d(concatenated_channel // 4, len(channels), 1),  # logits
)
```
Then, in `forward`:

```python
concat_feats = torch.cat(aligned_features, dim=1)          # [B, 288, H, W]
logits_dyn = self.channel_weighting_module(concat_feats)   # [B, 3, 1, 1]
logits = logits_dyn + self.global_scale_logits.view(1, -1, 1, 1)
w = torch.softmax(logits, dim=1)                           # [B, 3, 1, 1]
weighted_feats = (w[:, 0:1] * aligned_features[0]
                  + w[:, 1:2] * aligned_features[1]
                  + w[:, 2:3] * aligned_features[2])
```
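A standalone shape check of the corrected module (dimensions as above; the random tensors are only stand-ins for the aligned features):

```python
import torch
from torch import nn

aligned = [torch.randn(2, 96, 16, 16) for _ in range(3)]  # three scales, already aligned to 96 ch
weighting = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Conv2d(288, 72, 1),   # 288 = 96 * 3, 72 = 288 // 4
    nn.ReLU(inplace=True),
    nn.Conv2d(72, 3, 1),     # logits, one per scale
)
prior = torch.zeros(3)

logits = weighting(torch.cat(aligned, dim=1)) + prior.view(1, -1, 1, 1)
w = torch.softmax(logits, dim=1)
print(w.shape)                 # torch.Size([2, 3, 1, 1]) -> per-image scale weights
print(w.sum(dim=1).flatten())  # tensor([1., 1.])         -> they sum to 1 per image
```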
Expected benefit: the scale weights become per-image instead of global constants, so the global path can adapt to scene changes; on the more varied, expanded dataset this should mainly help the pixel metrics.
Your anomaly_map currently goes through:

- `log_prob = -0.5 * mean(h^2)`
- `prob = exp(log_prob)`
- `flow_map = -prob`

On a new dataset whose anomaly strengths span a wide range this has a problem: the exp saturates strong anomalies towards 0 very quickly, so the contrast between different anomalies gets flattened, and AUPRO/F1 suffer easily.
A more robust choice is to use the energy term (≈ -log p) directly: 0.5 * mean(h^2). This also matches your paper's statement that the negative log-likelihood is related to the L2 norm of h (your loss is likewise written as a norm term). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
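A small self-contained demonstration of the saturation effect just described (pure PyTorch; the energy values are made up for illustration):

```python
import torch

# Hypothetical per-pixel energies E = 0.5 * mean(h^2) for a weak, strong, very strong anomaly.
energy = torch.tensor([2.0, 10.0, 50.0])

prob_based = -torch.exp(-energy)  # the current -exp(log_prob) style score
print(prob_based)  # ≈ [-1.35e-01, -4.54e-05, -1.93e-22]: both strong anomalies collapse towards 0
print(energy)      # [2., 10., 50.]: the energy keeps them ordered with usable contrast
```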
Change the anomaly-map generator's forward to the version below (keep kernel=3, and add a light avg_pool before the max_pool, which usually makes boundaries more stable):
```python
def forward(self, hidden_variables: list[Tensor]) -> Tensor:
    flow_maps: list[Tensor] = []
    for hidden_variable in hidden_variables:
        # Use the energy (≈ -log p) instead of exp(prob) to avoid saturation
        score = 0.5 * torch.mean(hidden_variable ** 2, dim=1, keepdim=True)  # [B,1,h,w]
        flow_map = F.interpolate(
            input=score,
            size=self.input_size,
            mode="bilinear",
            align_corners=False,
        )
        flow_maps.append(flow_map)
    flow_maps = torch.stack(flow_maps, dim=-1)  # [B,1,H,W,N]

    # Optional: weight the scales towards higher resolution (usually friendlier to small obstacles)
    n = flow_maps.shape[-1]
    if n > 1:
        weights = torch.tensor([1.0 / (2 ** i) for i in range(n)],
                               device=flow_maps.device, dtype=flow_maps.dtype)
        weights = weights / weights.sum()
        anomaly_map_averaged = (flow_maps * weights.view(1, 1, 1, 1, n)).sum(dim=-1)
    else:
        anomaly_map_averaged = flow_maps.squeeze(-1)

    # Smooth lightly first, then max_pool; usually more stable than max_pool alone
    anomaly_map_averaged = F.avg_pool2d(anomaly_map_averaged, kernel_size=3, stride=1, padding=1)
    anomaly_map = F.max_pool2d(anomaly_map_averaged, kernel_size=3, stride=1, padding=1)
    return anomaly_map
```
You have already verified: gate=-3.5 helps, and global_steps=16/20 clearly beats a small step count.
So for the next round I suggest you stop using step=2 as the main configuration and go straight to: flow_steps=8–9, gate init -3.5, global_steps=16–20, w_global=0.1–0.2.
The logic of this set of changes: without touching lightning_model/loss, reduce the spurious peaks on normal images, raise the contrast of small obstacles, and make the global branch stable and distribution-matched with the local one. Given your current metric pattern, the most likely gains are image AUROC (fewer false peaks) and AUPRO/pixel F1 (better small-obstacle contrast).
Your pixel metrics are already strong, but image AUROC lags. A common cause: using max(anomaly_map) as the image score is too sensitive (a single noise spike flips a whole normal image to anomalous).
If anomalib's image score is the max (as is often the case for FastFlow/Padim), you can switch the image score to a "top-k mean / quantile". This only affects image AUROC / image F1; it normally does not hurt pixel localisation, and it tends to reduce false alarms.
Idea (pseudocode):
```python
# anomaly_map: [B, 1, H, W]
flat = anomaly_map.view(anomaly_map.size(0), -1)
score = torch.quantile(flat, 0.995, dim=1)  # or a top-k mean
```
This change is usually very effective on railway-style data with strong background texture and occasional bright spots.
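For completeness, a top-k-mean variant of the same idea (plain PyTorch; `k` is a knob you would tune, not a value from this thread):

```python
import torch

def image_score_topk(anomaly_map: torch.Tensor, k: int = 100) -> torch.Tensor:
    """Image-level score = mean of the k highest pixel scores; a softer alternative to max()."""
    flat = anomaly_map.view(anomaly_map.size(0), -1)  # [B, H*W]
    topk = flat.topk(k, dim=1).values                 # [B, k]
    return topk.mean(dim=1)                           # [B]
```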
If you apply this whole set of changes and post the next round of results (the same 5 metrics), I can make the next step more precise based on what rose or fell (for example: if AUPRO is still low at global_steps=20, tune the clamp/scale weights first; if pixel F1 will not move, bias the three local scales further towards stage1).
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features): ...
You got that wrong, didn't you? It should be target_size = fused_feature[-1].shape[2:]
for i, feat in enumerate(fused_feature):
You are right that without spelling out what those variable names mean, it does look like a mistake.

- fused_feature: the fused feature tensor at a single scale (a Tensor), typically of shape [B, C, H, W]
- fused_features: the list collecting the three scales' fused_feature tensors (list[Tensor]), length 3

So in my suggested code, both target_size and the for ... in enumerate(...) must be based on the list fused_features, not on a single fused_feature. What you wrote:
```python
target_size = fused_feature[-1].shape[2:]
for i, feat in enumerate(fused_feature):
```
has two problems:

- fused_feature[-1] takes the last image in the batch (a slice along dim 0), not the last scale.
- enumerate(fused_feature) iterates along the tensor's 0th (batch) dimension, which is not the per-scale iteration you want.

In the local path, first collect the fused features of all three scales:
```python
fused_features = []
for i in range(len(features)):
    ...
    fused_feature = features[i] + gate * aux_feat_aligned
    fused_features.append(fused_feature)
    ...
```
Then the global path uses this list for alignment/downsampling:
```python
target_size = fused_features[-1].shape[-2:]  # smallest scale (H, W)
aligned_features = []
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(feat)
```
This also matches the flow described in your manuscript: unify the channels per scale, downsample to the smallest scale, then fuse/weight. (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
The name fused_feature is also fine, but then it has to refer to the list, e.g.:
```python
fused_feature = []  # note: fused_feature is now a list
...
fused_feature.append(tensor)
...
target_size = fused_feature[-1].shape[-2:]
for i, feat in enumerate(fused_feature):
    ...
```
The essence: either fused_features is the list, or you treat fused_feature itself as the list; what you cannot have is a name that looks like a list while the object is actually a tensor.
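A tiny illustration of the difference (plain PyTorch; shapes chosen arbitrarily):

```python
import torch

feats = [torch.randn(4, 96, 64, 64), torch.randn(4, 192, 32, 32), torch.randn(4, 384, 16, 16)]
print(feats[-1].shape[-2:])    # torch.Size([16, 16]) -> last *scale* of the list, as intended
print(feats[0][-1].shape[-2:]) # torch.Size([64, 64]) -> indexing a tensor selects along the batch dim
print(len(list(feats[0])))     # 4 -> iterating a tensor walks the batch, not the scales
```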
If you paste the few lines from your current local loop (where you actually define the fused variable), I can give you a minimal copy-paste diff using your existing variable names, so you do not have to change more than necessary.
Please give me the complete modified torch_model.py; after all this back and forth I have made a mess of my code.
Below is a torch_model.py you can paste over the whole file (built on your current version; it consolidates the ideas you mentioned: gate=-3.5, a larger global_steps, weighted global fusion, into one clean version that is unlikely to blow up numerically).
It bakes in these key fixes (all already written into the code): global_steps defaults to max(20, flow_steps) (change it as you like). This implementation also sits closer to your manuscript's description of "gated-convolution dynamic weights + channel weighting + affine transform" (the gating conv, channel weighting, s/b, etc.). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
python"""FastFlow Torch Model Implementation. Original Code Copyright (c) 2022 @gathierry https://github.com/gathierry/FastFlow/. SPDX-License-Identifier: Apache-2.0 Modified Copyright (C) 2022 Intel Corporation SPDX-License-Identifier: Apache-2.0 """ from __future__ import annotations from typing import Callable, List, Tuple, Union import timm import torch import torch.nn.functional as F from FrEIA.framework import SequenceINN from timm.models.cait import Cait from timm.models.swin_transformer_v2 import SwinTransformerV2 from timm.models.vision_transformer import VisionTransformer from torch import Tensor, nn from anomalib.models.components.flow import AllInOneBlock from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable: """Subnet Convolutional Function.""" def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) # NOTE: padding="same" breaks onnx export so manual padding required. padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(inplace=True), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv def create_fast_flow_block( input_dimensions: List[int], conv3x3_only: bool, hidden_ratio: float, flow_steps: int, clamp: float = 2.0, ) -> SequenceINN: """Create NF Fast Flow Block.""" nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes class FastflowModel(nn.Module): """FastFlow + (your simplified) Dual-Path Fusion variant.""" def __init__( self, input_size: Tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size # ====== 你现在固定的融合权重(9/1) ====== self.w_global = 0.1 # anomaly_map = (1-w)*local + w*global # ====== 全局路径的“自写仿射”稳定化:限制 exp(s) ====== self.pre_affine_clamp = 2.0 # ====== Backbone ====== if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] self.norms = None elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ( "swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256", "swinv2_base_window16_256", ): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) # 你原来手写的 3-stage 设定 channels = [96, 192, 384] scales = [4, 8, 16] self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) 
else: raise ValueError( f"Backbone {backbone} is not supported. " f"Supported: [cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2, swinv2_*_256]." ) self.channels = channels self.scales = scales self.num_stages = len(channels) for p in self.feature_extractor.parameters(): p.requires_grad = False # ====== FastFlow blocks(每个尺度一个) ====== self.fast_flow_blocks = nn.ModuleList() for ch, sc in zip(self.channels, self.scales): self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[ch, int(input_size[0] / sc), int(input_size[1] / sc)], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # ====== 只有 3-stage(例如 SwinV2)才启用你的 Dual-Path ====== self.use_dual_path = (self.num_stages == 3) if self.use_dual_path: # --- Aux 分支(ResNet18)--- self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3], ) for p in self.aux_extractor.parameters(): p.requires_grad = False resnet_channels = [64, 128, 256] self.resnet_align = nn.ModuleList( [nn.Conv2d(resnet_channels[i], self.channels[i], kernel_size=1) for i in range(3)] ) # 标量 gate(你验证过 -3.5 好) self.aux_gates = nn.Parameter(torch.full((3,), -3.5)) # sigmoid≈0.029 # aux 归一化(更稳) self.aux_norms = nn.ModuleList([nn.GroupNorm(1, ch) for ch in self.channels]) # gate_map:门控卷积(空间/通道自适应) self.aux_gate_convs = nn.ModuleList() for ch in self.channels: gate_conv = nn.Conv2d(ch * 2, ch, kernel_size=1, bias=True) nn.init.zeros_(gate_conv.weight) nn.init.constant_(gate_conv.bias, -3.5) # 初始 gate_map 很小 self.aux_gate_convs.append(nn.Sequential(gate_conv, nn.Sigmoid())) # --- Global Path:对齐到 96 通道 --- fused_ch_global = 96 self.fused_ch_global = fused_ch_global self.align_convs_global = nn.ModuleList( [nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True) for ch in self.channels] ) # 动态尺度权重 + 先验权重 self.global_scale_logits = nn.Parameter(torch.zeros(3)) # 先验 concatenated_channel = fused_ch_global * 3 # 96*3 self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, kernel_size=1), nn.ReLU(inplace=True), nn.Conv2d(concatenated_channel // 4, 3, kernel_size=1), # logits ) # 你自写仿射子网:A(48ch)->输出 96ch -> chunk 得到 s,t(各48ch) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1), ) # 归一化,稳住 global conditioned_feature self.global_norm = nn.GroupNorm(1, fused_ch_global) # global flow_steps:默认更大(你扫出来 16/20 更有效) global_steps = max(20, flow_steps) H_global = input_size[0] // self.scales[-1] W_global = input_size[1] // self.scales[-1] self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=global_steps, ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: """Return anomaly map (eval) or (hidden_variables, log_jacobians) (train).""" self.feature_extractor.eval() # ====== Backbone features ====== if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: features = 
self._get_cnn_features(image) # ====== 原始 FastFlow(非 3-stage 时直接走原版) ====== if not self.use_dual_path: hidden_variables: List[Tensor] = [] log_jacobians: List[Tensor] = [] for block, feat in zip(self.fast_flow_blocks, features): hv, lj = block(feat) hidden_variables.append(hv) log_jacobians.append(lj) if self.training: return (hidden_variables, log_jacobians) return self.anomaly_map_generator(hidden_variables) # ====== Dual-Path ====== self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # list length 3 # ---- Local Path:融合 aux + flow ---- fused_features: List[Tensor] = [] local_hidden_variables: List[Tensor] = [] local_log_jacobians: List[Tensor] = [] for i in range(3): aux_aligned = self.resnet_align[i](aux_feats[i]) if aux_aligned.shape[-2:] != features[i].shape[-2:]: aux_aligned = F.interpolate(aux_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) aux_aligned = self.aux_norms[i](aux_aligned) gate_scalar = torch.sigmoid(self.aux_gates[i]) # 标量(全局) gate_map = self.aux_gate_convs[i](torch.cat([features[i], aux_aligned], dim=1)) # 空间/通道自适应 fused_feat = features[i] + gate_scalar * gate_map * aux_aligned fused_features.append(fused_feat) hv, lj = self.fast_flow_blocks[i](fused_feat) local_hidden_variables.append(hv) local_log_jacobians.append(lj) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ---- Global Path:用 fused_features(保持分布一致)---- target_size = fused_features[-1].shape[-2:] # 最小尺度(H,W) aligned_features: List[Tensor] = [] for i, feat in enumerate(fused_features): feat = self.align_convs_global[i](feat) feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(feat) # 动态尺度权重 + 先验 concat_feats = torch.cat(aligned_features, dim=1) # [B, 96*3, H, W] logits_dyn = self.channel_weighting_module(concat_feats) # [B, 3, 1, 1] logits = logits_dyn + self.global_scale_logits.view(1, 3, 1, 1) w = torch.softmax(logits, dim=1) # [B,3,1,1] weighted_feats = ( w[:, 0:1] * aligned_features[0] + w[:, 1:2] * aligned_features[1] + w[:, 2:3] * aligned_features[2] ) # [B,96,H,W] C = weighted_feats.size(1) # 96 A, B = torch.split(weighted_feats, C // 2, dim=1) # 48,48 st = self.subnet(A) # [B,96,H,W] s, t = torch.chunk(st, 2, dim=1) # 48,48 # 关键:限制 s,避免 exp(s) 炸 s = torch.tanh(s) * self.pre_affine_clamp B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) # [B,96,H,W] conditioned_feature = self.global_norm(conditioned_feature) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ---- 融合 anomaly maps(你固定 9/1)---- anomaly_map = (1.0 - self.w_global) * local_anomaly_map + self.w_global * global_anomaly_map hidden_variables = local_hidden_variables + [global_hidden_variable] log_jacobians = local_log_jacobians + [global_log_jacobian] if self.training: return (hidden_variables, log_jacobians) return anomaly_map def _get_cnn_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor(input_tensor) if self.norms is not None: features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. 
Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_vit_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_swint_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features: List[Tensor] = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2**i) W = grid_size[1] // (2**i) if H * W != L: raise ValueError( f"Cannot reshape [B,L,C]=[{B},{L},{C}] to [B,C,H,W]=[{B},{C},{H},{W}] because H*W != L." ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
- flow_steps=8–9 (you already verified 8 beats 2)
- w_global=0.1 (the file defaults to 0.1)
- global_steps: this code defaults to max(20, flow_steps), i.e. it starts from the 20 you wanted

Once you have run it, paste the new results (those 5 numbers) and I can make the next targeted micro-change based on which item rose or fell (e.g. only global_steps, only pre_affine_clamp, or only the gate initialisation / scale-weight strategy).
I ran this code to completion and the results are much worse; what should the next step be? Specifically: 56.86, 59.19, 40.04, 85.71, 28.93.
That drop to 56.86, 59.19, 40.04, 85.71, 28.93 almost certainly has one main cause: the previous torch_model.py I gave you introduced too many learnable fusion modules at once (gate_map, dynamic scale weights, extra norms, tanh-clamp). With your lightning_model.py / loss.py unchanged, and training driven only by the NLL objective on normal images, such modules can easily learn in a direction that fits the normal distribution better but destroys separability/localisation, dragging the overall metrics down.
Next step: first go back to the line you have already verified to be the most stable and closest to optimal:
aux_gates=-3.5 (scalar gate) + global_scale_logits (static softmax weights over the three scales) + global_steps>=16 + w_global=0.1–0.2.
Below is a clean torch_model.py you can paste over the whole file (it is that previously-better structure, tidied up, and with the target_size = fused_features[-1].shape[2:] point you raised written correctly).
python"""FastFlow Torch Model Implementation.""" from __future__ import annotations from typing import Callable, List, Tuple, Union import timm import torch import torch.nn.functional as F from FrEIA.framework import SequenceINN from timm.models.cait import Cait from timm.models.swin_transformer_v2 import SwinTransformerV2 from timm.models.vision_transformer import VisionTransformer from torch import Tensor, nn from anomalib.models.components.flow import AllInOneBlock from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable: """Subnet Convolutional Function.""" def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(inplace=True), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv def create_fast_flow_block( input_dimensions: List[int], conv3x3_only: bool, hidden_ratio: float, flow_steps: int, clamp: float = 2.0, ) -> SequenceINN: """Create NF Fast Flow Block.""" nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes class FastflowModel(nn.Module): """FastFlow + (your simplified) dual-path fusion version.""" def __init__( self, input_size: Tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size # ====== 你验证过的:固定融合权重(默认给你 0.2,可自己改回 0.1)====== # anomaly_map = (1-w_global)*local + w_global*global self.w_global = 0.2 # ====== global 的 flow steps(你验证过 16/20 比较有效)====== self.global_steps = max(16, flow_steps) if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] self.norms = None elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ( "swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256", "swinv2_base_window16_256", ): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [96, 192, 384] scales = [4, 8, 16] self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError(f"Unsupported backbone: {backbone}") self.channels = channels self.scales = scales for p in self.feature_extractor.parameters(): p.requires_grad = False # ====== Aux extractor (ResNet18) ====== self.aux_extractor = timm.create_model( "resnet18", 
pretrained=True, features_only=True, out_indices=[1, 2, 3], ) for p in self.aux_extractor.parameters(): p.requires_grad = False # resnet18 channels for out_indices [1,2,3] resnet_channels = [64, 128, 256] self.resnet_align = nn.ModuleList( [nn.Conv2d(resnet_channels[i], self.channels[i], kernel_size=1) for i in range(3)] ) # ====== 你验证过的 gate 初始化 ====== self.aux_gates = nn.Parameter(torch.full((3,), -3.5)) # sigmoid≈0.029 # ====== Local flow blocks ====== self.fast_flow_blocks = nn.ModuleList() for ch, sc in zip(self.channels, self.scales): self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[ch, int(input_size[0] / sc), int(input_size[1] / sc)], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # ====== Global path: align to 96ch ====== self.fused_ch_global = 96 self.align_convs_global = nn.ModuleList( [nn.Conv2d(ch, self.fused_ch_global, kernel_size=1, bias=True) for ch in self.channels] ) # 3-scale static weights (trainable) self.global_scale_logits = nn.Parameter(torch.zeros(3)) H_global = input_size[0] // self.scales[-1] W_global = input_size[1] // self.scales[-1] self.global_flow_block = create_fast_flow_block( input_dimensions=[self.fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=self.global_steps, ) # global affine subnet (A->(s,t)) self.subnet = nn.Sequential( nn.Conv2d(self.fused_ch_global // 2, self.fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(self.fused_ch_global // 2, self.fused_ch_global, kernel_size=3, padding=1), ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: """Train: (hidden_variables, log_jacobians); Eval: anomaly_map.""" self.feature_extractor.eval() # backbone features if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: features = self._get_cnn_features(image) # aux features self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [3] # ====== Local path ====== fused_features: List[Tensor] = [] local_hidden_variables: List[Tensor] = [] local_log_jacobians: List[Tensor] = [] for i in range(3): aux_aligned = self.resnet_align[i](aux_feats[i]) if aux_aligned.shape[-2:] != features[i].shape[-2:]: aux_aligned = F.interpolate(aux_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) gate = torch.sigmoid(self.aux_gates[i]) # scalar fused_feat = features[i] + gate * aux_aligned fused_features.append(fused_feat) hv, lj = self.fast_flow_blocks[i](fused_feat) local_hidden_variables.append(hv) local_log_jacobians.append(lj) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====== Global path (use fused_features!) 
====== target_size = fused_features[-1].shape[2:] # ✅ 你指出的点:必须是 list[-1] aligned_features: List[Tensor] = [] for i, feat in enumerate(fused_features): feat = self.align_convs_global[i](feat) feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(feat) w = torch.softmax(self.global_scale_logits, dim=0) # [3] weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2] C = weighted_feats.size(1) # 96 A, B = torch.split(weighted_feats, C // 2, dim=1) # 48, 48 st = self.subnet(A) # 96 s, t = torch.chunk(st, 2, dim=1) # 不做 tanh-clamp(你之前更好的版本也没做) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) # 96 global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====== fuse anomaly map ====== anomaly_map = (1.0 - self.w_global) * local_anomaly_map + self.w_global * global_anomaly_map hidden_variables = local_hidden_variables + [global_hidden_variable] log_jacobians = local_log_jacobians + [global_log_jacobian] if self.training: return (hidden_variables, log_jacobians) return anomaly_map def _get_cnn_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor(input_tensor) if self.norms is not None: features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_vit_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_swint_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features: List[Tensor] = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2**i) W = grid_size[1] // (2**i) if H * W != L: raise ValueError(f"Cannot reshape [B,L,C]=[{B},{L},{C}] to [B,C,H,W] because H*W != L.") feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
First run the file above; the goal is to get back at least to your earlier 61.x / 50.x / 89.x level. From there, iterate in small steps:
- self.w_global: 0.1 / 0.15 / 0.2
- self.global_steps: 16 / 20
- sweep them together; recommended order of combinations (a small driver for this order is sketched below): (global_steps=16, w=0.15) → (16, 0.2) → (20, 0.1) → (20, 0.15)
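If you want to automate that order, here is a minimal sweep-driver sketch; `train_and_eval` is a hypothetical placeholder for however you launch a run and collect the five metrics, not an anomalib API:

```python
def train_and_eval(global_steps: int, w_global: float) -> list[float]:
    """Hypothetical stub: set the two attributes in torch_model.py, launch training,
    and return [image_AUROC, image_F1, pixel_AUPRO, pixel_AUROC, pixel_F1]."""
    return [0.0] * 5  # replace with the real pipeline

combos = [(16, 0.15), (16, 0.2), (20, 0.1), (20, 0.15)]  # recommended sweep order
for global_steps, w_global in combos:
    metrics = train_and_eval(global_steps, w_global)
    print(f"global_steps={global_steps}, w_global={w_global}: {metrics}")
```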
You currently have conv3x3_only: True; try False once (it makes 1×1 kernels alternate into the flow blocks, which is usually friendlier to complex backgrounds / domain shift). This single switch is often more effective than piling extra fusion layers into forward.
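Concretely, this is the only thing the flag changes inside `create_fast_flow_block` (logic copied from the file above):

```python
def kernel_schedule(flow_steps: int, conv3x3_only: bool) -> list[int]:
    """Kernel size per flow step, exactly as chosen in create_fast_flow_block."""
    return [1 if (i % 2 == 1 and not conv3x3_only) else 3 for i in range(flow_steps)]

print(kernel_schedule(8, conv3x3_only=True))   # [3, 3, 3, 3, 3, 3, 3, 3]
print(kernel_schedule(8, conv3x3_only=False))  # [3, 1, 3, 1, 3, 1, 3, 1]
```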
Also add this one-line safety net (only if you see NaN/inf or the training is very jittery):
```python
s = torch.clamp(s, -5, 5)
B_transformed = torch.exp(s) * B + t
```
This is a stability patch, not a way to gain accuracy.
Post the new 5 numbers you get from the "clean version" above (plus your exact w_global / global_steps / flow_steps / conv3x3_only values), and I can give you a next step that changes only 1–2 lines in a definite direction (for example: with the global path on fused features, whether the local fusion should switch to the more residual-correction-like form features + gate*(aux - features)).
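For reference, that residual-correction variant would be a one-line change in the local loop; a self-contained sketch (not something validated in this thread):

```python
import torch

def residual_fusion(feature: torch.Tensor, aux: torch.Tensor, gate_logit: torch.Tensor) -> torch.Tensor:
    """Residual-correction fusion: the aux branch nudges the backbone feature towards itself;
    at gate ≈ 0 this reduces exactly to the plain backbone feature."""
    gate = torch.sigmoid(gate_logit)
    return feature + gate * (aux - feature)

f, a = torch.randn(1, 96, 64, 64), torch.randn(1, 96, 64, 64)
assert torch.allclose(residual_fusion(f, a, torch.tensor(-30.0)), f, atol=1e-6)
```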
I ran the code below to completion; the results are currently my best, but still not good enough compared with other algorithms. What should I improve next?

"""FastFlow Torch Model Implementation."""
from __future__ import annotations
from typing import Callable, List, Tuple, Union
import timm
import torch
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.vision_transformer import VisionTransformer
from timm.models.swin_transformer_v2 import SwinTransformerV2
from torch import Tensor, nn
import torch.nn.functional as F
from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator
def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
"""Subnet Convolutional Function.
textCallable class or function ``f``, called as ``f(channels_in, channels_out)`` and should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`. Args: kernel_size (int): Kernel Size hidden_ratio (float): Hidden ratio to compute number of hidden channels. Returns: Callable: Sequential for the subnet constructor. """ def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required. # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv
def create_fast_flow_block(
input_dimensions: list[int],
conv3x3_only: bool,
hidden_ratio: float,
flow_steps: int,
clamp: float = 2.0,
) -> SequenceINN:
"""Create NF Fast Flow Block.
textThis is to create Normalizing Flow (NF) Fast Flow model block based on Figure 2 and Section 3.3 in the paper. Args: input_dimensions (list[int]): Input dimensions (Channel, Height, Width) conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1. hidden_ratio (float): Ratio for the hidden layer channels. flow_steps (int): Flow steps. clamp (float, optional): Clamp. Defaults to 2.0. Returns: SequenceINN: FastFlow Block. """ nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes
class FastflowModel(nn.Module):
"""FastFlow.
textUnsupervised Anomaly Detection and Localization via 2D Normalizing Flows. Args: input_size (tuple[int, int]): Model input size. backbone (str): Backbone CNN network pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone. flow_steps (int, optional): Flow steps. conv3x3_only (bool, optinoal): Use only conv3x3 in fast_flow model. Defaults to False. hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0. Raises: ValueError: When the backbone is not supported. """ def __init__( self, input_size: tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size self.aux_gates = nn.Parameter(torch.zeros(3)) # sigmoid后初始=0.5 self.global_scale_logits = nn.Parameter(torch.zeros(3)) # softmax权重 if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() # for transformers, use their pretrained norm w/o grad # for resnets, self.norms are trainable LayerNorm self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ("swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256","swinv2_base_window16_256" ): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, ) # 手动定义 channels 和 scales channels = [96,192,384] scales = [4,8,16] self.channels = channels self.scales = scales # 保存 scales 以供 _get_swimt_features 使用 self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError( f"Backbone {backbone} is not supported. List of available backbones are " "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]." 
) for parameter in self.feature_extractor.parameters(): parameter.requires_grad = False self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3] # 对应 layer1, layer2, layer3 ) for param in self.aux_extractor.parameters(): param.requires_grad = False # resnet18 每层输出通道数 resnet_channels = [64, 128, 256] # 与 out_indices=[1,2,3] 对应 # 定义辅助分支的对齐层:将 CNN 分支输出变为对应 Swin 层的通道数(自适应) self.resnet_align = nn.ModuleList([ nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3) ]) self.fast_flow_blocks = nn.ModuleList() for i in range(3): fused_ch = self.channels[i] self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[fused_ch, int(input_size[0] / self.scales[i]), int(input_size[1] / self.scales[i])], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # self.fast_flow_blocks = nn.ModuleList() # for channel, scale in zip(channels, scales): # self.fast_flow_blocks.append( # create_fast_flow_block( # input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)], # conv3x3_only=conv3x3_only, # hidden_ratio=hidden_ratio, # flow_steps=flow_steps, # ) # ) self.align_convs_global = nn.ModuleList() fused_ch_global = 96 # 设定全局通道对齐后单个尺度多少通道 for ch in channels: self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True)) # 拼接后总通道 concatenated_channel = fused_ch_global H_global = input_size[0] // scales[-1] # 最小 H W_global = input_size[1] // scales[-1] # 最小 W self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1), nn.ReLU(), nn.Conv2d(concatenated_channel // 4, len(channels), 1), nn.Softmax(dim=1) ) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1) ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) # def forward(self, input_tensor: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]: # """Forward-Pass the input to the FastFlow Model. # # Args: # input_tensor (Tensor): Input tensor. # # Returns: # Tensor | list[Tensor] | tuple[list[Tensor]]: During training, return # (hidden_variables, log-of-the-jacobian-determinants). # During the validation/test, return the anomaly map. # """ # # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(input_tensor) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(input_tensor) # else: # features = self._get_cnn_features(input_tensor) # # # Compute the hidden variable f: X -> Z and log-likelihood of the jacobian # # (See Section 3.3 in the paper.) # # NOTE: output variable has z, and jacobian tuple for each fast-flow blocks. 
# hidden_variables: list[Tensor] = [] # log_jacobians: list[Tensor] = [] # for fast_flow_block, feature in zip(self.fast_flow_blocks, features): # hidden_variable, log_jacobian = fast_flow_block(feature) # hidden_variables.append(hidden_variable) # log_jacobians.append(log_jacobian) # # return_val = (hidden_variables, log_jacobians) # # if not self.training: # return_val = self.anomaly_map_generator(hidden_variables) # # return return_val # 第1 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2+3 def forward(self, image: torch.Tensor) -> Union[ torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" """Return anomaly map.""" return_val: Tensor | list[Tensor] | tuple[list[Tensor]] self.feature_extractor.eval() # backbone 前向用 eval if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: # ResNet / WideResNet features = self.feature_extractor(image) features = [self.norms[i](feat) for i, feat in enumerate(features)] # ------------------- 提取辅助分支(ResNet18)特征 ------------------- self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # ------------------- Local Path:逐层融合 + Flow ------------------- local_hidden_variables = [] 
local_log_jacobians = [] for i in range(len(features)): # 不对 Swin 特征做通道对齐,直接保留 # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned aux_feat_aligned = self.resnet_align[i](aux_feats[i]) if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]: aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) gate = torch.sigmoid(self.aux_gates[i]) # 0~1 fused_feature = features[i] + gate * aux_feat_aligned hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) local_hidden_variables.append(hidden_variable) local_log_jacobians.append(log_jacobian) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====================== Global Path ====================== aligned_features = [] target_size = features[-1].shape[2:] for i, feat in enumerate(features): feat = self.align_convs_global[i](feat) down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(down_feat) # weighted_feats = sum(aligned_features) w = torch.softmax(self.global_scale_logits, dim=0) weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2] C = weighted_feats.size(1) A, B = torch.split(weighted_feats, C // 2, dim=1) st = self.subnet(A) s, t = torch.chunk(st, 2, dim=1) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====================== Combine Anomaly Maps ====================== anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # ====================== Combine Hidden Variables (保持合并) ====================== hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) if not self.training: # 测试/评估模式 return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # 
local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, 
mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2+3 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # 
====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]: """Get CNN-based features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ features = self.feature_extractor(input_tensor) features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Class-Attention-Image-Transformers (CaiT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Vision Transformers (ViT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2 ** i) W = grid_size[1] // (2 ** i) if H * W != L: raise ValueError( f"无法将形状 [B, L, C] = [{B}, {L}, {C}] 重塑为 [B, C, H, W] = [{B}, {C}, {H}, {W}],因为 H * W != L。" ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
The config file is as follows:
model:
name: fastflow
backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
pre_trained: true
flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
lr: 0.001
weight_decay: 0.00001
early_stopping:
patience: 10
metric: image_AUROC
mode: max
normalization_method: min_max # options: [null, min_max, cdf]
The results:
image_AUROC 0.6637790203094482
image_F1Score 0.6151961088180542
pixel_AUPRO 0.5146437287330627
pixel_AUROC 0.8934715986251831
pixel_F1Score 0.34470832347869873
What should I change next to improve?
Below is a complete, **drop-in replacement torch_model.py** (based on your current "best result" version, with two key classes of changes), both aimed at your current bottleneck: **low pixel_F1 / stalled AUPRO**:

1. The global path now consumes the fused features (`target_size = fused_features[-1].shape[2:]`, and the global-path loop also iterates over `fused_features`).
2. The hand-written coupling uses `s = tanh(s) * 2.0` (the same idea as `affine_clamping` inside the flow blocks), which usually makes training much more stable and the pixel metrics easier to push up.

```python
"""FastFlow Torch Model Implementation.

Original Code
Copyright (c) 2022 @gathierry https://github.com/gathierry/FastFlow/.
SPDX-License-Identifier: Apache-2.0

Modified
Copyright (C) 2022 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""

from __future__ import annotations

from typing import Callable, List, Tuple, Union

import timm
import torch
import torch.nn.functional as F
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.swin_transformer_v2 import SwinTransformerV2
from timm.models.vision_transformer import VisionTransformer
from torch import Tensor, nn

from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator


def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
    """Subnet Convolutional Function."""

    def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential:
        hidden_channels = int(in_channels * hidden_ratio)
        # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required.
        padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2)
        return nn.Sequential(
            nn.ZeroPad2d(padding),
            nn.Conv2d(in_channels, hidden_channels, kernel_size),
            nn.LeakyReLU(),
            nn.ZeroPad2d(padding),
            nn.Conv2d(hidden_channels, out_channels, kernel_size),
        )

    return subnet_conv


def create_fast_flow_block(
    input_dimensions: list[int],
    conv3x3_only: bool,
    hidden_ratio: float,
    flow_steps: int,
    clamp: float = 2.0,
) -> SequenceINN:
    """Create NF Fast Flow Block."""
    nodes = SequenceINN(*input_dimensions)
    for i in range(flow_steps):
        kernel_size = 1 if (i % 2 == 1 and not conv3x3_only) else 3
        nodes.append(
            AllInOneBlock,
            subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio),
            affine_clamping=clamp,
            permute_soft=False,
        )
    return nodes


class FastflowModel(nn.Module):
    """FastFlow."""

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size

        # These two parameters are already in use in your current version: they are trained
        # (they affect the flow inputs).
        self.aux_gates = nn.Parameter(torch.zeros(3))            # sigmoid(0) = 0.5 initially
        self.global_scale_logits = nn.Parameter(torch.zeros(3))  # softmax weights

        # ---------------- Backbone ----------------
        if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [768]
            scales = [16]
            self.norms = nn.ModuleList()  # placeholder
        elif backbone in ("resnet18", "wide_resnet50_2"):
            self.feature_extractor = timm.create_model(
                backbone,
                pretrained=pre_trained,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            channels = self.feature_extractor.feature_info.channels()
            scales = self.feature_extractor.feature_info.reduction()
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        elif backbone in (
            "swinv2_small_window8_256",
            "swinv2_small_window16_256",
            "swinv2_tiny_window8_256",
            "swinv2_tiny_window16_256",
            "swinv2_base_window8_256",
            "swinv2_base_window16_256",
        ):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [96, 192, 384]
            scales = [4, 8, 16]
            self.channels = channels
            self.scales = scales
            # Here I recommend elementwise_affine=True (trainable); it adapts more easily to the
            # distribution shift of your enlarged dataset.
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        else:
            raise ValueError(
                f"Backbone {backbone} is not supported. "
                "Available: [cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2, swinv2_*]"
            )

        # Freeze the backbone.
        for p in self.feature_extractor.parameters():
            p.requires_grad = False

        # ---------------- Aux branch (ResNet18) ----------------
        self.aux_extractor = timm.create_model(
            "resnet18",
            pretrained=True,
            features_only=True,
            out_indices=[1, 2, 3],
        )
        for p in self.aux_extractor.parameters():
            p.requires_grad = False
        resnet_channels = [64, 128, 256]
        self.resnet_align = nn.ModuleList([nn.Conv2d(resnet_channels[i], channels[i], 1) for i in range(3)])
        # NEW: lightweight normalization of the aligned aux features to stabilize fusion (trainable).
        self.aux_post_norms = nn.ModuleList([nn.GroupNorm(1, channels[i]) for i in range(3)])

        # ---------------- Local flow blocks ----------------
        self.fast_flow_blocks = nn.ModuleList()
        for i in range(3):
            self.fast_flow_blocks.append(
                create_fast_flow_block(
                    input_dimensions=[
                        channels[i],
                        int(input_size[0] / scales[i]),
                        int(input_size[1] / scales[i]),
                    ],
                    conv3x3_only=conv3x3_only,
                    hidden_ratio=hidden_ratio,
                    flow_steps=flow_steps,
                )
            )

        # ---------------- Global path ----------------
        fused_ch_global = 96
        self.align_convs_global = nn.ModuleList(
            [nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True) for ch in channels]
        )
        self.global_post_norms = nn.ModuleList([nn.GroupNorm(1, fused_ch_global) for _ in channels])

        H_global = input_size[0] // scales[-1]
        W_global = input_size[1] // scales[-1]

        # Suggested: a slightly deeper global branch is more stable (your earlier sweeps of 16/20
        # sometimes helped the pixel metrics).
        global_steps = max(16, flow_steps)
        self.global_flow_block = create_fast_flow_block(
            input_dimensions=[fused_ch_global, H_global, W_global],
            conv3x3_only=conv3x3_only,
            hidden_ratio=hidden_ratio,
            flow_steps=global_steps,
        )

        # Subnet of the hand-written coupling (keeps your original structure).
        self.subnet = nn.Sequential(
            nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1),
        )

        self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size)

        # Strength of the tanh clamp (tunable: 1.5-3.0); the default 2.0 is usually stable.
        self.manual_coupling_clamp = 2.0

    def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]:
        """Forward pass."""
        # -------------- backbone features --------------
        self.feature_extractor.eval()
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:
            features = self._get_cnn_features(image)

        # -------------- aux features --------------
        self.aux_extractor.eval()
        with torch.no_grad():
            aux_feats = self.aux_extractor(image)

        # -------------- Local path: fusion + flow --------------
        fused_features: List[Tensor] = []
        local_hidden_variables: List[Tensor] = []
        local_log_jacobians: List[Tensor] = []
        for i in range(len(features)):
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
                aux_feat_aligned = F.interpolate(
                    aux_feat_aligned,
                    size=features[i].shape[-2:],
                    mode="bilinear",
                    align_corners=False,
                )
            # NEW: normalize the aligned aux features.
            aux_feat_aligned = self.aux_post_norms[i](aux_feat_aligned)
            gate = torch.sigmoid(self.aux_gates[i])  # 0..1
            fused_feature = features[i] + gate * aux_feat_aligned
            fused_features.append(fused_feature)

            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)

        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        # -------------- Global path (uses fused_features, no longer features) --------------
        aligned_features: List[Tensor] = []
        target_size = fused_features[-1].shape[2:]  # KEY: align the global path to fused_features
        for i, feat in enumerate(fused_features):
            feat = self.align_convs_global[i](feat)
            feat = self.global_post_norms[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        w = torch.softmax(self.global_scale_logits, dim=0)
        weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2]

        C = weighted_feats.size(1)
        A, B = torch.split(weighted_feats, C // 2, dim=1)

        st = self.subnet(A)
        s, t = torch.chunk(st, 2, dim=1)
        # KEY improvement: tanh-clamp s in the hand-written coupling so exp cannot blow up and
        # destabilize training / hurt generalization.
        s = torch.tanh(s) * self.manual_coupling_clamp
        B_transformed = torch.exp(s) * B + t

        conditioned_feature = torch.cat([A, B_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)

        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        # -------------- Combine anomaly maps (keep your validated 0.9/0.1) --------------
        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        hidden_variables = local_hidden_variables + [global_hidden_variable]
        log_jacobians = local_log_jacobians + [global_log_jacobian]

        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    # ---------------- feature helpers ----------------
    def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            features = self.feature_extractor(input_tensor)
        # self.norms may be trainable, so keep them out of no_grad.
        features = [self.norms[i](feature) for i, feature in enumerate(features)]
        return features

    def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feature = self.feature_extractor.patch_embed(input_tensor)
            feature = feature + self.feature_extractor.pos_embed
            feature = self.feature_extractor.pos_drop(feature)
            for i in range(41):
                feature = self.feature_extractor.blocks[i](feature)
            batch_size, _, num_channels = feature.shape
            feature = self.feature_extractor.norm(feature)
            feature = feature.permute(0, 2, 1)
            feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feature = self.feature_extractor.patch_embed(input_tensor)
            cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1)
            if self.feature_extractor.dist_token is None:
                feature = torch.cat((cls_token, feature), dim=1)
            else:
                feature = torch.cat(
                    (
                        cls_token,
                        self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1),
                        feature,
                    ),
                    dim=1,
                )
            feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed)
            for i in range(8):
                feature = self.feature_extractor.blocks[i](feature)
            feature = self.feature_extractor.norm(feature)
            feature = feature[:, 2:, :]
            batch_size, _, num_channels = feature.shape
            feature = feature.permute(0, 2, 1)
            feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feats = self.feature_extractor.forward_features(input_tensor)
        if not isinstance(feats, (list, tuple)):
            raise ValueError("Expected Swin forward_features to return a list/tuple of stage features.")
        feats = list(feats)[:3]

        reshaped_features: List[Tensor] = []
        grid_size = self.feature_extractor.patch_embed.grid_size
        for i, feat in enumerate(feats):
            B, L, C = feat.shape
            H = grid_size[0] // (2**i)
            W = grid_size[1] // (2**i)
            if H * W != L:
                raise ValueError(
                    f"Cannot reshape [B, L, C]=[{B},{L},{C}] to [B,C,H,W]=[{B},{C},{H},{W}] because H*W!=L."
                )
            feat = feat.permute(0, 2, 1).reshape(B, C, H, W)
            feat = self.norms[i](feat)
            reshaped_features.append(feat)
        return reshaped_features
```
Right now you have: image_AUROC around 0.66, pixel_AUROC around 0.89, but pixel_F1 around 0.34 and AUPRO around 0.51.
That says the problem is not "can the model find the anomaly at all", but that the predicted boundary/area of the anomalous region is not accurate enough (threshold + heatmap shape + consistency on small objects).
Your early stopping currently tracks image_AUROC, which pushes the model toward "is this image anomalous?" rather than "are the pixel regions segmented accurately?".
So change the config first:
set early_stopping.metric to pixel_AUPRO (preferred) or pixel_F1Score. This step alone often lifts pixel_F1 by a clear margin, because the checkpoint selected during training ends up completely different.
You previously swept global_steps = 16/20; the current code defaults to global_steps = max(16, flow_steps).
Run two comparisons that change only global_steps:
if pixel_F1 stays low, the global map is not helping the pixel boundaries, and the next move is to weaken the global branch (e.g. fuse at 0.95/0.05).
You are currently on swinv2_tiny_window16_256; window16 is often unfriendly to small obstacles. The safer change is swinv2_tiny_window8_256 (usually more sensitive to small objects).
Your threshold method is adaptive, and the low pixel_F1 is very likely a threshold mismatch.
So run an "upper-bound experiment":
sweep manual_pixel over a grid (e.g. 0.05-0.95), as in the sketch below, and record the best pixel_F1 it can reach. If you also paste a few anomaly_map visualizations from your current validation set (a few normal/abnormal images plus masks), I can tell faster
whether this is a heatmap-shape problem (fix the map generation/fusion) or a threshold / mask-alignment problem (mask resize/crop). The two call for completely different fixes.
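If it helps, here is a minimal sketch of that sweep. It is not anomalib API: `anomaly_maps` and `gt_masks` are assumed to be arrays you have already dumped from one validation/test pass (model outputs scaled to [0, 1], plus the binary ground-truth masks):

```python
import numpy as np

def best_pixel_f1(anomaly_maps: np.ndarray, gt_masks: np.ndarray) -> tuple[float, float]:
    """Sweep fixed pixel thresholds 0.05..0.95 and return (best_f1, best_threshold).

    anomaly_maps: float array [N, H, W], scaled to [0, 1]
    gt_masks:     binary array [N, H, W]
    """
    scores = anomaly_maps.ravel()
    gt = gt_masks.astype(bool).ravel()
    best_f1, best_thr = 0.0, 0.0
    for thr in np.linspace(0.05, 0.95, 19):
        pred = scores >= thr
        tp = np.logical_and(pred, gt).sum()
        precision = tp / max(pred.sum(), 1)
        recall = tp / max(gt.sum(), 1)
        f1 = 2 * precision * recall / max(precision + recall, 1e-12)
        if f1 > best_f1:
            best_f1, best_thr = float(f1), float(thr)
    return best_f1, best_thr
```

If the best F1 from this sweep is clearly above what the adaptive threshold reports, the heatmap itself is fine and only the thresholding strategy needs fixing.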
Using your code, my config file is:
model:
name: fastflow
backbone: swinv2_tiny_window8_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
pre_trained: true
flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
lr: 0.001
weight_decay: 0.00001
early_stopping:
patience: 10
metric: image_AUROC
mode: max
normalization_method: min_max # options: [null, min_max, cdf]
The results:
image_AUROC 0.6359753608703613
image_F1Score 0.590243935585022
pixel_AUPRO 0.3344220221042633
pixel_AUROC 0.7628674507141113
pixel_F1Score 0.043394044041633606
And with this config:
model:
name: fastflow
backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
pre_trained: true
flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
lr: 0.001
weight_decay: 0.00001
early_stopping:
patience: 10
metric: image_AUROC
mode: max
normalization_method: min_max # options: [null, min_max, cdf]
the results were:
image_AUROC 0.6401253938674927
image_F1Score 0.6085918545722961
pixel_AUPRO 0.41742947697639465
pixel_AUROC 0.8104718923568726
pixel_F1Score 0.03814755380153656
In these two runs (window8 / window16), pixel_F1 collapses to ~0.04 while pixel_AUROC stays at 0.76-0.81. That almost certainly means **the main problem is not "the heatmap carries no information" but that the threshold / normalization / checkpoint-selection objective broke the pixel binarization**: the model still ranks pixels reasonably, but the weights chosen by the threshold and early-stopping strategy produce predicted masks that are nearly empty or pure noise, so F1 goes to ~0.
Here is the most robust, most-likely-to-pay-off set of next changes (highest priority first); each one touches as little as possible.
1) Early stopping on image_AUROC easily selects a checkpoint that discriminates images well but thresholds pixels badly, and pixel_F1 collapses exactly like this.
Change the config to:

```yaml
early_stopping:
  patience: 10
  metric: pixel_AUPRO
  mode: max
```

If you can only try one change first, make it this one. In many projects this alone brings pixel_F1 from 0.0x back to 0.2-0.3+.
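For reference, the YAML above corresponds roughly to the following PyTorch Lightning callbacks (anomalib builds them from the config for you; the metric name pixel_AUPRO here is an assumption based on what your logs show being logged):

```python
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

callbacks = [
    # stop when the pixel-level objective stops improving ...
    EarlyStopping(monitor="pixel_AUPRO", patience=10, mode="max"),
    # ... and make checkpoint selection follow the same pixel-level objective
    ModelCheckpoint(monitor="pixel_AUPRO", mode="max"),
]
```

The point is that both the stopping criterion and the saved checkpoint now optimize for localization rather than image-level discrimination.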
2) In anomaly detection, min_max is easily stretched/squashed by a handful of extreme values, which makes default threshold strategies (e.g. "0.5") completely unsuitable and pixel F1 very ugly. Change:

```yaml
normalization_method: min_max
```

to:

```yaml
normalization_method: cdf
```

(Or set it to null first as a control, to confirm whether normalization is what breaks the threshold.)
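A toy illustration of why the normalization choice matters, with one extreme pixel score in an otherwise flat array (anomalib's cdf option is not literally rank-based, but the robustness intuition carries over):

```python
import numpy as np

scores = np.array([0.10, 0.12, 0.11, 0.13, 9.0])  # one extreme value

# min-max: the outlier defines the range; everything else collapses toward 0,
# so any fixed threshold separates almost nothing
mm = (scores - scores.min()) / (scores.max() - scores.min())
print(mm)  # [0.     0.0022 0.0011 0.0034 1.    ]

# a rank/CDF-style transform keeps the spacing usable despite the outlier
ranks = scores.argsort().argsort()
print((ranks + 1) / len(scores))  # [0.2 0.6 0.4 0.8 1. ]
```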
3) From your numbers, window8 is worse overall than window16, and its pixel F1 collapses just the same. Keep swinv2_tiny_window16_256 as the main line for the next round.
For window8 to actually pay off on small objects, it needs finer post-processing/threshold strategies, otherwise the noise gets heavier.
4) The version you posted as your "best result" has the right structure. In code I suggest only two changes, nothing that reshuffles the architecture.
4.1) Your local path computes fused_feature = features[i] + gate * aux_feat_aligned, but the global path still consumes features. Switch the global path to fused_features so both branches see the same input distribution; this usually helps pixel AUPRO/F1.
Minimal patch (forward only):

```python
fused_features = []
...
fused_feature = features[i] + gate * aux_feat_aligned
fused_features.append(fused_feature)
...
# global:
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(down_feat)
```
4.2) In your global coupling, exp(s) is unbounded; the moment training wobbles you get heatmaps that are either completely flat or all spikes, and the pixel threshold collapses.
The most conservative patch that keeps your original logic:

```python
s = torch.clamp(s, -3.0, 3.0)
B_transformed = torch.exp(s) * B + t
```

This usually makes the pixel metrics (especially F1) more stable.
5) You currently have:

```yaml
conv3x3_only: True
```

Change it to:

```yaml
conv3x3_only: False
```

so the flow blocks alternate 1x1 and 3x3 kernels; in many scenes this generalizes better to complex backgrounds (especially now that your dataset is larger). The quick check below shows what the schedule looks like.
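Concretely, with the create_fast_flow_block you already use, conv3x3_only controls the kernel schedule like this (same parity rule as the code; a quick check, nothing model-specific):

```python
def kernel_schedule(flow_steps: int, conv3x3_only: bool) -> list[int]:
    # same rule as create_fast_flow_block: odd steps get 1x1 unless conv3x3_only
    return [1 if (i % 2 == 1 and not conv3x3_only) else 3 for i in range(flow_steps)]

print(kernel_schedule(12, True))   # [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
print(kernel_schedule(12, False))  # [3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1]
```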
6) A check that does not touch training and only verifies the root cause: sweep the pixel threshold, as in the upper-bound experiment above.
If the best pixel_F1 from the sweep is clearly above the current 0.04, the heatmap is good and only the default threshold strategy does not fit; the follow-up is then to base threshold learning/selection on pixels rather than images.
Recommended next run: keep window16 as the main line and fix the checkpoint-selection objective first:

```yaml
model:
  backbone: swinv2_tiny_window16_256
  flow_steps: 12
  hidden_ratio: 1.0
  conv3x3_only: False
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: pixel_AUPRO
    mode: max
  normalization_method: cdf
```

together with the two code fixes from 4.1 + 4.2 above (global path on fused_features + clamping s).
Once this set has run, paste the new five metrics (especially pixel_AUPRO / pixel_F1). Depending on whether "pixel_AUPRO rose but F1 is still low" or "AUROC rose but AUPRO is low", the next tweak will touch only one or two places (e.g. only global_steps, only the 0.95/0.05 fusion weight, or only the energy form of the anomaly map).
I am now running with the code below:

```python
"""FastFlow Anomaly Map Generator Implementation."""

from __future__ import annotations

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn


class AnomalyMapGenerator(nn.Module):
    """Generate Anomaly Heatmap."""

    def __init__(self, input_size: ListConfig | tuple) -> None:
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        """Generate Anomaly Heatmap.

        This implementation generates the heatmap based on the flow maps
        computed from the normalizing flow (NF) FastFlow blocks. Each block
        yields a flow map, which overall is stacked and averaged to an
        anomaly map.

        Args:
            hidden_variables (list[Tensor]): List of hidden variables from each NF FastFlow block.

        Returns:
            Tensor: Anomaly Map.
        """
        flow_maps: list[Tensor] = []
        for hidden_variable in hidden_variables:
            log_prob = -0.5 * torch.mean(hidden_variable**2, dim=1, keepdim=True)
            prob = torch.exp(log_prob)
            flow_map = F.interpolate(
                input=-prob,
                size=self.input_size,
                mode="bilinear",
                align_corners=False,
            )
            flow_maps.append(flow_map)
        flow_maps = torch.stack(flow_maps, dim=-1)
        anomaly_map = torch.mean(flow_maps, dim=-1)
        return anomaly_map
```
And this is the other file:

```python
"""FastFlow Torch Model Implementation."""

from __future__ import annotations

from typing import Callable

import timm
import torch
import torch.nn.functional as F
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.swin_transformer_v2 import SwinTransformerV2
from timm.models.vision_transformer import VisionTransformer
from torch import Tensor, nn

from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator

SWINV2_BACKBONES = (
    "swinv2_small_window8_256",
    "swinv2_small_window16_256",
    "swinv2_tiny_window8_256",
    "swinv2_tiny_window16_256",
    "swinv2_base_window8_256",
    "swinv2_base_window16_256",
)


def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
    """Subnet Convolutional Function.

    Callable class or function ``f``, called as ``f(channels_in, channels_out)`` and
    should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`.

    Args:
        kernel_size (int): Kernel Size
        hidden_ratio (float): Hidden ratio to compute number of hidden channels.

    Returns:
        Callable: Sequential for the subnet constructor.
    """

    def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential:
        hidden_channels = int(in_channels * hidden_ratio)
        # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required.
        # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released
        padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2)
        return nn.Sequential(
            nn.ZeroPad2d(padding),
            nn.Conv2d(in_channels, hidden_channels, kernel_size),
            nn.LeakyReLU(),
            nn.ZeroPad2d(padding),
            nn.Conv2d(hidden_channels, out_channels, kernel_size),
        )

    return subnet_conv


def create_fast_flow_block(
    input_dimensions: list[int],
    conv3x3_only: bool,
    hidden_ratio: float,
    flow_steps: int,
    clamp: float = 2.0,
) -> SequenceINN:
    """Create NF Fast Flow Block.

    This is to create Normalizing Flow (NF) Fast Flow model block based on
    Figure 2 and Section 3.3 in the paper.

    Args:
        input_dimensions (list[int]): Input dimensions (Channel, Height, Width)
        conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1.
        hidden_ratio (float): Ratio for the hidden layer channels.
        flow_steps (int): Flow steps.
        clamp (float, optional): Clamp. Defaults to 2.0.

    Returns:
        SequenceINN: FastFlow Block.
    """
    nodes = SequenceINN(*input_dimensions)
    for i in range(flow_steps):
        if i % 2 == 1 and not conv3x3_only:
            kernel_size = 1
        else:
            kernel_size = 3
        nodes.append(
            AllInOneBlock,
            subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio),
            affine_clamping=clamp,
            permute_soft=False,
        )
    return nodes


class FastflowModel(nn.Module):
    """FastFlow.

    Unsupervised Anomaly Detection and Localization via 2D Normalizing Flows.

    Args:
        input_size (tuple[int, int]): Model input size.
        backbone (str): Backbone CNN network
        pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone.
        flow_steps (int, optional): Flow steps.
        conv3x3_only (bool, optional): Use only conv3x3 in fast_flow model. Defaults to False.
        hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0.

    Raises:
        ValueError: When the backbone is not supported.
    """

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size
        self.use_aux_branch = False

        if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [768]
            scales = [16]
        elif backbone in ("resnet18", "wide_resnet50_2"):
            self.feature_extractor = timm.create_model(
                backbone,
                pretrained=pre_trained,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            channels = self.feature_extractor.feature_info.channels()
            scales = self.feature_extractor.feature_info.reduction()

            # for transformers, use their pretrained norm w/o grad
            # for resnets, self.norms are trainable LayerNorm
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        elif backbone in SWINV2_BACKBONES:
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [96, 192, 384]
            scales = [4, 8, 16]
            self.use_aux_branch = True
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=False,
                    )
                )
        else:
            raise ValueError(
                f"Backbone {backbone} is not supported. List of available backbones are "
                "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]."
            )

        self.channels = channels
        self.scales = scales

        for parameter in self.feature_extractor.parameters():
            parameter.requires_grad = False

        self.fast_flow_blocks = nn.ModuleList()
        for channel, scale in zip(channels, scales):
            self.fast_flow_blocks.append(
                create_fast_flow_block(
                    input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                    conv3x3_only=conv3x3_only,
                    hidden_ratio=hidden_ratio,
                    flow_steps=flow_steps,
                )
            )

        if self.use_aux_branch:
            self.aux_extractor = timm.create_model(
                "resnet18",
                pretrained=True,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            for param in self.aux_extractor.parameters():
                param.requires_grad = False

            resnet_channels = [64, 128, 256]
            self.resnet_align = nn.ModuleList(
                [nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3)]
            )
            self.aux_norms = nn.ModuleList([nn.GroupNorm(8, ch) for ch in self.channels])
            self.aux_gates = nn.Parameter(torch.full((3,), -3.5))

            self.align_convs_global = nn.ModuleList()
            fused_ch_global = 96
            for ch in channels:
                self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True))
            concatenated_channel = fused_ch_global

            h_global = input_size[0] // scales[-1]
            w_global = input_size[1] // scales[-1]
            self.global_flow_block = create_fast_flow_block(
                input_dimensions=[fused_ch_global, h_global, w_global],
                conv3x3_only=conv3x3_only,
                hidden_ratio=hidden_ratio,
                flow_steps=flow_steps,
            )

            self.global_scale_logits = nn.Parameter(torch.zeros(len(channels)))
            self.channel_weighting_module = nn.Sequential(
                nn.AdaptiveAvgPool2d(1),
                nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
                nn.ReLU(),
                nn.Conv2d(concatenated_channel // 4, len(channels), 1),
                nn.Softmax(dim=1),
            )
            self.subnet = nn.Sequential(
                nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1),
            )

        self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size)

    def forward(self, image: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]:
        """Forward-Pass for FastFlow Model."""
        return_val: Tensor | list[Tensor] | tuple[list[Tensor]]

        self.feature_extractor.eval()
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:
            features = self.feature_extractor(image)
            features = [self.norms[i](feat) for i, feat in enumerate(features)]

        if not self.use_aux_branch:
            hidden_variables: list[Tensor] = []
            log_jacobians: list[Tensor] = []
            for fast_flow_block, feature in zip(self.fast_flow_blocks, features):
                hidden_variable, log_jacobian = fast_flow_block(feature)
                hidden_variables.append(hidden_variable)
                log_jacobians.append(log_jacobian)
            return_val = (hidden_variables, log_jacobians)
            if not self.training:
                return_val = self.anomaly_map_generator(hidden_variables)
            return return_val

        self.aux_extractor.eval()
        aux_feats = self.aux_extractor(image)

        local_hidden_variables = []
        local_log_jacobians = []
        for i in range(len(features)):
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
                aux_feat_aligned = F.interpolate(
                    aux_feat_aligned,
                    size=features[i].shape[-2:],
                    mode="bilinear",
                    align_corners=False,
                )
            aux_feat_aligned = self.aux_norms[i](aux_feat_aligned)
            gate = torch.sigmoid(self.aux_gates[i])
            fused_feature = features[i] + gate * aux_feat_aligned

            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)

        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        aligned_features = []
        target_size = features[-1].shape[2:]
        for i, feat in enumerate(features):
            feat = self.align_convs_global[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        w = torch.softmax(self.global_scale_logits, dim=0)
        weighted_feats = aligned_features[0] * w[0]
        for i in range(1, len(aligned_features)):
            weighted_feats = weighted_feats + aligned_features[i] * w[i]

        c = weighted_feats.size(1)
        a, b = torch.split(weighted_feats, c // 2, dim=1)
        st = self.subnet(a)
        s, t = torch.chunk(st, 2, dim=1)
        b_transformed = torch.exp(s) * b + t
        conditioned_feature = torch.cat([a, b_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)

        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable]
        log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian]

        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get CNN-based features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        features = self.feature_extractor(input_tensor)
        features = [self.norms[i](feature) for i, feature in enumerate(features)]
        return features

    def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Class-Attention-Image-Transformers (CaiT) features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        feature = self.feature_extractor.patch_embed(input_tensor)
        feature = feature + self.feature_extractor.pos_embed
        feature = self.feature_extractor.pos_drop(feature)
        for i in range(41):  # paper Table 6. Block Index = 40
            feature = self.feature_extractor.blocks[i](feature)
        batch_size, _, num_channels = feature.shape
        feature = self.feature_extractor.norm(feature)
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        features = [feature]
        return features

    def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Vision Transformers (ViT) features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        feature = self.feature_extractor.patch_embed(input_tensor)
        cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1)
        if self.feature_extractor.dist_token is None:
            feature = torch.cat((cls_token, feature), dim=1)
        else:
            feature = torch.cat(
                (
                    cls_token,
                    self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1),
                    feature,
                ),
                dim=1,
            )
        feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed)
        for i in range(8):  # paper Table 6. Block Index = 7
            feature = self.feature_extractor.blocks[i](feature)
        feature = self.feature_extractor.norm(feature)
        feature = feature[:, 2:, :]
        batch_size, _, num_channels = feature.shape
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        features = [feature]
        return features

    def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]:
        features = self.feature_extractor.forward_features(input_tensor)
        features = features[:3]
        reshaped_features = []
        grid_size = self.feature_extractor.patch_embed.grid_size
        for i, feat in enumerate(features):
            b, l, c = feat.shape
            h = grid_size[0] // (2**i)
            w = grid_size[1] // (2**i)
            if h * w != l:
                raise ValueError(
                    f"Cannot reshape [B, L, C]=[{b}, {l}, {c}] into [B, C, H, W]=[{b}, {c}, {h}, {w}]."
                )
            feat = feat.permute(0, 2, 1).reshape(b, c, h, w)
            feat = self.norms[i](feat)
            reshaped_features.append(feat)
        return reshaped_features
```
The code works well; the input size is 256. The results:
image_AUROC 0.6637790203094482
image_F1Score 0.6151961088180542
pixel_AUPRO 0.513035237789154
pixel_AUROC 0.8913142681121826
pixel_F1Score 0.3465515673160553
How else can I improve?
This version of your results (image_AUROC ≈ 0.664, pixel_AUPRO ≈ 0.513, pixel_F1 ≈ 0.347) has reached the stage where the heatmap carries information but the contrast / threshold / boundary shape has not been fully exploited. The most cost-effective next gains usually come not from stacking another branch, but from:
the items below, ordered by "smallest change, largest payoff", with directly usable code.
1) Replace exp(log_prob) with an energy term (strongly recommended). You currently compute:

```python
log_prob = -0.5 * mean(z**2)
prob = exp(log_prob)
flow_map = -prob
```

The typical failure mode: exp saturates quickly, the contrast between weak and strong anomalies gets squashed, the pixel threshold becomes very hard to pick, and AUPRO/F1 stall.
The more stable choice is the energy (≈ -log p) directly:
score = 0.5 * mean(z^2), which keeps anomalous regions "more linearly separable".
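In symbols, with $\bar{z^2}$ the channel-mean of the squared latents at a pixel, the two maps are

$$\text{prob-style: } -\exp\!\Big(-\tfrac{1}{2}\,\bar{z^2}\Big), \qquad \text{energy: } \tfrac{1}{2}\,\bar{z^2} \;(\approx -\log p).$$

Both are monotone in $\bar{z^2}$, so AUROC-style rankings agree; but the exponential saturates (every strongly anomalous pixel maps to almost exactly the same value near 0), while the energy keeps growing and leaves room to place a threshold between mild and strong anomalies.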
Replace your anomaly_map.py with the version below:

```python
"""FastFlow Anomaly Map Generator Implementation."""

from __future__ import annotations

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn


class AnomalyMapGenerator(nn.Module):
    """Generate Anomaly Heatmap."""

    def __init__(self, input_size: ListConfig | tuple) -> None:
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        flow_maps: list[Tensor] = []
        for hidden_variable in hidden_variables:
            # energy ~= -log p, avoids exp saturation
            score = 0.5 * torch.mean(hidden_variable**2, dim=1, keepdim=True)  # [B,1,h,w]
            flow_map = F.interpolate(
                input=score,
                size=self.input_size,
                mode="bilinear",
                align_corners=False,
            )
            flow_maps.append(flow_map)
        flow_maps = torch.stack(flow_maps, dim=-1)   # [B,1,H,W,N]
        anomaly_map = torch.mean(flow_maps, dim=-1)  # [B,1,H,W]
        # optional: a tiny smoothing often helps pixel-F1 (keep very mild)
        anomaly_map = F.avg_pool2d(anomaly_map, kernel_size=3, stride=1, padding=1)
        return anomaly_map
```

This step alone often improves pixel_AUPRO / pixel_F1 directly (especially in your situation: decent pixel ranking, hard-to-pick threshold).
2) Your global path still builds aligned_features from features:

```python
target_size = features[-1].shape[2:]
for i, feat in enumerate(features):
    ...
```

but the local path already computes fused_feature = features[i] + gate * aux_feat_aligned. The global and local branches then see different input distributions and pull against each other at fusion time, which hurts the pixel metrics most.
Change the local part to:

```python
fused_features = []
...
fused_feature = features[i] + gate * aux_feat_aligned
fused_features.append(fused_feature)
hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
...
```

and the global part to:

```python
aligned_features = []
target_size = fused_features[-1].shape[2:]  # use fused_features
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(down_feat)
```
3) You currently have:

```python
b_transformed = torch.exp(s) * b + t
```

Change it to a very conservative clamp; it barely limits capacity but sharply reduces blow-ups:

```python
s = torch.clamp(s, -3.0, 3.0)
b_transformed = torch.exp(s) * b + t
```
4) You have already seen the pattern "pixel_AUROC is decent but pixel_F1 collapses"; that is usually early stopping picking the wrong checkpoint.
Change:

```yaml
early_stopping:
  metric: image_AUROC
```

to:

```yaml
early_stopping:
  metric: pixel_AUPRO
  mode: max
```

Since you will be comparing against other algorithms, localization quality (AUPRO/F1) is what gets compared, so this step matters.
5) You currently fuse at a fixed 0.9/0.1. Sweep only these four combinations (no broad sweep), e.g. with the driver sketched after this list:
- w_global = 0.05 and 0.10
- global_flow_steps = 16 and 20 (set it separately for the global branch; do not reuse the local 12)
Combined, that is four runs.
Rule of thumb: when pixel F1 is low, the global weight usually needs to be smaller (0.05-0.1), otherwise the low-resolution global response smears the boundaries.
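A minimal driver for that 2x2 sweep could look like this; `train_and_eval` is a hypothetical placeholder for however you launch a run and read back the test metrics (a CLI call, a subprocess, or a function of your own):

```python
# Hypothetical sweep driver: exactly 4 runs.
# `train_and_eval` is a stand-in you must provide; it should train with the given
# knobs and return the test metrics as a dict.
results = {}
for w_global in (0.05, 0.10):
    for global_steps in (16, 20):
        metrics = train_and_eval(w_global=w_global, global_steps=global_steps)
        results[(w_global, global_steps)] = (metrics["pixel_AUPRO"], metrics["pixel_F1Score"])

# rank by the pixel metrics, since that is the current bottleneck
for combo, (aupro, f1) in sorted(results.items(), key=lambda kv: kv[1], reverse=True):
    print(combo, "pixel_AUPRO=%.3f pixel_F1=%.3f" % (aupro, f1))
```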
6) You are on min_max, which is sensitive to extreme values, so the pixel threshold drifts.
Run one control with:

```yaml
normalization_method: cdf
```

When you are done, paste the new five metrics (especially pixel_AUPRO / pixel_F1). Depending on whether "AUPRO rose but F1 did not" or "F1 rose but image AUROC dropped", the next step will change only one or two lines (e.g. the fusion form, or a top-quantile image score).
I did everything except the cdf switch; my results:
image_F1Score 0.5886363387107849
pixel_AUPRO 0.4152340590953827
pixel_AUROC 0.8632453083992004
pixel_F1Score 0.3275681734085083
How should I improve the algorithm next?
After this round of changes (energy anomaly map + fused_features + clamp(s)), pixel_AUROC is still 0.86, so the heatmap still discriminates; but pixel_AUPRO and image_AUROC dropped noticeably. The typical cause:
the energy map has a much longer tail than the old -exp(log_prob) map, while you still run min_max normalization plus the original threshold/early-stopping strategy, so the usable range for thresholding and ranking gets destroyed. Here is the recommended route for the next step (ordered by likely gain / smallest change), each item concrete.
1) Switch normalization_method to cdf (you have not done it yet, and it is the most likely culprit for this drop). Your energy map develops a long tail easily; min_max gets dragged by the extremes and squeezes most pixels into a narrow band, hurting both AUPRO and F1.
Change the config directly:

```yaml
normalization_method: cdf
```

This step may well recover the image_AUROC / pixel_AUPRO you just lost, possibly beating the earlier 0.513 run.
2) You keep seeing "image OK but pixels flaky". Early stopping on image_AUROC will keep selecting pixel-poor checkpoints.
Change it to:

```yaml
early_stopping:
  patience: 10
  metric: pixel_AUPRO
  mode: max
```

(If the final pixel_F1 matters more to you, pixel_F1Score works too.)
3) energy = 0.5*mean(z^2) is too long-tailed, and AUPRO tends to pay for it. The fix is not falling back to exp, but:
compressing the tail with log1p (the ordering is preserved, the threshold becomes easier to find). Change your AnomalyMapGenerator to this version (one file only):

```python
class AnomalyMapGenerator(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        flow_maps = []
        for z in hidden_variables:
            # energy
            score = 0.5 * torch.mean(z**2, dim=1, keepdim=True)  # [B,1,h,w]
            score = torch.log1p(score)  # compress the long tail
            score = F.interpolate(score, size=self.input_size, mode="bilinear", align_corners=False)

            # robust normalization: per-image quantile scaling, so extremes cannot dominate
            B = score.shape[0]
            flat = score.view(B, -1)
            q_low = torch.quantile(flat, 0.01, dim=1).view(B, 1, 1, 1)
            q_high = torch.quantile(flat, 0.99, dim=1).view(B, 1, 1, 1)
            score = (score - q_low) / (q_high - q_low + 1e-6)
            score = score.clamp(0.0, 1.0)

            flow_maps.append(score)

        flow_maps = torch.stack(flow_maps, dim=-1)   # [B,1,H,W,N]
        anomaly_map = torch.mean(flow_maps, dim=-1)  # [B,1,H,W]
        anomaly_map = F.avg_pool2d(anomaly_map, 3, 1, 1)  # mild smoothing
        return anomaly_map
```

This usually treats pixel_AUPRO well, because it directly improves the separability of the PR curve, and it also stabilizes pixel_F1.
4) Your earlier best pixel results were tuned on the old heatmap shape. With an energy-style heatmap, the low-resolution global branch smears boundaries more easily, so an AUPRO/F1 drop is common.
Next round, sweep only these four points (no broad sweeps):
- w_global: 0.05 / 0.10
- global_steps: 16 / 20 (set independently for the global branch, not the local 12)
Combined, that is four runs.
Rule of thumb: when AUPRO/F1 lag, shrinking the global weight beats adding complex modules.
5) If you are currently on s = clamp(s, -3, 3), it may cut the global branch's capacity too much, dragging down image_AUROC and pixel_AUROC together.
Two more stable alternatives (pick one):
A. loosen the clamp:

```python
s = torch.clamp(s, -5.0, 5.0)
```

B. tanh-clamp (smoother):

```python
s = torch.tanh(s) * 2.5  # anything in 2.0-3.0 is worth trying
```

Run this combination first: normalization_method: cdf, early_stopping.metric: pixel_AUPRO, and w_global=0.05 with global_steps=16. Then paste the five metrics and the next adjustment will follow from them.
16.7 M Trainable params
36.0 M Non-trainable params
52.7 M Total params
210.842 Total estimated model params size (MB)
/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/pytorch_lightning/loggers/tensorboard.py:191: UserWarning: Could not log computational graph to TensorBoard: The model.example_input_array attribute is not set or input_array was not given.
rank_zero_warn(
Epoch 0: 0%| | 0/103 [00:00<?, ?it/s]/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/pytorch_lightning/utilities/data.py:83: UserWarning: Trying to infer the batch_size from an ambiguous collection. The batch size we found is 32. To avoid any miscalculations, use self.log(..., batch_size=batch_size).
warning_cache.warn(
Epoch 0: 79%|██████████████████████████████████████████████████████████████████▊ | 81/103 [00:30<00:08, 2.62it/s, loss=-1.1e+04, v_num=44, train_loss_step=-3.06e+4]/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/pytorch_lightning/utilities/data.py:83: UserWarning: Trying to infer the batch_size from an ambiguous collection. The batch size we found is 26. To avoid any miscalculations, use self.log(..., batch_size=batch_size).
warning_cache.warn(
Epoch 45: 100%|█| 103/103 [00:56<00:00, 1.83it/s, loss=2.01e+07, v_num=44, train_loss_step=1.98e+7, pixel_F1Score=0.422, pixel_AUROC=0.907, pixel_AUPRO=0.494, train_loss_epoch=2.13e+/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/torch/__init__.py:1777: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
assert condition, message
/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/timm/models/swin_transformer_v2.py:121: TracerWarning: Converting a tensor to a Python integer might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
B = int(windows.shape[0] / (H * W / window_size[0] / window_size[1]))
/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/anomalib/models/fastflow/torch_model.py:702: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
if H * W != L:
/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/anomalib/models/fastflow/torch_model.py:372: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
Epoch 45: 100%|█| 103/103 [01:02<00:00, 1.65it/s, loss=2.01e+07, v_num=44, train_loss_step=1.98e+7, pixel_F1Score=0.422, pixel_AUROC=0.907, pixel_AUPRO=0.494, train_loss_epoch=2.13e+
2026-01-15 16:40:03,340 - anomalib.utils.callbacks.timer - INFO - Training took 2643.60 seconds
2026-01-15 16:40:03,366 - anomalib - INFO - Loading the best model weights.
2026-01-15 16:40:03,366 - anomalib - INFO - Testing the model.
2026-01-15 16:40:03,371 - pytorch_lightning.utilities.rank_zero - INFO - The following callbacks returned in LightningModule.configure_callbacks will override existing callbacks passed to Trainer: EarlyStopping
2026-01-15 16:40:03,372 - pytorch_lightning.utilities.rank_zero - INFO - You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To properly utilize them, you should set torch.set_float32_matmul_precision('medium' | 'high') which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
2026-01-15 16:40:03,372 - anomalib.utils.callbacks.model_loader - INFO - Loading the model from results1/1217fastflow/fastflow/mvtec/run/weights/lightning/model-v38.ckpt
/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/anomalib/utils/callbacks/model_loader.py:32: FutureWarning: You are using torch.load with weights_only=False (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for weights_only will be flipped to True. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via torch.serialization.add_safe_globals. We recommend you start setting weights_only=True for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
pl_module.load_state_dict(torch.load(self.weights_path, map_location=pl_module.device)["state_dict"])
2026-01-15 16:40:04,068 - pytorch_lightning.accelerators.cuda - INFO - LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]
Testing DataLoader 0: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 21/21 [01:09<00:00, 3.30s/it]2026-01-15 16:41:27,019 - anomalib.utils.callbacks.timer - INFO - Testing took 82.84323573112488 seconds
Throughput (batch_size=32) : 8.075493359183836 FPS
QObject::moveToThread: Current thread (0x42201720) is not the object's thread (0x407b0f30).
Cannot move to target thread (0x42201720)
qt.qpa.plugin: Could not load the Qt platform plugin "xcb" in "/home/jd/anaconda3/envs/anomalib/lib/python3.8/site-packages/cv2/qt/plugins" even though it was found.
This application failed to start because no Qt platform plugin could be initialized. Reinstalling the application may fix this problem.
Available platform plugins are: xcb, linuxfb, minimal, offscreen, vnc, webgl.
Aborted (core dumped)
How do I fix this?
This crash is not your model or training. After testing finishes, the OpenCV/Qt visualization path (HighGUI) gets triggered, and in your environment Qt's xcb platform plugin cannot initialize (typical on servers, headless machines, missing xcb system libraries, or GUI calls from worker processes).
The key log lines:
- qt.qpa.plugin: Could not load the Qt platform plugin "xcb" ...
- This application failed to start because no Qt platform plugin could be initialized.
- QObject::moveToThread ... (a common warning when Qt is touched from multiple processes/threads)
Three fixes, ordered from most recommended / least effort:
A. If you only need training/testing metrics and saved images, with no pop-up windows, this is the most robust:

```bash
pip uninstall -y opencv-python opencv-contrib-python
```

```bash
pip install opencv-python-headless
```

If your opencv came from conda-forge it may also bundle Qt; the simplest fix is still to overwrite it with the pip headless build.
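To confirm the swap took, a quick check like this helps (cv2.getBuildInformation() is standard OpenCV; the exact wording of the GUI section varies by build, so eyeball it rather than string-match it):

```python
import cv2

print(cv2.__file__, cv2.__version__)
# In a headless build, the "GUI" section of the build info reports no QT/GTK backend.
for line in cv2.getBuildInformation().splitlines():
    if "GUI" in line or "QT" in line.upper():
        print(line)
```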
B. anomalib typically visualizes during the test stage (saving / showing / logging images). If you do not want any GUI, turn it off.
In your config, look for the visualization / logger / callbacks entries (field names differ slightly across versions) and disable the show/enable switches. A typical shape (adjust to your actual fields):

```yaml
visualization:
  show_images: false
  save_images: true   # save to disk only, never display
  log_images: false   # don't push images to tensorboard (can also go through cv2)
```

Or, if a VisualizerCallback / ImageVisualizerCallback is registered, remove or disable that callback.
C. If you genuinely want windows from an Ubuntu server/container, you must install the system libraries and have a working X11:

```bash
sudo apt-get update
sudo apt-get install -y \
    libgl1 libglib2.0-0 libsm6 libxext6 libxrender1 \
    libxkbcommon-x11-0 libxcb-xinerama0
```

and echo $DISPLAY must print a value. If you do not actually need windows, skip option C and use A.
As an extra safety net, export these before running (even if you never use matplotlib explicitly, it keeps visualization defaults away from Qt):

```bash
export MPLBACKEND=Agg
export QT_QPA_PLATFORM=offscreen
```

This is not the root fix (headless OpenCV is), but it sidesteps a class of environment problems.
Your log also shows a multi-GPU / DDP run (CUDA_VISIBLE_DEVICES: [0,1]), and Qt plus multiprocessing blows up even more easily. So I suggest:
do option A first (opencv-python-headless),
then, optionally, option B (disable show_images / log_images and similar visualization).
That should settle it for good.
If you paste the visualization-related part of your anomalib config (visualization: / callbacks: / logger:), I can give you a drop-in yaml patch matched to your actual fields that guarantees cv2/Qt is never triggered.
The code now errors with: AttributeError: module 'cv2' has no attribute 'CV_8U'. How do I solve it?
That error has essentially only two causes:
either the cv2 you import is not the OpenCV cv2 at all (shadowed/hijacked by a same-named file or the wrong package), or the installation is broken/mixed. Here is the fastest way to locate the cause, then the fastest fix.
A. In the environment you actually run, execute:

```bash
python -c "import cv2; print('cv2 file:', getattr(cv2,'__file__',None)); print('version:', getattr(cv2,'__version__',None)); print('has CV_8U:', hasattr(cv2,'CV_8U'))"
```

If cv2 file points at something inside your project (a cv2.py or a cv2/ directory), your own file is shadowing the real module.
Fix: rename or move that cv2.py / cv2/ directory and retry.
B. In the same environment, uninstall everything that could collide (important: all of them):

```bash
pip uninstall -y cv2 opencv-python opencv-python-headless opencv-contrib-python opencv-contrib-python-headless
```

If you ever installed opencv through conda, also run (to avoid conda/pip mixes):

```bash
conda remove -y opencv
```

Then install the headless build (you are on a server without a GUI, so this is the right one):

```bash
pip install opencv-python-headless==4.8.1.78
```

Verify once more:

```bash
python -c "import cv2; print(cv2.__version__); print('CV_8U:', cv2.CV_8U)"
```

I suggest pinning one version (e.g. 4.8.1.78) so other dependencies in the environment do not hit OpenCV compatibility traps.
C. A code-level fallback for cv2.CV_8U: some code only means "8-bit unsigned". You can substitute
np.uint8 as the dtype, or the literal 0 (CV_8U is defined as 0 in OpenCV). For example:

```python
import numpy as np

img = img.astype(np.uint8)
```

For a library like anomalib, though, repairing the OpenCV install (route B) is the better answer.
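If you need to keep running before the environment is repaired, a defensive shim works because CV_8U is the integer 0 in OpenCV's type system; treat it as a stopgap, not a fix:

```python
import cv2

# CV_8U is defined as 0 in OpenCV; restore it if a broken install lost the constant.
if not hasattr(cv2, "CV_8U"):
    cv2.CV_8U = 0
```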
About pip install cv2: if you ever installed a package literally named cv2, it is not the official OpenCV binding, and this kind of "missing constant" is exactly what it produces. Always use opencv-python* or opencv-python-headless*.
If you paste the three outputs from step A (cv2 file / version / has CV_8U), I can tell you directly whether it is shadowing or a mixed install, and which branch of B is safest for you.