Created: January 8, 2026
Answered using GPT-5.2 Pro via Chat01
I'm using the anomalib library for obstacle detection on a suspended maglev train. This is the paper I wrote, and its results are quite good (see the first image for the exact numbers). After enriching the dataset I get the results in the second image, but I'm still running the original paper's code. The config file is as follows:

```yaml
dataset:
  name: mvtec
  format: folder
  path: /media/jd/4997BB1603CFE2C4/lw/liwei23/anomalib/dataset
  normal_dir: normal # subfolder of normal (positive) samples in the custom dataset
  abnormal_dir: abnormal0 # subfolder of abnormal (negative) samples in the custom dataset
  mask_dir: label_mask0 # path to binary masks; custom datasets usually have none, use null
  category: bottle
  normal_test_dir: normal_test0 # name of the folder containing normal test images.
  task: segmentation
  extensions: null
  train_batch_size: 32
  eval_batch_size: 32
  num_workers: 8
  image_size: 256 # dimensions to which images are resized (mandatory)
  center_crop: null # dimensions to which images are center-cropped after resizing (optional)
  normalization: imagenet # data distribution to which the images will be normalized: [none, imagenet]
  transform_config:
    train: null
    eval: null
  test_split_mode: from_dir # options: [from_dir, synthetic]
  test_split_ratio: 0.2 # fraction of train images held out for testing (usage depends on test_split_mode)
  val_split_mode: same_as_test # options: [same_as_test, from_test, synthetic]
  val_split_ratio: 0.5 # fraction of train/test images held out for validation (usage depends on val_split_mode)
  tiling:
    apply: false
    tile_size: null
    stride: null
    remove_border_count: 0
    use_random_tiling: False
    random_tile_count: 16

model:
  name: fastflow
  backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
  pre_trained: true
  flow_steps: 9 # options: [8, 8, 20, 20] - for each supported backbone
  hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
  conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: pixel_AUROC
    mode: max
  normalization_method: min_max # options: [null, min_max, cdf]

metrics:
  image:
    - F1Score
    - AUROC
  pixel:
    - F1Score
    - AUROC
    - AUPRO
  threshold:
    method: adaptive # options: [adaptive, manual]
    manual_image: null
    manual_pixel: null

visualization:
  show_images: False # show images on the screen
  save_images: True # save images to the file system
  log_images: True # log images to the available loggers (if any)
  image_save_path: ./results1/1217fastflow/fastflow/mvtec/run/images # path to which images will be saved
  mode: simple # options: ["full", "simple"]

project:
  seed: 42
  path: ./results1/1217fastflow

logging:
  logger: [tensorboard, csv]
  log_graph: true

optimization:
  export_mode: null # options: torch, onnx, openvino

trainer:
  enable_checkpointing: true
  default_root_dir: null
  gradient_clip_val: 0
  gradient_clip_algorithm: norm
  num_nodes: 1
  devices: 1
  enable_progress_bar: true
  overfit_batches: 0.0
  track_grad_norm: -1
  check_val_every_n_epoch: 1 # Don't validate before extracting features.
  fast_dev_run: false
  accumulate_grad_batches: 1
  max_epochs: 500
  min_epochs: null
  max_steps: -1
  min_steps: null
  max_time: null
  limit_train_batches: 1.0
  limit_val_batches: 1.0
  limit_test_batches: 1.0
  limit_predict_batches: 1.0
  val_check_interval: 1.0 # Don't validate before extracting features.
  log_every_n_steps: 50
  accelerator: auto # <"cpu", "gpu", "tpu", "ipu", "hpu", "auto">
  strategy: null
  sync_batchnorm: false
  precision: 32
  enable_model_summary: true
  num_sanity_val_steps: 0
  profiler: null
  benchmark: false
  deterministic: false
  reload_dataloaders_every_n_epochs: 0
  auto_lr_find: false
  replace_sampler_ddp: true
  detect_anomaly: false
  auto_scale_batch_size: false
  plugins: null
  move_metrics_to_cpu: false
  multiple_trainloader_mode: max_size_cycle
```
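For reference, a config like this can be run end to end with anomalib v0.x's training entry point. Below is a minimal sketch, assuming anomalib 0.4-style APIs (`get_configurable_parameters`, `get_datamodule`, `get_model`, `get_callbacks`); the config path `fastflow_custom.yaml` is a hypothetical name standing in for wherever the file above is saved:

```python
# Minimal training driver, mirroring anomalib v0.x's tools/train.py.
# Assumes anomalib 0.4-style APIs; "fastflow_custom.yaml" is a hypothetical path.
from pytorch_lightning import Trainer, seed_everything

from anomalib.config import get_configurable_parameters
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.utils.callbacks import get_callbacks

config = get_configurable_parameters(model_name="fastflow", config_path="fastflow_custom.yaml")
seed_everything(config.project.seed)

datamodule = get_datamodule(config)  # builds the folder-format datamodule configured above
model = get_model(config)            # instantiates the (modified) FastFlow lightning model
callbacks = get_callbacks(config)    # metric, normalization, and visualization callbacks

trainer = Trainer(**config.trainer, callbacks=callbacks)
trainer.fit(model=model, datamodule=datamodule)
trainer.test(model=model, datamodule=datamodule)
```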
The anomaly-map code (anomaly_map.py):

```python
"""FastFlow Anomaly Map Generator Implementation."""
from __future__ import annotations

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn


class AnomalyMapGenerator(nn.Module):
    """Generate Anomaly Heatmap."""

    def __init__(self, input_size: ListConfig | tuple) -> None:
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        """Generate Anomaly Heatmap.

        This implementation generates the heatmap based on the flow maps
        computed from the normalizing flow (NF) FastFlow blocks. Each block
        yields a flow map, which overall is stacked and averaged to an
        anomaly map.

        Args:
            hidden_variables (list[Tensor]): List of hidden variables from each NF FastFlow block.

        Returns:
            Tensor: Anomaly Map.
        """
        flow_maps: list[Tensor] = []
        for hidden_variable in hidden_variables:
            log_prob = -torch.mean(hidden_variable**2, dim=1, keepdim=True) * 0.5
            prob = torch.exp(log_prob)
            flow_map = F.interpolate(
                input=-prob,
                size=self.input_size,
                mode="bilinear",
                align_corners=False,
            )
            flow_maps.append(flow_map)
        flow_maps = torch.stack(flow_maps, dim=-1)
        # anomaly_map = torch.mean(flow_maps, dim=-1)

        # 1. Mean aggregation (first step): average the per-block flow maps.
        anomaly_map_averaged = torch.mean(flow_maps, dim=-1)

        # 2. Spatial neighborhood max pooling on the averaged anomaly map.
        kernel_size = 3  # other sizes can be tried, e.g. 3x3, 5x5
        padding = (kernel_size - 1) // 2  # "same" padding keeps the output size
        anomaly_map = F.max_pool2d(
            anomaly_map_averaged,
            kernel_size=kernel_size,
            stride=1,  # stride 1 for dense pooling
            padding=padding,
        )
        return anomaly_map
```
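To see what the extra max-pooling step does in isolation, here is a tiny self-contained check (pure PyTorch, no anomalib imports; the tensor shapes are illustrative):

```python
import torch
import torch.nn.functional as F

# Fake averaged anomaly map: one sharp single-pixel peak on a flat background.
anomaly_map = torch.full((1, 1, 8, 8), -1.0)
anomaly_map[0, 0, 4, 4] = -0.2  # higher score = more anomalous (maps are negated probs)

# The 3x3 / stride-1 / same-padding max pool from AnomalyMapGenerator.forward:
pooled = F.max_pool2d(anomaly_map, kernel_size=3, stride=1, padding=1)

# The peak now covers a 3x3 neighborhood; isolated responses are dilated,
# which tends to help pixel-level overlap metrics such as AUPRO.
print((pooled > -1.0).sum().item())  # 9
```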
The content of torch_model.py:

```python
"""FastFlow Torch Model Implementation."""
from __future__ import annotations

from typing import Callable, List, Tuple, Union

import timm
import torch
import torch.nn.functional as F
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.swin_transformer_v2 import SwinTransformerV2
from timm.models.vision_transformer import VisionTransformer
from torch import Tensor, nn

from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator


def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
    """Subnet Convolutional Function.

    Callable class or function ``f``, called as ``f(channels_in, channels_out)`` and
    should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`.

    Args:
        kernel_size (int): Kernel Size
        hidden_ratio (float): Hidden ratio to compute number of hidden channels.

    Returns:
        Callable: Sequential for the subnet constructor.
    """

    def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential:
        hidden_channels = int(in_channels * hidden_ratio)
        # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required.
        # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released
        padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2)
        return nn.Sequential(
            nn.ZeroPad2d(padding),
            nn.Conv2d(in_channels, hidden_channels, kernel_size),
            nn.LeakyReLU(),
            nn.ZeroPad2d(padding),
            nn.Conv2d(hidden_channels, out_channels, kernel_size),
        )

    return subnet_conv


def create_fast_flow_block(
    input_dimensions: list[int],
    conv3x3_only: bool,
    hidden_ratio: float,
    flow_steps: int,
    clamp: float = 2.0,
) -> SequenceINN:
    """Create NF Fast Flow Block.

    This is to create Normalizing Flow (NF) Fast Flow model block based on
    Figure 2 and Section 3.3 in the paper.

    Args:
        input_dimensions (list[int]): Input dimensions (Channel, Height, Width)
        conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1.
        hidden_ratio (float): Ratio for the hidden layer channels.
        flow_steps (int): Flow steps.
        clamp (float, optional): Clamp. Defaults to 2.0.

    Returns:
        SequenceINN: FastFlow Block.
    """
    nodes = SequenceINN(*input_dimensions)
    for i in range(flow_steps):
        if i % 2 == 1 and not conv3x3_only:
            kernel_size = 1
        else:
            kernel_size = 3
        nodes.append(
            AllInOneBlock,
            subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio),
            affine_clamping=clamp,
            permute_soft=False,
        )
    return nodes


class FastflowModel(nn.Module):
    """FastFlow.

    Unsupervised Anomaly Detection and Localization via 2D Normalizing Flows.

    Args:
        input_size (tuple[int, int]): Model input size.
        backbone (str): Backbone CNN network
        pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone.
        flow_steps (int, optional): Flow steps.
        conv3x3_only (bool, optional): Use only conv3x3 in fast_flow model. Defaults to False.
        hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0.

    Raises:
        ValueError: When the backbone is not supported.
    """

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size

        if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [768]
            scales = [16]
        elif backbone in ("resnet18", "wide_resnet50_2"):
            self.feature_extractor = timm.create_model(
                backbone,
                pretrained=pre_trained,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            channels = self.feature_extractor.feature_info.channels()
            scales = self.feature_extractor.feature_info.reduction()

            # for transformers, use their pretrained norm w/o grad
            # for resnets, self.norms are trainable LayerNorm
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        elif backbone in (
            "swinv2_small_window8_256",
            "swinv2_small_window16_256",
            "swinv2_tiny_window8_256",
            "swinv2_tiny_window16_256",
            "swinv2_base_window8_256",
            "swinv2_base_window16_256",
        ):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            # Manually define channels and scales.
            channels = [96, 192, 384]
            scales = [4, 8, 16]
            self.channels = channels
            self.scales = scales  # saved for _get_swint_features
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=False,
                    )
                )
        else:
            raise ValueError(
                f"Backbone {backbone} is not supported. List of available backbones are "
                "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]."
            )

        for parameter in self.feature_extractor.parameters():
            parameter.requires_grad = False

        self.aux_extractor = timm.create_model(
            "resnet18",
            pretrained=True,
            features_only=True,
            out_indices=[1, 2, 3],  # layer1, layer2, layer3
        )
        for param in self.aux_extractor.parameters():
            param.requires_grad = False

        # Output channels of each resnet18 stage, matching out_indices=[1, 2, 3].
        resnet_channels = [64, 128, 256]
        # Alignment layers for the auxiliary branch: map the CNN branch's channels
        # to the corresponding Swin stage channels.
        self.resnet_align = nn.ModuleList(
            [nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3)]
        )

        self.fast_flow_blocks = nn.ModuleList()
        for i in range(3):
            fused_ch = self.channels[i]
            self.fast_flow_blocks.append(
                create_fast_flow_block(
                    input_dimensions=[
                        fused_ch,
                        int(input_size[0] / self.scales[i]),
                        int(input_size[1] / self.scales[i]),
                    ],
                    conv3x3_only=conv3x3_only,
                    hidden_ratio=hidden_ratio,
                    flow_steps=flow_steps,
                )
            )

        self.align_convs_global = nn.ModuleList()
        fused_ch_global = 96  # channel count per scale after global alignment
        for ch in channels:
            self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True))
        concatenated_channel = fused_ch_global
        H_global = input_size[0] // scales[-1]  # smallest H
        W_global = input_size[1] // scales[-1]  # smallest W
        self.global_flow_block = create_fast_flow_block(
            input_dimensions=[fused_ch_global, H_global, W_global],
            conv3x3_only=conv3x3_only,
            hidden_ratio=hidden_ratio,
            flow_steps=flow_steps,
        )
        # NOTE: defined but not used in the active forward below.
        self.channel_weighting_module = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
            nn.ReLU(),
            nn.Conv2d(concatenated_channel // 4, len(channels), 1),
            nn.Softmax(dim=1),
        )
        self.subnet = nn.Sequential(
            nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1),
        )
        self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size)

    # (The original anomalib forward and the commented-out ablation variants
    # "1", "2", "1+2", and "2+3" are omitted here; they differ from the active
    # "1+2+3" forward below only in which of the aux-fusion, global-path, and
    # map-fusion pieces are enabled.)

    # 1+2+3
    def forward(
        self, image: torch.Tensor
    ) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]:
        """Forward pass for the dual-path FastFlow model; returns the fused anomaly map at eval time."""
        return_val: Tensor | list[Tensor] | tuple[list[Tensor]]

        self.feature_extractor.eval()  # run the backbone in eval mode
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:  # ResNet / WideResNet
            features = self.feature_extractor(image)
            features = [self.norms[i](feat) for i, feat in enumerate(features)]

        # ------------------- Auxiliary branch (ResNet18) features -------------------
        self.aux_extractor.eval()
        aux_feats = self.aux_extractor(image)  # [layer1, layer2, layer3]

        # ------------------- Local path: per-scale fusion + flow -------------------
        local_hidden_variables = []
        local_log_jacobians = []
        for i in range(len(features)):
            # Keep the Swin features as-is; use resnet_align to match the CNN
            # branch's channels to the corresponding Swin stage, then add.
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            fused_feature = features[i] + aux_feat_aligned
            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)
        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        # ====================== Global Path ======================
        aligned_features = []
        target_size = features[-1].shape[2:]
        for i, feat in enumerate(features):
            feat = self.align_convs_global[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        weighted_feats = sum(aligned_features)
        C = weighted_feats.size(1)
        A, B = torch.split(weighted_feats, C // 2, dim=1)

        st = self.subnet(A)
        s, t = torch.chunk(st, 2, dim=1)
        B_transformed = torch.exp(s) * B + t

        conditioned_feature = torch.cat([A, B_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)
        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        # ====================== Combine Anomaly Maps ======================
        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        # ====================== Combine Hidden Variables ======================
        hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable]
        log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian]

        # Training: return (hidden_variables, log_jacobians); eval: return the fused anomaly map.
        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get CNN-based features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        features = self.feature_extractor(input_tensor)
        features = [self.norms[i](feature) for i, feature in enumerate(features)]
        return features

    def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Class-Attention-Image-Transformers (CaiT) features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        feature = self.feature_extractor.patch_embed(input_tensor)
        feature = feature + self.feature_extractor.pos_embed
        feature = self.feature_extractor.pos_drop(feature)
        for i in range(41):  # paper Table 6. Block Index = 40
            feature = self.feature_extractor.blocks[i](feature)
        batch_size, _, num_channels = feature.shape
        feature = self.feature_extractor.norm(feature)
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        features = [feature]
        return features

    def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Vision Transformers (ViT) features.

        Args:
            input_tensor (Tensor): Input Tensor.

        Returns:
            list[Tensor]: List of features.
        """
        feature = self.feature_extractor.patch_embed(input_tensor)
        cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1)
        if self.feature_extractor.dist_token is None:
            feature = torch.cat((cls_token, feature), dim=1)
        else:
            feature = torch.cat(
                (
                    cls_token,
                    self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1),
                    feature,
                ),
                dim=1,
            )
        feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed)
        for i in range(8):  # paper Table 6. Block Index = 7
            feature = self.feature_extractor.blocks[i](feature)
        feature = self.feature_extractor.norm(feature)
        feature = feature[:, 2:, :]
        batch_size, _, num_channels = feature.shape
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        features = [feature]
        return features

    def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]:
        features = self.feature_extractor.forward_features(input_tensor)
        features = features[:3]
        reshaped_features = []
        grid_size = self.feature_extractor.patch_embed.grid_size
        for i, feat in enumerate(features):
            B, L, C = feat.shape
            H = grid_size[0] // (2 ** i)
            W = grid_size[1] // (2 ** i)
            if H * W != L:
                raise ValueError(
                    f"Cannot reshape [B, L, C] = [{B}, {L}, {C}] into "
                    f"[B, C, H, W] = [{B}, {C}, {H}, {W}] because H * W != L."
                )
            feat = feat.permute(0, 2, 1).reshape(B, C, H, W)
            feat = self.norms[i](feat)
            reshaped_features.append(feat)
        return reshaped_features
```
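As a quick sanity check on the auxiliary branch's hard-coded `resnet_channels = [64, 128, 256]` and the Swin scales `[4, 8, 16]`, timm's `features_only` models expose their stage channels and strides directly; a minimal sketch:

```python
import timm

# Same constructor call as self.aux_extractor above, but without pretrained
# weights so it runs offline; channel/stride metadata does not depend on weights.
aux = timm.create_model("resnet18", pretrained=False, features_only=True, out_indices=[1, 2, 3])

print(aux.feature_info.channels())   # [64, 128, 256] -> matches resnet_channels
print(aux.feature_info.reduction())  # [4, 8, 16]     -> matches the Swin scales
```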
But running this code directly, the behavior differs from the original paper: there, step = 2 worked best, yet now step = 2 gives 59.8, 59.32, 54.64, 93.2, 37.84, while step = 8 is somewhat better: 61.47, 58.89, 46.69, 88.86, 38.55. What should I do? I have kept lightning_model.py and loss.py identical to the original. Although the paper describes a self-learned α, I found that only the 9/1 split works best, so I fixed it at 9/1; that parameter does not need changing. Everything in the paper's architecture is already implemented; you don't need to reproduce the paper's functionality, because the paper is the elaborate version and the code is the simplest, stripped-down version. These are the results the paper was run with. On the new dataset the baselines are (same five-metric order):

- original FastFlow: 65.42, 60.61, 38.58, 82.26, 10.7
- PaDiM: 64.71, 61.14, 52.89, 91.35, 8.53
- CFA: 70.63, 64.06, 39.72, 89.72, 16.45
- DSR: 35.75, 59.98, 30.72, 78.29, 7.81
- CS-Flow: 67.8, 61.6, 22.04, 82.03, 11.77
- CFlow: 36.81, 57.2, 37.06, 86.98, 19.82

Don't change the code to match my paper; the paper's code is exactly what I uploaded, so just tell me how to adjust the code. The paper applies some academic polish to the code, so don't reproduce the paper; modify and improve based on the current problem. One more data point: increasing kernel_size to 7 gives no gain; both 5 and 7 make the results worse. When my code is changed to the following version:
(The imports, `subnet_conv_func`, `create_fast_flow_block`, the class docstring, the commented-out forward variants, and the `_get_*_features` helpers are identical to the file above; only the differences are shown.)

```python
"""FastFlow Torch Model Implementation."""
# ... imports, subnet_conv_func, create_fast_flow_block unchanged ...


class FastflowModel(nn.Module):
    # ... docstring unchanged ...

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size
        # self.aux_gates = nn.Parameter(torch.zeros(3))  # sigmoid -> initial 0.5
        self.global_scale_logits = nn.Parameter(torch.zeros(3))  # softmax weights
        # previously nn.Parameter(torch.zeros(3))
        self.aux_gates = nn.Parameter(torch.full((3,), -3.5))  # sigmoid ≈ 0.029

        # ... backbone selection, norms, aux_extractor, resnet_align,
        # fast_flow_blocks, and align_convs_global unchanged from above ...

        # self.global_flow_block = create_fast_flow_block(
        #     input_dimensions=[fused_ch_global, H_global, W_global],
        #     conv3x3_only=conv3x3_only,
        #     hidden_ratio=hidden_ratio,
        #     flow_steps=flow_steps,
        # )
        local_steps = flow_steps             # 12
        global_steps = max(16, flow_steps)   # start at 16; 20 can be tried if needed
        self.global_flow_block = create_fast_flow_block(
            input_dimensions=[fused_ch_global, H_global, W_global],
            conv3x3_only=conv3x3_only,
            hidden_ratio=hidden_ratio,
            flow_steps=global_steps,
        )
        # ... channel_weighting_module, subnet, anomaly_map_generator unchanged ...

    # 1+2+3
    def forward(
        self, image: torch.Tensor
    ) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]:
        """Forward pass for the dual-path FastFlow model; returns the fused anomaly map at eval time."""
        return_val: Tensor | list[Tensor] | tuple[list[Tensor]]

        self.feature_extractor.eval()  # run the backbone in eval mode
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:  # ResNet / WideResNet
            features = self.feature_extractor(image)
            features = [self.norms[i](feat) for i, feat in enumerate(features)]

        # ------------------- Auxiliary branch (ResNet18) features -------------------
        self.aux_extractor.eval()
        aux_feats = self.aux_extractor(image)  # [layer1, layer2, layer3]

        # ------------------- Local path: gated per-scale fusion + flow -------------------
        local_hidden_variables = []
        local_log_jacobians = []
        for i in range(len(features)):
            # aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            # fused_feature = features[i] + aux_feat_aligned
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
                aux_feat_aligned = F.interpolate(
                    aux_feat_aligned, size=features[i].shape[-2:],
                    mode="bilinear", align_corners=False,
                )
            gate = torch.sigmoid(self.aux_gates[i])  # in (0, 1)
            fused_feature = features[i] + gate * aux_feat_aligned
            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)
        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        # ====================== Global Path ======================
        aligned_features = []
        target_size = features[-1].shape[2:]
        for i, feat in enumerate(features):
            feat = self.align_convs_global[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        # weighted_feats = sum(aligned_features)
        w = torch.softmax(self.global_scale_logits, dim=0)
        weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2]
        C = weighted_feats.size(1)
        A, B = torch.split(weighted_feats, C // 2, dim=1)

        st = self.subnet(A)
        s, t = torch.chunk(st, 2, dim=1)
        B_transformed = torch.exp(s) * B + t

        conditioned_feature = torch.cat([A, B_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)
        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        # ====================== Combine Anomaly Maps ======================
        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        # ====================== Combine Hidden Variables ======================
        hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable]
        log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian]

        # Training: return (hidden_variables, log_jacobians); eval: return the fused anomaly map.
        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    # ... _get_cnn_features, _get_cait_features, _get_vit_features,
    # _get_swint_features unchanged from the file above ...
```
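The `-3.5` gate initialization can be read straight off the sigmoid: it starts the auxiliary branch at roughly 3% of its full strength instead of 50%, which is why it behaves so differently from a zero init. A small check, plus one way to log the learned gates during training (the logging hook is hypothetical, not part of the file above):

```python
import torch

# Initial gate values for zero init vs. the -3.5 init used above.
print(torch.sigmoid(torch.tensor(0.0)).item())   # 0.5    -> aux branch at half strength
print(torch.sigmoid(torch.tensor(-3.5)).item())  # ~0.029 -> aux branch nearly off

# Hypothetical: inside the LightningModule's training step one could log the
# learned gates to see whether the model opens them up over training:
# gates = torch.sigmoid(model.aux_gates).detach().cpu().tolist()
# self.log_dict({f"gate_{i}": g for i, g in enumerate(gates)})
```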
My setup was as follows. Changing only the gate initialization to -3.5 (everything else unchanged) gives: 59.43, 58.27, 41.21, 87.23, 36.65. gate=-3.5 + global_steps=16 gives: 61.72, 58.90, 50.81, 89.77, 35.47. Sweeping w_global=0.2 on top of setting 2 gives: 61.77, 59.02, 50.51, 89.67, 35.50. Sweeping w_global=0.3 on top of setting 2 gives: 61.72, 58.85, 50.81, 89.77, 35.47.
global_steps=20 + w_global=0.1 gives: 60.39, 59.27, 46.78, 89.55, 39.48. Based on the results above, please revise my code so it performs better. Remember: the paper is only there to give you the rough idea; just look at the algorithm results and modify the code according to the numbers above.
This set of symptoms is actually very typical: once the dataset is enlarged (especially with a wider normal-sample distribution, more varied scenes, and bigger lighting/background changes), the flow capacity that was "just barely enough" on the small dataset (n/steps=2) clearly underfits. So what you see now, step=2 dropping across the board while step=8 does better, is reasonable; and the clear AUPRO/F1 recovery once you push global_steps to 16/20 further confirms that insufficient capacity, not kernel_size, is the main cause. Treating n=2 as the unified setting in your paper was essentially an experiment/compute trade-off at the time, not something guaranteed to be optimal on the new dataset (your paper's experimental setup likewise states the unified settings n=2, lr=0.001, and so on). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
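If you want a quick sanity check of the capacity argument before committing to a full training run, a minimal probe like the one below (my sketch, assuming your torch_model.py still exposes create_fast_flow_block at module level; adjust the import to wherever yours lives) shows how fast the flow's parameter count scales with the step count:

```python
# Minimal capacity probe (not part of your pipeline): compare flow
# parameter counts at the depths you have been sweeping.
from anomalib.models.fastflow.torch_model import create_fast_flow_block

for steps in (2, 8, 16, 20):
    block = create_fast_flow_block(
        input_dimensions=[96, 64, 64],  # stage-1 SwinV2 feature at 256 input: C=96, 256/4=64
        conv3x3_only=True,
        hidden_ratio=1.0,
        flow_steps=steps,
    )
    n_params = sum(p.numel() for p in block.parameters())
    print(f"flow_steps={steps:2d}: {n_params / 1e6:.2f}M params")
```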
Below I won't reproduce your paper. Based only on the change directions that are currently giving you the most gain (gate initialization, global_steps, w_global), here is a set of code-level fixes that are more likely to keep raising your scores, focused on the three things in your current version most likely to hold you back:
In your forward, this block:
```python
st = self.subnet(A)
s, t = torch.chunk(st, 2, dim=1)
B_transformed = torch.exp(s) * B + t
```
change it to the following (the key points: tanh + clamp, then one more GroupNorm to stabilize the distribution):
```python
# add in __init__
self.pre_affine_clamp = 2.0
self.global_norm = nn.GroupNorm(1, fused_ch_global)  # fused_ch_global = 96

# change in forward
st = self.subnet(A)
s, t = torch.chunk(st, 2, dim=1)

# key: bound the range of s so exp(s) cannot explode
s = torch.tanh(s) * self.pre_affine_clamp
B_transformed = torch.exp(s) * B + t

conditioned_feature = torch.cat([A, B_transformed], dim=1)

# key: one more stabilizing normalization on the global path
conditioned_feature = self.global_norm(conditioned_feature)
```
Why this step is quite likely to raise the score directly:
With global_steps=16/20 the global branch has more capacity, but it is also more likely to learn extreme values of s, which makes the anomaly map spike even on normal images (hurting image AUROC / pixel FPR). Clamping usually suppresses exactly those spikes.
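A tiny numeric illustration (made-up values, not from your runs) of what the clamp buys you:

```python
import torch

s = torch.tensor([-6.0, -1.0, 0.0, 1.0, 6.0])

# unbounded s: the affine scale exp(s) spans five orders of magnitude
print(torch.exp(s))                    # ~[0.0025, 0.37, 1.0, 2.72, 403.43]

# tanh-clamped s: the scale stays inside [exp(-2), exp(2)] ~= [0.135, 7.39]
print(torch.exp(torch.tanh(s) * 2.0))
```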
Your local loop computes fused_feature, but the Global Path is still consuming features.
Change the local loop to also collect fused_features:
```python
fused_features = []
for i in range(len(features)):
    aux_feat_aligned = self.resnet_align[i](aux_feats[i])
    if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
        aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:],
                                         mode="bilinear", align_corners=False)
    gate = torch.sigmoid(self.aux_gates[i])
    fused_feature = features[i] + gate * aux_feat_aligned
    fused_features.append(fused_feature)

    hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
    ...
```
Then this Global Path block:
```python
target_size = features[-1].shape[2:]
for i, feat in enumerate(features):
    ...
```
becomes:
```python
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features):
    ...
```
Why this step matters:
Your final fused anomaly_map is a linear combination of the local and global maps. If the two paths see inconsistent input distributions, the global map often "corrects in the wrong direction": pixel metrics go up while image AUROC drops, or AUPRO and F1 pull against each other.
You currently have:
```python
gate = torch.sigmoid(self.aux_gates[i])  # scalar
fused_feature = features[i] + gate * aux_feat_aligned
```
Suggested change: scalar gate × gating-convolution gate_map (spatially and channel adaptive) × the normalized aux features.
```python
# normalize the aligned aux features (steadier than BN; no dependence on batch statistics)
self.aux_norms = nn.ModuleList([nn.GroupNorm(1, ch) for ch in self.channels])

# spatially/channel-adaptive gate: concat([feat, aux]) -> gate_map in [0, 1]
self.aux_gate_convs = nn.ModuleList()
for ch in self.channels:
    gate_conv = nn.Conv2d(ch * 2, ch, kernel_size=1, bias=True)
    nn.init.zeros_(gate_conv.weight)
    nn.init.constant_(gate_conv.bias, -3.5)  # initial sigmoid ~= 0.029, same idea as your gate = -3.5
    self.aux_gate_convs.append(nn.Sequential(gate_conv, nn.Sigmoid()))
```
```python
aux_feat_aligned = self.resnet_align[i](aux_feats[i])
if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
    aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:],
                                     mode="bilinear", align_corners=False)
aux_feat_aligned = self.aux_norms[i](aux_feat_aligned)

gate_scalar = torch.sigmoid(self.aux_gates[i])  # you validated -3.5 as effective; keep it
gate_map = self.aux_gate_convs[i](torch.cat([features[i], aux_feat_aligned], dim=1))

fused_feature = features[i] + gate_scalar * gate_map * aux_feat_aligned
```
Why this is more likely to gain over the scalar-only gate: the fusion strength can now adapt per position and per channel, instead of one global constant scaling the aux branch everywhere at once.
Your current global weighting is:
```python
w = torch.softmax(self.global_scale_logits, dim=0)
weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2]
```
That is a global constant weight, which is not flexible enough for your new dataset (much larger scene variation).
You have actually already written a channel_weighting_module, but you defined it with concatenated_channel = fused_ch_global; if you want "concatenate first, then compute the weights", the input channel count should be 96*3 = 288.
Replace:
```python
concatenated_channel = fused_ch_global
self.channel_weighting_module = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
    nn.ReLU(),
    nn.Conv2d(concatenated_channel // 4, len(channels), 1),
    nn.Softmax(dim=1),
)
```
with (output logits instead of applying softmax immediately, so global_scale_logits can be stacked on top as a prior):
```python
concatenated_channel = fused_ch_global * len(channels)  # 96 * 3
self.channel_weighting_module = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
    nn.ReLU(inplace=True),
    nn.Conv2d(concatenated_channel // 4, len(channels), 1),  # logits
)
```
```python
concat_feats = torch.cat(aligned_features, dim=1)          # [B, 288, H, W]
logits_dyn = self.channel_weighting_module(concat_feats)   # [B, 3, 1, 1]
logits = logits_dyn + self.global_scale_logits.view(1, -1, 1, 1)
w = torch.softmax(logits, dim=1)                           # [B, 3, 1, 1]
weighted_feats = (
    w[:, 0:1] * aligned_features[0]
    + w[:, 1:2] * aligned_features[1]
    + w[:, 2:3] * aligned_features[2]
)
```
Expected gain: the scale weighting becomes input-adaptive, with global_scale_logits kept as a learned prior, instead of one static constant for every image.
Your anomaly_map currently goes through:
`log_prob = -0.5 * mean(h^2)` -> `prob = exp(log_prob)` -> `flow_map = -prob`

For the new dataset, where anomaly strength spans a wide range, this has a problem: exp saturates strong anomalies toward 0 very quickly, so the contrast between different anomalies is squashed, and AUPRO/F1 are easily affected.
A steadier approach is to use the energy term (an approximation of -log p) directly: 0.5*mean(h^2). This also agrees with your paper's description that the negative log-likelihood relates to the L2 norm of h (the loss there is likewise written as a norm term). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
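A toy comparison (invented magnitudes) that makes the saturation effect concrete:

```python
import torch

mean_h2 = torch.tensor([1.0, 10.0, 50.0, 100.0])  # per-pixel mean(h^2), weak -> strong anomaly

prob = torch.exp(-0.5 * mean_h2)  # ~[6.1e-01, 6.7e-03, 1.4e-11, 1.9e-22]: strong anomalies all collapse toward 0
energy = 0.5 * mean_h2            # [0.5, 5.0, 25.0, 50.0]: ordering and contrast preserved
print(prob)
print(energy)
```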
Change the forward to the version below (keep kernel=3, and add a light avg_pool before the max_pool, which usually makes boundaries steadier):
```python
def forward(self, hidden_variables: list[Tensor]) -> Tensor:
    flow_maps: list[Tensor] = []
    for hidden_variable in hidden_variables:
        # use the energy (~ -log p) instead of exp(log_prob) to avoid saturation
        score = 0.5 * torch.mean(hidden_variable**2, dim=1, keepdim=True)  # [B, 1, h, w]
        flow_map = F.interpolate(
            input=score,
            size=self.input_size,
            mode="bilinear",
            align_corners=False,
        )
        flow_maps.append(flow_map)
    flow_maps = torch.stack(flow_maps, dim=-1)  # [B, 1, H, W, N]

    # optional: weight the scales toward higher resolution (usually friendlier to small foreign objects)
    n = flow_maps.shape[-1]
    if n > 1:
        weights = torch.tensor([1.0 / (2**i) for i in range(n)],
                               device=flow_maps.device, dtype=flow_maps.dtype)
        weights = weights / weights.sum()
        anomaly_map_averaged = (flow_maps * weights.view(1, 1, 1, 1, n)).sum(dim=-1)
    else:
        anomaly_map_averaged = flow_maps.squeeze(-1)

    # smooth lightly first, then max-pool; usually steadier than max-pooling directly
    anomaly_map_averaged = F.avg_pool2d(anomaly_map_averaged, kernel_size=3, stride=1, padding=1)
    anomaly_map = F.max_pool2d(anomaly_map_averaged, kernel_size=3, stride=1, padding=1)
    return anomaly_map
```
You have already verified that step=2 underfits the enlarged dataset while more capacity helps, so for the next round I suggest no longer using step=2 as the main configuration; go straight to the settings your sweeps favored (flow_steps around 8~9, global_steps at 16/20, w_global around 0.1~0.2).
The logic of this set of changes: without touching lightning_model/loss, reduce the "hot spots on normal images", raise the contrast of small foreign objects, and make the global branch more stable and distribution-matched with the local one. Given how your metrics currently behave, these are the places improvement is most likely to show up.
Your pixel metrics are already strong, but image AUROC is on the low side. A common cause: using max(anomaly_map) as the image score is too sensitive; a single noise spike is enough to flag a whole normal image as anomalous.
If anomalib's image score is the max (FastFlow/PaDiM often do it that way), you can switch the image score to a top-k mean / quantile. This only affects image AUROC / image F1; it usually does not hurt pixel-level localization and tends to reduce false alarms instead.
The idea (pseudocode):
```python
# anomaly_map: [B, 1, H, W]
flat = anomaly_map.view(B, -1)
score = torch.quantile(flat, 0.995, dim=1)  # or a top-k mean
```
This change tends to help a lot on railway-style datasets (strong background texture, frequent sporadic bright spots).
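Here is a self-contained sketch of that scoring idea (the function name and the 0.5% ratio are my choices, not anomalib's API; where you hook it in depends on your anomalib version):

```python
import torch

def image_score_topk(anomaly_map: torch.Tensor, k_ratio: float = 0.005) -> torch.Tensor:
    """Per-image score from an anomaly map [B, 1, H, W]: mean of the top-k pixel scores."""
    flat = anomaly_map.flatten(start_dim=1)   # [B, H*W]
    k = max(1, int(flat.shape[1] * k_ratio))  # e.g. the top 0.5% of pixels
    topk_vals, _ = torch.topk(flat, k, dim=1)
    return topk_vals.mean(dim=1)              # [B]

scores = image_score_topk(torch.rand(4, 1, 256, 256))
print(scores.shape)  # torch.Size([4])
```

Compared with a pure max, the mean over a small top-k keeps sensitivity to real obstacles (many high pixels) while a single noisy pixel barely moves the score.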
Once you have applied this whole set, post the next round of results (the same five metrics) and I can give you a more precise next step based on which items rise or fall (e.g., if AUPRO is still low at global_steps=20, prioritize tuning the clamp/scale weights; if pixel F1 won't move, bias the three local scale weights further toward stage1).
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features): ...

You wrote that wrong, didn't you? It should be:

target_size = fused_feature[-1].shape[2:]
for i, feat in enumerate(fused_feature):
You're right to call that out: if I don't explain what those variable names mean, it really does look like a mistake.
- fused_feature: the fused feature tensor of a single scale (a Tensor), normally shaped [B, C, H, W]
- fused_features: the list that collects the three scales' fused_feature tensors (list[Tensor], length 3)

So in the code I suggested, both target_size and the for ... in enumerate(...) loop must be based on the list fused_features; they cannot use a single fused_feature. What you wrote:
```python
target_size = fused_feature[-1].shape[2:]
for i, feat in enumerate(fused_feature):
```
has two problems:
- fused_feature[-1] takes the last image of the batch (a slice along dimension 0), not the "last scale".
- enumerate(fused_feature) iterates along the tensor's 0th (batch) dimension, which is not at all the per-scale iteration you want. (A short demonstration follows the corrected snippets below.)

In the local path, first collect the fused features of the three scales:
```python
fused_features = []
for i in range(len(features)):
    ...
    fused_feature = features[i] + gate * aux_feat_aligned
    fused_features.append(fused_feature)
    ...
```
Then have the global path align/downsample over this list:
```python
target_size = fused_features[-1].shape[-2:]  # smallest scale (H, W)
aligned_features = []
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(feat)
```
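You can reproduce both problems from the list above in a few lines:

```python
import torch

fused_feature = torch.randn(8, 96, 64, 64)   # ONE scale, shape [B, C, H, W]
print(fused_feature[-1].shape)               # torch.Size([96, 64, 64]) -> the last *image*, not the last scale
print(sum(1 for _ in fused_feature))         # 8 -> iterating walks the batch dimension

fused_features = [torch.randn(8, 96, 64, 64),   # the LIST of three scales
                  torch.randn(8, 192, 32, 32),
                  torch.randn(8, 384, 16, 16)]
print(fused_features[-1].shape[-2:])         # torch.Size([16, 16]) -> the smallest scale, as intended
```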
This also matches the pipeline described in your manuscript: the features of each scale are first channel-unified and downsampled to the smallest scale, and only then fused/weighted. (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
The name fused_feature is fine too, but then it has to stand for the list, e.g.:
```python
fused_feature = []  # note: fused_feature is now a list
...
fused_feature.append(tensor)
...
target_size = fused_feature[-1].shape[-2:]
for i, feat in enumerate(fused_feature):
    ...
```
The essence is unchanged: either fused_features is the list, or you use fused_feature itself as the list; what you can't have is a name that looks like a list while the value is a tensor.
If you paste the few lines of your current local loop (where you actually define the fused variable), I can give you a minimal copy-paste diff that keeps your existing variable names, so you don't have to touch too much.
Please give me the complete modified torch_model.py; after switching back and forth I've made a mess of my code.
Below is a torch_model.py you can **paste over the entire file** (built on your current version, with the ideas you mentioned, gate=-3.5, a larger global_steps, and the global weighted fusion, consolidated into one clean, stable, numerically safe version).
It bakes in these key fixes (all already written into the code), including global_steps defaulting to max(20, flow_steps) (change it if you like). This implementation also sits closer to the wording in your manuscript (gating convolutions for dynamic weights, channel weighting, the affine transform with s/b, and so on). (秦勇-杨婧婕_作者刊前确认_1226回复.pdf)
python"""FastFlow Torch Model Implementation. Original Code Copyright (c) 2022 @gathierry https://github.com/gathierry/FastFlow/. SPDX-License-Identifier: Apache-2.0 Modified Copyright (C) 2022 Intel Corporation SPDX-License-Identifier: Apache-2.0 """ from __future__ import annotations from typing import Callable, List, Tuple, Union import timm import torch import torch.nn.functional as F from FrEIA.framework import SequenceINN from timm.models.cait import Cait from timm.models.swin_transformer_v2 import SwinTransformerV2 from timm.models.vision_transformer import VisionTransformer from torch import Tensor, nn from anomalib.models.components.flow import AllInOneBlock from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable: """Subnet Convolutional Function.""" def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) # NOTE: padding="same" breaks onnx export so manual padding required. padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(inplace=True), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv def create_fast_flow_block( input_dimensions: List[int], conv3x3_only: bool, hidden_ratio: float, flow_steps: int, clamp: float = 2.0, ) -> SequenceINN: """Create NF Fast Flow Block.""" nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes class FastflowModel(nn.Module): """FastFlow + (your simplified) Dual-Path Fusion variant.""" def __init__( self, input_size: Tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size # ====== 你现在固定的融合权重(9/1) ====== self.w_global = 0.1 # anomaly_map = (1-w)*local + w*global # ====== 全局路径的“自写仿射”稳定化:限制 exp(s) ====== self.pre_affine_clamp = 2.0 # ====== Backbone ====== if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] self.norms = None elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ( "swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256", "swinv2_base_window16_256", ): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) # 你原来手写的 3-stage 设定 channels = [96, 192, 384] scales = [4, 8, 16] self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) 
else: raise ValueError( f"Backbone {backbone} is not supported. " f"Supported: [cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2, swinv2_*_256]." ) self.channels = channels self.scales = scales self.num_stages = len(channels) for p in self.feature_extractor.parameters(): p.requires_grad = False # ====== FastFlow blocks(每个尺度一个) ====== self.fast_flow_blocks = nn.ModuleList() for ch, sc in zip(self.channels, self.scales): self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[ch, int(input_size[0] / sc), int(input_size[1] / sc)], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # ====== 只有 3-stage(例如 SwinV2)才启用你的 Dual-Path ====== self.use_dual_path = (self.num_stages == 3) if self.use_dual_path: # --- Aux 分支(ResNet18)--- self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3], ) for p in self.aux_extractor.parameters(): p.requires_grad = False resnet_channels = [64, 128, 256] self.resnet_align = nn.ModuleList( [nn.Conv2d(resnet_channels[i], self.channels[i], kernel_size=1) for i in range(3)] ) # 标量 gate(你验证过 -3.5 好) self.aux_gates = nn.Parameter(torch.full((3,), -3.5)) # sigmoid≈0.029 # aux 归一化(更稳) self.aux_norms = nn.ModuleList([nn.GroupNorm(1, ch) for ch in self.channels]) # gate_map:门控卷积(空间/通道自适应) self.aux_gate_convs = nn.ModuleList() for ch in self.channels: gate_conv = nn.Conv2d(ch * 2, ch, kernel_size=1, bias=True) nn.init.zeros_(gate_conv.weight) nn.init.constant_(gate_conv.bias, -3.5) # 初始 gate_map 很小 self.aux_gate_convs.append(nn.Sequential(gate_conv, nn.Sigmoid())) # --- Global Path:对齐到 96 通道 --- fused_ch_global = 96 self.fused_ch_global = fused_ch_global self.align_convs_global = nn.ModuleList( [nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True) for ch in self.channels] ) # 动态尺度权重 + 先验权重 self.global_scale_logits = nn.Parameter(torch.zeros(3)) # 先验 concatenated_channel = fused_ch_global * 3 # 96*3 self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, kernel_size=1), nn.ReLU(inplace=True), nn.Conv2d(concatenated_channel // 4, 3, kernel_size=1), # logits ) # 你自写仿射子网:A(48ch)->输出 96ch -> chunk 得到 s,t(各48ch) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1), ) # 归一化,稳住 global conditioned_feature self.global_norm = nn.GroupNorm(1, fused_ch_global) # global flow_steps:默认更大(你扫出来 16/20 更有效) global_steps = max(20, flow_steps) H_global = input_size[0] // self.scales[-1] W_global = input_size[1] // self.scales[-1] self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=global_steps, ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: """Return anomaly map (eval) or (hidden_variables, log_jacobians) (train).""" self.feature_extractor.eval() # ====== Backbone features ====== if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: features = 
self._get_cnn_features(image) # ====== 原始 FastFlow(非 3-stage 时直接走原版) ====== if not self.use_dual_path: hidden_variables: List[Tensor] = [] log_jacobians: List[Tensor] = [] for block, feat in zip(self.fast_flow_blocks, features): hv, lj = block(feat) hidden_variables.append(hv) log_jacobians.append(lj) if self.training: return (hidden_variables, log_jacobians) return self.anomaly_map_generator(hidden_variables) # ====== Dual-Path ====== self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # list length 3 # ---- Local Path:融合 aux + flow ---- fused_features: List[Tensor] = [] local_hidden_variables: List[Tensor] = [] local_log_jacobians: List[Tensor] = [] for i in range(3): aux_aligned = self.resnet_align[i](aux_feats[i]) if aux_aligned.shape[-2:] != features[i].shape[-2:]: aux_aligned = F.interpolate(aux_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) aux_aligned = self.aux_norms[i](aux_aligned) gate_scalar = torch.sigmoid(self.aux_gates[i]) # 标量(全局) gate_map = self.aux_gate_convs[i](torch.cat([features[i], aux_aligned], dim=1)) # 空间/通道自适应 fused_feat = features[i] + gate_scalar * gate_map * aux_aligned fused_features.append(fused_feat) hv, lj = self.fast_flow_blocks[i](fused_feat) local_hidden_variables.append(hv) local_log_jacobians.append(lj) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ---- Global Path:用 fused_features(保持分布一致)---- target_size = fused_features[-1].shape[-2:] # 最小尺度(H,W) aligned_features: List[Tensor] = [] for i, feat in enumerate(fused_features): feat = self.align_convs_global[i](feat) feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(feat) # 动态尺度权重 + 先验 concat_feats = torch.cat(aligned_features, dim=1) # [B, 96*3, H, W] logits_dyn = self.channel_weighting_module(concat_feats) # [B, 3, 1, 1] logits = logits_dyn + self.global_scale_logits.view(1, 3, 1, 1) w = torch.softmax(logits, dim=1) # [B,3,1,1] weighted_feats = ( w[:, 0:1] * aligned_features[0] + w[:, 1:2] * aligned_features[1] + w[:, 2:3] * aligned_features[2] ) # [B,96,H,W] C = weighted_feats.size(1) # 96 A, B = torch.split(weighted_feats, C // 2, dim=1) # 48,48 st = self.subnet(A) # [B,96,H,W] s, t = torch.chunk(st, 2, dim=1) # 48,48 # 关键:限制 s,避免 exp(s) 炸 s = torch.tanh(s) * self.pre_affine_clamp B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) # [B,96,H,W] conditioned_feature = self.global_norm(conditioned_feature) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ---- 融合 anomaly maps(你固定 9/1)---- anomaly_map = (1.0 - self.w_global) * local_anomaly_map + self.w_global * global_anomaly_map hidden_variables = local_hidden_variables + [global_hidden_variable] log_jacobians = local_log_jacobians + [global_log_jacobian] if self.training: return (hidden_variables, log_jacobians) return anomaly_map def _get_cnn_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor(input_tensor) if self.norms is not None: features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. 
Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_vit_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_swint_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features: List[Tensor] = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2**i) W = grid_size[1] // (2**i) if H * W != L: raise ValueError( f"Cannot reshape [B,L,C]=[{B},{L},{C}] to [B,C,H,W]=[{B},{C},{H},{W}] because H*W != L." ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
- flow_steps = 8~9 (you already verified 8 beats 2)
- w_global = 0.1 (the file defaults to 0.1)
- global_steps: this code defaults to max(20, flow_steps), i.e. it starts from the 20 you wanted

If you post the new results (those 5 numbers) once the run finishes, I can use which items rise/fall to propose the next targeted micro-change (e.g. touching only global_steps, only pre_affine_clamp, or only the gate initialization / scale-weight strategy).
I ran this code to completion and the results are much worse. What should I change next? Specifically: 56.86, 59.19, 40.04, 85.71, 28.93.
The main reason this run (56.86, 59.19, 40.04, 85.71, 28.93) got worse can basically be pinned down: the previous torch_model.py I gave you introduced too many learnable fusion modules (gate_map, dynamic scale weights, extra norms, tanh-clamp). With your lightning_model.py / loss.py unchanged and training driven only by an NLL objective on normal data, those modules easily learn a direction that fits the normal distribution better while destroying separability/localization, dragging the overall metrics down.
Next step: first go back to the line you have already verified as the most stable and closest to optimal:
aux_gates=-3.5 (scalar gate) + global_scale_logits (static softmax weights over the three scales) + global_steps>=16 + w_global=0.1~0.2.

Below is a **clean version you can paste over the whole file** of torch_model.py (organized along the structure that tested well for you earlier, and with the target_size = fused_features[-1].shape[2:] line you pointed out written correctly).
python"""FastFlow Torch Model Implementation.""" from __future__ import annotations from typing import Callable, List, Tuple, Union import timm import torch import torch.nn.functional as F from FrEIA.framework import SequenceINN from timm.models.cait import Cait from timm.models.swin_transformer_v2 import SwinTransformerV2 from timm.models.vision_transformer import VisionTransformer from torch import Tensor, nn from anomalib.models.components.flow import AllInOneBlock from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable: """Subnet Convolutional Function.""" def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(inplace=True), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv def create_fast_flow_block( input_dimensions: List[int], conv3x3_only: bool, hidden_ratio: float, flow_steps: int, clamp: float = 2.0, ) -> SequenceINN: """Create NF Fast Flow Block.""" nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes class FastflowModel(nn.Module): """FastFlow + (your simplified) dual-path fusion version.""" def __init__( self, input_size: Tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size # ====== 你验证过的:固定融合权重(默认给你 0.2,可自己改回 0.1)====== # anomaly_map = (1-w_global)*local + w_global*global self.w_global = 0.2 # ====== global 的 flow steps(你验证过 16/20 比较有效)====== self.global_steps = max(16, flow_steps) if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] self.norms = None elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ( "swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256", "swinv2_base_window16_256", ): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [96, 192, 384] scales = [4, 8, 16] self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError(f"Unsupported backbone: {backbone}") self.channels = channels self.scales = scales for p in self.feature_extractor.parameters(): p.requires_grad = False # ====== Aux extractor (ResNet18) ====== self.aux_extractor = timm.create_model( "resnet18", 
pretrained=True, features_only=True, out_indices=[1, 2, 3], ) for p in self.aux_extractor.parameters(): p.requires_grad = False # resnet18 channels for out_indices [1,2,3] resnet_channels = [64, 128, 256] self.resnet_align = nn.ModuleList( [nn.Conv2d(resnet_channels[i], self.channels[i], kernel_size=1) for i in range(3)] ) # ====== 你验证过的 gate 初始化 ====== self.aux_gates = nn.Parameter(torch.full((3,), -3.5)) # sigmoid≈0.029 # ====== Local flow blocks ====== self.fast_flow_blocks = nn.ModuleList() for ch, sc in zip(self.channels, self.scales): self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[ch, int(input_size[0] / sc), int(input_size[1] / sc)], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # ====== Global path: align to 96ch ====== self.fused_ch_global = 96 self.align_convs_global = nn.ModuleList( [nn.Conv2d(ch, self.fused_ch_global, kernel_size=1, bias=True) for ch in self.channels] ) # 3-scale static weights (trainable) self.global_scale_logits = nn.Parameter(torch.zeros(3)) H_global = input_size[0] // self.scales[-1] W_global = input_size[1] // self.scales[-1] self.global_flow_block = create_fast_flow_block( input_dimensions=[self.fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=self.global_steps, ) # global affine subnet (A->(s,t)) self.subnet = nn.Sequential( nn.Conv2d(self.fused_ch_global // 2, self.fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(self.fused_ch_global // 2, self.fused_ch_global, kernel_size=3, padding=1), ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: """Train: (hidden_variables, log_jacobians); Eval: anomaly_map.""" self.feature_extractor.eval() # backbone features if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: features = self._get_cnn_features(image) # aux features self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [3] # ====== Local path ====== fused_features: List[Tensor] = [] local_hidden_variables: List[Tensor] = [] local_log_jacobians: List[Tensor] = [] for i in range(3): aux_aligned = self.resnet_align[i](aux_feats[i]) if aux_aligned.shape[-2:] != features[i].shape[-2:]: aux_aligned = F.interpolate(aux_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) gate = torch.sigmoid(self.aux_gates[i]) # scalar fused_feat = features[i] + gate * aux_aligned fused_features.append(fused_feat) hv, lj = self.fast_flow_blocks[i](fused_feat) local_hidden_variables.append(hv) local_log_jacobians.append(lj) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====== Global path (use fused_features!) 
====== target_size = fused_features[-1].shape[2:] # ✅ 你指出的点:必须是 list[-1] aligned_features: List[Tensor] = [] for i, feat in enumerate(fused_features): feat = self.align_convs_global[i](feat) feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(feat) w = torch.softmax(self.global_scale_logits, dim=0) # [3] weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2] C = weighted_feats.size(1) # 96 A, B = torch.split(weighted_feats, C // 2, dim=1) # 48, 48 st = self.subnet(A) # 96 s, t = torch.chunk(st, 2, dim=1) # 不做 tanh-clamp(你之前更好的版本也没做) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) # 96 global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====== fuse anomaly map ====== anomaly_map = (1.0 - self.w_global) * local_anomaly_map + self.w_global * global_anomaly_map hidden_variables = local_hidden_variables + [global_hidden_variable] log_jacobians = local_log_jacobians + [global_log_jacobian] if self.training: return (hidden_variables, log_jacobians) return anomaly_map def _get_cnn_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor(input_tensor) if self.norms is not None: features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_vit_features(self, input_tensor: Tensor) -> List[Tensor]: feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) return [feature] def _get_swint_features(self, input_tensor: Tensor) -> List[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features: List[Tensor] = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2**i) W = grid_size[1] // (2**i) if H * W != L: raise ValueError(f"Cannot reshape [B,L,C]=[{B},{L},{C}] to [B,C,H,W] because H*W != L.") feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
Run the file above once first; the goal is to at least get back to your earlier tier of 61.x / 50.x / 89.x. On top of that, iterate with these small, quick steps:
- self.w_global: 0.1 / 0.15 / 0.2
- self.global_steps: 16 / 20

Recommended order for the joint sweep with w_global:
(global_steps=16, w=0.15) → (16, 0.2) → (20, 0.1) → (20, 0.15)
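If you would rather script that order than edit the file by hand, a hypothetical driver could look like this (the environment-variable plumbing and the config filename are my assumptions; thread the two values into your setup however you prefer):

```python
import os
import subprocess

# the recommended sweep order from above
combos = [(16, 0.15), (16, 0.2), (20, 0.1), (20, 0.15)]
for global_steps, w_global in combos:
    env = {**os.environ, "GLOBAL_STEPS": str(global_steps), "W_GLOBAL": str(w_global)}
    print(f"launching: global_steps={global_steps}, w_global={w_global}")
    # assumes your modified torch_model.py reads GLOBAL_STEPS / W_GLOBAL at
    # construction time, and that training is launched via anomalib's tools/train.py
    subprocess.run(["python", "tools/train.py", "--config", "fastflow.yaml"], env=env, check=True)
```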
You currently have conv3x3_only: True; try False once (it alternates 1×1 blocks into the flow, which is usually friendlier to complex backgrounds / domain shift). This single switch is often more effective than piling more fusion layers into forward.
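To be clear about what the switch does: it only changes the kernel schedule inside create_fast_flow_block (this mirrors the logic already in your file):

```python
# kernel size per flow step, exactly as computed in create_fast_flow_block
for conv3x3_only in (True, False):
    schedule = [1 if (i % 2 == 1 and not conv3x3_only) else 3 for i in range(8)]
    print(conv3x3_only, schedule)
# True  -> [3, 3, 3, 3, 3, 3, 3, 3]
# False -> [3, 1, 3, 1, 3, 1, 3, 1]
```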
Add this one-line "insurance" as well (only if you hit NaN/inf or training gets unusually jittery):
```python
s = torch.clamp(s, -5, 5)
B_transformed = torch.exp(s) * B + t
```
This is a stability patch, not a performance booster.
Post the five new numbers from the "clean version" above (together with your exact w_global / global_steps / flow_steps / conv3x3_only values), and based on exactly which item drops I can give you a next step that changes only one or two lines (for example: while global uses the fused features, whether local should switch to features + gate*(aux - features), a form closer to residual correction).
I ran the code below to completion and got the results listed after the config. This is my best result so far, but it is still behind other algorithms. What should I improve next?

```python
"""FastFlow Torch Model Implementation."""
from __future__ import annotations
from typing import Callable, List, Tuple, Union  # List/Tuple/Union are required by the forward type hints
import timm
import torch
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.vision_transformer import VisionTransformer
from timm.models.swin_transformer_v2 import SwinTransformerV2
from torch import Tensor, nn
import torch.nn.functional as F
from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator
def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
"""Subnet Convolutional Function.
textCallable class or function ``f``, called as ``f(channels_in, channels_out)`` and should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`. Args: kernel_size (int): Kernel Size hidden_ratio (float): Hidden ratio to compute number of hidden channels. Returns: Callable: Sequential for the subnet constructor. """ def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential: hidden_channels = int(in_channels * hidden_ratio) # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required. # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2) return nn.Sequential( nn.ZeroPad2d(padding), nn.Conv2d(in_channels, hidden_channels, kernel_size), nn.LeakyReLU(), nn.ZeroPad2d(padding), nn.Conv2d(hidden_channels, out_channels, kernel_size), ) return subnet_conv
def create_fast_flow_block(
input_dimensions: list[int],
conv3x3_only: bool,
hidden_ratio: float,
flow_steps: int,
clamp: float = 2.0,
) -> SequenceINN:
"""Create NF Fast Flow Block.
textThis is to create Normalizing Flow (NF) Fast Flow model block based on Figure 2 and Section 3.3 in the paper. Args: input_dimensions (list[int]): Input dimensions (Channel, Height, Width) conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1. hidden_ratio (float): Ratio for the hidden layer channels. flow_steps (int): Flow steps. clamp (float, optional): Clamp. Defaults to 2.0. Returns: SequenceINN: FastFlow Block. """ nodes = SequenceINN(*input_dimensions) for i in range(flow_steps): if i % 2 == 1 and not conv3x3_only: kernel_size = 1 else: kernel_size = 3 nodes.append( AllInOneBlock, subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio), affine_clamping=clamp, permute_soft=False, ) return nodes
class FastflowModel(nn.Module):
"""FastFlow.
textUnsupervised Anomaly Detection and Localization via 2D Normalizing Flows. Args: input_size (tuple[int, int]): Model input size. backbone (str): Backbone CNN network pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone. flow_steps (int, optional): Flow steps. conv3x3_only (bool, optinoal): Use only conv3x3 in fast_flow model. Defaults to False. hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0. Raises: ValueError: When the backbone is not supported. """ def __init__( self, input_size: tuple[int, int], backbone: str, pre_trained: bool = True, flow_steps: int = 8, conv3x3_only: bool = False, hidden_ratio: float = 1.0, ) -> None: super().__init__() self.input_size = input_size self.aux_gates = nn.Parameter(torch.zeros(3)) # sigmoid后初始=0.5 self.global_scale_logits = nn.Parameter(torch.zeros(3)) # softmax权重 if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"): self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained) channels = [768] scales = [16] elif backbone in ("resnet18", "wide_resnet50_2"): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, features_only=True, out_indices=[1, 2, 3], ) channels = self.feature_extractor.feature_info.channels() scales = self.feature_extractor.feature_info.reduction() # for transformers, use their pretrained norm w/o grad # for resnets, self.norms are trainable LayerNorm self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=True, ) ) elif backbone in ("swinv2_small_window8_256", "swinv2_small_window16_256", "swinv2_tiny_window8_256", "swinv2_tiny_window16_256", "swinv2_base_window8_256","swinv2_base_window16_256" ): self.feature_extractor = timm.create_model( backbone, pretrained=pre_trained, ) # 手动定义 channels 和 scales channels = [96,192,384] scales = [4,8,16] self.channels = channels self.scales = scales # 保存 scales 以供 _get_swimt_features 使用 self.norms = nn.ModuleList() for channel, scale in zip(channels, scales): self.norms.append( nn.LayerNorm( [channel, int(input_size[0] / scale), int(input_size[1] / scale)], elementwise_affine=False, ) ) else: raise ValueError( f"Backbone {backbone} is not supported. List of available backbones are " "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]." 
) for parameter in self.feature_extractor.parameters(): parameter.requires_grad = False self.aux_extractor = timm.create_model( "resnet18", pretrained=True, features_only=True, out_indices=[1, 2, 3] # 对应 layer1, layer2, layer3 ) for param in self.aux_extractor.parameters(): param.requires_grad = False # resnet18 每层输出通道数 resnet_channels = [64, 128, 256] # 与 out_indices=[1,2,3] 对应 # 定义辅助分支的对齐层:将 CNN 分支输出变为对应 Swin 层的通道数(自适应) self.resnet_align = nn.ModuleList([ nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3) ]) self.fast_flow_blocks = nn.ModuleList() for i in range(3): fused_ch = self.channels[i] self.fast_flow_blocks.append( create_fast_flow_block( input_dimensions=[fused_ch, int(input_size[0] / self.scales[i]), int(input_size[1] / self.scales[i])], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) ) # self.fast_flow_blocks = nn.ModuleList() # for channel, scale in zip(channels, scales): # self.fast_flow_blocks.append( # create_fast_flow_block( # input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)], # conv3x3_only=conv3x3_only, # hidden_ratio=hidden_ratio, # flow_steps=flow_steps, # ) # ) self.align_convs_global = nn.ModuleList() fused_ch_global = 96 # 设定全局通道对齐后单个尺度多少通道 for ch in channels: self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True)) # 拼接后总通道 concatenated_channel = fused_ch_global H_global = input_size[0] // scales[-1] # 最小 H W_global = input_size[1] // scales[-1] # 最小 W self.global_flow_block = create_fast_flow_block( input_dimensions=[fused_ch_global, H_global, W_global], conv3x3_only=conv3x3_only, hidden_ratio=hidden_ratio, flow_steps=flow_steps, ) self.channel_weighting_module = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1), nn.ReLU(), nn.Conv2d(concatenated_channel // 4, len(channels), 1), nn.Softmax(dim=1) ) self.subnet = nn.Sequential( nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1), nn.ReLU(), nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1) ) self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size) # def forward(self, input_tensor: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]: # """Forward-Pass the input to the FastFlow Model. # # Args: # input_tensor (Tensor): Input tensor. # # Returns: # Tensor | list[Tensor] | tuple[list[Tensor]]: During training, return # (hidden_variables, log-of-the-jacobian-determinants). # During the validation/test, return the anomaly map. # """ # # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(input_tensor) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(input_tensor) # else: # features = self._get_cnn_features(input_tensor) # # # Compute the hidden variable f: X -> Z and log-likelihood of the jacobian # # (See Section 3.3 in the paper.) # # NOTE: output variable has z, and jacobian tuple for each fast-flow blocks. 
# hidden_variables: list[Tensor] = [] # log_jacobians: list[Tensor] = [] # for fast_flow_block, feature in zip(self.fast_flow_blocks, features): # hidden_variable, log_jacobian = fast_flow_block(feature) # hidden_variables.append(hidden_variable) # log_jacobians.append(log_jacobian) # # return_val = (hidden_variables, log_jacobians) # # if not self.training: # return_val = self.anomaly_map_generator(hidden_variables) # # return return_val # 第1 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2+3 def forward(self, image: torch.Tensor) -> Union[ torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" """Return anomaly map.""" return_val: Tensor | list[Tensor] | tuple[list[Tensor]] self.feature_extractor.eval() # backbone 前向用 eval if isinstance(self.feature_extractor, VisionTransformer): features = self._get_vit_features(image) elif isinstance(self.feature_extractor, Cait): features = self._get_cait_features(image) elif isinstance(self.feature_extractor, SwinTransformerV2): features = self._get_swint_features(image) else: # ResNet / WideResNet features = self.feature_extractor(image) features = [self.norms[i](feat) for i, feat in enumerate(features)] # ------------------- 提取辅助分支(ResNet18)特征 ------------------- self.aux_extractor.eval() aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # ------------------- Local Path:逐层融合 + Flow ------------------- local_hidden_variables = [] 
local_log_jacobians = [] for i in range(len(features)): # 不对 Swin 特征做通道对齐,直接保留 # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned aux_feat_aligned = self.resnet_align[i](aux_feats[i]) if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]: aux_feat_aligned = F.interpolate(aux_feat_aligned, size=features[i].shape[-2:], mode="bilinear", align_corners=False) gate = torch.sigmoid(self.aux_gates[i]) # 0~1 fused_feature = features[i] + gate * aux_feat_aligned hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) local_hidden_variables.append(hidden_variable) local_log_jacobians.append(log_jacobian) local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # ====================== Global Path ====================== aligned_features = [] target_size = features[-1].shape[2:] for i, feat in enumerate(features): feat = self.align_convs_global[i](feat) down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) aligned_features.append(down_feat) # weighted_feats = sum(aligned_features) w = torch.softmax(self.global_scale_logits, dim=0) weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2] C = weighted_feats.size(1) A, B = torch.split(weighted_feats, C // 2, dim=1) st = self.subnet(A) s, t = torch.chunk(st, 2, dim=1) B_transformed = torch.exp(s) * B + t conditioned_feature = torch.cat([A, B_transformed], dim=1) global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # ====================== Combine Anomaly Maps ====================== anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # ====================== Combine Hidden Variables (保持合并) ====================== hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) if not self.training: # 测试/评估模式 return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # 
local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第1+2 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # # ResNet / WideResNet # features = self.feature_extractor(image) # features = [self.norms[i](feat) for i, feat in enumerate(features)] # # # ------------------- 提取辅助分支(ResNet18)特征 ------------------- # self.aux_extractor.eval() # aux_feats = self.aux_extractor(image) # [layer1, layer2, layer3] # # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # aux_feat_aligned = self.resnet_align[i](aux_feats[i]) # # 直接拼接:通道对齐 # fused_feature = features[i] + aux_feat_aligned # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, 
mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # ====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # anomaly_map = self.anomaly_map_generator(hidden_variables) # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 # 第2+3 # def forward(self, image: torch.Tensor) -> Union[ # torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]: # 返回类型提示保持 # """Forward-Pass for Dual-Path FastFlow Model (最终兼容异常图处理).""" # """Return anomaly map.""" # return_val: Tensor | list[Tensor] | tuple[list[Tensor]] # # self.feature_extractor.eval() # backbone 前向用 eval # if isinstance(self.feature_extractor, VisionTransformer): # features = self._get_vit_features(image) # elif isinstance(self.feature_extractor, Cait): # features = self._get_cait_features(image) # elif isinstance(self.feature_extractor, SwinTransformerV2): # features = self._get_swint_features(image) # else: # features = self._get_cnn_features(image) # # ------------------- Local Path:逐层融合 + Flow ------------------- # local_hidden_variables = [] # local_log_jacobians = [] # for i in range(len(features)): # # 不对 Swin 特征做通道对齐,直接保留 # # 利用 resnet_align 将 CNN 分支通道调整为与对应 Swin 层相同 # # 直接拼接:通道对齐 # fused_feature = features[i] # hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature) # local_hidden_variables.append(hidden_variable) # local_log_jacobians.append(log_jacobian) # local_anomaly_map = self.anomaly_map_generator(local_hidden_variables) # # # ====================== Global Path ====================== # aligned_features = [] # target_size = features[-1].shape[2:] # for i, feat in enumerate(features): # feat = self.align_convs_global[i](feat) # down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False) # aligned_features.append(down_feat) # # weighted_feats = sum(aligned_features) # C = weighted_feats.size(1) # A, B = torch.split(weighted_feats, C // 2, dim=1) # # st = self.subnet(A) # s, t = torch.chunk(st, 2, dim=1) # B_transformed = torch.exp(s) * B + t # # conditioned_feature = torch.cat([A, B_transformed], dim=1) # global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature) # # global_anomaly_map = self.anomaly_map_generator([global_hidden_variable]) # # # ====================== Combine Anomaly Maps ====================== # anomaly_map = 0.9*local_anomaly_map +0.1*global_anomaly_map # 融合异常图 # # # 
====================== Combine Hidden Variables (保持合并) ====================== # hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable] # 合并 hidden_variables # log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian] # 合并 log_jacobians # # # ====================== 返回值处理 (严格兼容原始结构和名称) ====================== # return_val = (hidden_variables, log_jacobians) # 训练模式下,return_val 仍然是 (hidden_variables, log_jacobians) # # if not self.training: # 测试/评估模式 # return_val = anomaly_map # 测试模式下,return_val 现在是 *融合后的异常图*,而不是之前的 anomaly_map_generator(hidden_variables) # # return return_val # 最终返回 return_val,名称与原始代码完全一致 def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]: """Get CNN-based features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ features = self.feature_extractor(input_tensor) features = [self.norms[i](feature) for i, feature in enumerate(features)] return features def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Class-Attention-Image-Transformers (CaiT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) feature = feature + self.feature_extractor.pos_embed feature = self.feature_extractor.pos_drop(feature) for i in range(41): # paper Table 6. Block Index = 40 feature = self.feature_extractor.blocks[i](feature) batch_size, _, num_channels = feature.shape feature = self.feature_extractor.norm(feature) feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]: """Get Vision Transformers (ViT) features. Args: input_tensor (Tensor): Input Tensor. Returns: list[Tensor]: List of features. """ feature = self.feature_extractor.patch_embed(input_tensor) cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1) if self.feature_extractor.dist_token is None: feature = torch.cat((cls_token, feature), dim=1) else: feature = torch.cat( ( cls_token, self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1), feature, ), dim=1, ) feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed) for i in range(8): # paper Table 6. Block Index = 7 feature = self.feature_extractor.blocks[i](feature) feature = self.feature_extractor.norm(feature) feature = feature[:, 2:, :] batch_size, _, num_channels = feature.shape feature = feature.permute(0, 2, 1) feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16) features = [feature] return features def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]: features = self.feature_extractor.forward_features(input_tensor) features = features[:3] reshaped_features = [] grid_size = self.feature_extractor.patch_embed.grid_size for i, feat in enumerate(features): B, L, C = feat.shape H = grid_size[0] // (2 ** i) W = grid_size[1] // (2 ** i) if H * W != L: raise ValueError( f"无法将形状 [B, L, C] = [{B}, {L}, {C}] 重塑为 [B, C, H, W] = [{B}, {C}, {H}, {W}],因为 H * W != L。" ) feat = feat.permute(0, 2, 1).reshape(B, C, H, W) feat = self.norms[i](feat) reshaped_features.append(feat) return reshaped_features
The config file is as follows:

```yaml
model:
  name: fastflow
  backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
  pre_trained: true
  flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
  hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
  conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: image_AUROC
    mode: max
  normalization_method: min_max # options: [null, min_max, cdf]
```
The results are as follows:

image_AUROC 0.6637790203094482
image_F1Score 0.6151961088180542
pixel_AUPRO 0.5146437287330627
pixel_AUROC 0.8934715986251831
pixel_F1Score 0.34470832347869873
What should I change next to improve this?
Below is a **drop-in replacement `torch_model.py`** (based on the version that gave you your current best result), with **two kinds of key changes**. Both target your current bottleneck: **low pixel_F1 and a stalled AUPRO**:
- The global path now runs on the fused features: `target_size = fused_features[-1].shape[2:]`, and the global loop also iterates over `fused_features`.
- The hand-written coupling clamps `s` with `s = tanh(s) * 2.0` (the same idea as `affine_clamping` inside the flow blocks), which usually makes training much more stable and makes the pixel metrics easier to push up.

```python
"""FastFlow Torch Model Implementation.

Original Code
Copyright (c) 2022 @gathierry
https://github.com/gathierry/FastFlow/.
SPDX-License-Identifier: Apache-2.0

Modified
Copyright (C) 2022 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""
from __future__ import annotations

from typing import Callable, List, Tuple, Union

import timm
import torch
import torch.nn.functional as F
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.swin_transformer_v2 import SwinTransformerV2
from timm.models.vision_transformer import VisionTransformer
from torch import Tensor, nn

from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator


def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
    """Subnet Convolutional Function."""

    def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential:
        hidden_channels = int(in_channels * hidden_ratio)
        # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required.
        padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2)
        return nn.Sequential(
            nn.ZeroPad2d(padding),
            nn.Conv2d(in_channels, hidden_channels, kernel_size),
            nn.LeakyReLU(),
            nn.ZeroPad2d(padding),
            nn.Conv2d(hidden_channels, out_channels, kernel_size),
        )

    return subnet_conv


def create_fast_flow_block(
    input_dimensions: list[int],
    conv3x3_only: bool,
    hidden_ratio: float,
    flow_steps: int,
    clamp: float = 2.0,
) -> SequenceINN:
    """Create NF Fast Flow Block."""
    nodes = SequenceINN(*input_dimensions)
    for i in range(flow_steps):
        kernel_size = 1 if (i % 2 == 1 and not conv3x3_only) else 3
        nodes.append(
            AllInOneBlock,
            subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio),
            affine_clamping=clamp,
            permute_soft=False,
        )
    return nodes


class FastflowModel(nn.Module):
    """FastFlow."""

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size

        # Both parameters are already used in your current version; they are trainable
        # (they affect the flow inputs).
        self.aux_gates = nn.Parameter(torch.zeros(3))  # sigmoid(0) = 0.5 initially
        self.global_scale_logits = nn.Parameter(torch.zeros(3))  # softmax weights

        # ---------------- Backbone ----------------
        if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [768]
            scales = [16]
            self.norms = nn.ModuleList()  # placeholder
        elif backbone in ("resnet18", "wide_resnet50_2"):
            self.feature_extractor = timm.create_model(
                backbone,
                pretrained=pre_trained,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            channels = self.feature_extractor.feature_info.channels()
            scales = self.feature_extractor.feature_info.reduction()
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        elif backbone in (
            "swinv2_small_window8_256",
            "swinv2_small_window16_256",
            "swinv2_tiny_window8_256",
            "swinv2_tiny_window16_256",
            "swinv2_base_window8_256",
            "swinv2_base_window16_256",
        ):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [96, 192, 384]
            scales = [4, 8, 16]
            self.channels = channels
            self.scales = scales
            # Recommended here: elementwise_affine=True (trainable), which adapts more
            # easily to the distribution shift of your enlarged dataset.
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        else:
            raise ValueError(
                f"Backbone {backbone} is not supported. "
                "Available: [cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2, swinv2_*]"
            )

        # Freeze the backbone.
        for p in self.feature_extractor.parameters():
            p.requires_grad = False

        # ---------------- Aux branch (ResNet18) ----------------
        self.aux_extractor = timm.create_model(
            "resnet18",
            pretrained=True,
            features_only=True,
            out_indices=[1, 2, 3],
        )
        for p in self.aux_extractor.parameters():
            p.requires_grad = False

        resnet_channels = [64, 128, 256]
        self.resnet_align = nn.ModuleList([nn.Conv2d(resnet_channels[i], channels[i], 1) for i in range(3)])
        # New: lightweight normalization of the aligned aux features stabilizes the
        # fusion (trainable).
        self.aux_post_norms = nn.ModuleList([nn.GroupNorm(1, channels[i]) for i in range(3)])

        # ---------------- Local flow blocks ----------------
        self.fast_flow_blocks = nn.ModuleList()
        for i in range(3):
            self.fast_flow_blocks.append(
                create_fast_flow_block(
                    input_dimensions=[
                        channels[i],
                        int(input_size[0] / scales[i]),
                        int(input_size[1] / scales[i]),
                    ],
                    conv3x3_only=conv3x3_only,
                    hidden_ratio=hidden_ratio,
                    flow_steps=flow_steps,
                )
            )

        # ---------------- Global path ----------------
        fused_ch_global = 96
        self.align_convs_global = nn.ModuleList(
            [nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True) for ch in channels]
        )
        self.global_post_norms = nn.ModuleList([nn.GroupNorm(1, fused_ch_global) for _ in channels])

        H_global = input_size[0] // scales[-1]
        W_global = input_size[1] // scales[-1]
        # Suggested: a slightly deeper global branch is more stable (your earlier sweeps
        # of 16/20 were sometimes better for the pixel metrics).
        global_steps = max(16, flow_steps)
        self.global_flow_block = create_fast_flow_block(
            input_dimensions=[fused_ch_global, H_global, W_global],
            conv3x3_only=conv3x3_only,
            hidden_ratio=hidden_ratio,
            flow_steps=global_steps,
        )

        # Subnet of the hand-written coupling (keeps your original structure).
        self.subnet = nn.Sequential(
            nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1),
        )

        self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size)

        # Strength of the tanh clamp (tunable: 1.5-3.0); 2.0 is usually a stable default.
        self.manual_coupling_clamp = 2.0

    def forward(self, image: torch.Tensor) -> Union[torch.Tensor, Tuple[List[torch.Tensor], List[torch.Tensor]]]:
        """Forward pass."""
        # -------------- backbone features --------------
        self.feature_extractor.eval()
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:
            features = self._get_cnn_features(image)

        # -------------- aux features --------------
        self.aux_extractor.eval()
        with torch.no_grad():
            aux_feats = self.aux_extractor(image)

        # -------------- Local path: fusion + flow --------------
        fused_features: List[Tensor] = []
        local_hidden_variables: List[Tensor] = []
        local_log_jacobians: List[Tensor] = []
        for i in range(len(features)):
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
                aux_feat_aligned = F.interpolate(
                    aux_feat_aligned,
                    size=features[i].shape[-2:],
                    mode="bilinear",
                    align_corners=False,
                )
            # New: normalize the aligned aux feature.
            aux_feat_aligned = self.aux_post_norms[i](aux_feat_aligned)

            gate = torch.sigmoid(self.aux_gates[i])  # in (0, 1)
            fused_feature = features[i] + gate * aux_feat_aligned
            fused_features.append(fused_feature)

            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)

        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        # -------------- Global path (uses fused_features, no longer features) --------------
        aligned_features: List[Tensor] = []
        target_size = fused_features[-1].shape[2:]  # key: align to the fused features' spatial size
        for i, feat in enumerate(fused_features):
            feat = self.align_convs_global[i](feat)
            feat = self.global_post_norms[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        w = torch.softmax(self.global_scale_logits, dim=0)
        weighted_feats = w[0] * aligned_features[0] + w[1] * aligned_features[1] + w[2] * aligned_features[2]

        C = weighted_feats.size(1)
        A, B = torch.split(weighted_feats, C // 2, dim=1)

        st = self.subnet(A)
        s, t = torch.chunk(st, 2, dim=1)
        # Key change: tanh-clamp s in the hand-written coupling so exp cannot blow up
        # (exploding exp hurts training stability and generalization).
        s = torch.tanh(s) * self.manual_coupling_clamp
        B_transformed = torch.exp(s) * B + t

        conditioned_feature = torch.cat([A, B_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)

        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        # -------------- Combine anomaly maps (keep your validated 0.9/0.1) --------------
        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        hidden_variables = local_hidden_variables + [global_hidden_variable]
        log_jacobians = local_log_jacobians + [global_log_jacobian]

        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    # ---------------- feature helpers ----------------
    def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            features = self.feature_extractor(input_tensor)
        # The norms may be trainable, so keep them outside no_grad.
        features = [self.norms[i](feature) for i, feature in enumerate(features)]
        return features

    def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feature = self.feature_extractor.patch_embed(input_tensor)
            feature = feature + self.feature_extractor.pos_embed
            feature = self.feature_extractor.pos_drop(feature)
            for i in range(41):
                feature = self.feature_extractor.blocks[i](feature)
            batch_size, _, num_channels = feature.shape
            feature = self.feature_extractor.norm(feature)
            feature = feature.permute(0, 2, 1)
            feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feature = self.feature_extractor.patch_embed(input_tensor)
            cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1)
            if self.feature_extractor.dist_token is None:
                feature = torch.cat((cls_token, feature), dim=1)
            else:
                feature = torch.cat(
                    (
                        cls_token,
                        self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1),
                        feature,
                    ),
                    dim=1,
                )
            feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed)
            for i in range(8):
                feature = self.feature_extractor.blocks[i](feature)
            feature = self.feature_extractor.norm(feature)
            feature = feature[:, 2:, :]
            batch_size, _, num_channels = feature.shape
            feature = feature.permute(0, 2, 1)
            feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]:
        with torch.no_grad():
            feats = self.feature_extractor.forward_features(input_tensor)
        if not isinstance(feats, (list, tuple)):
            raise ValueError("Expected Swin forward_features to return a list/tuple of stage features.")
        feats = list(feats)[:3]

        reshaped_features: List[Tensor] = []
        grid_size = self.feature_extractor.patch_embed.grid_size
        for i, feat in enumerate(feats):
            B, L, C = feat.shape
            H = grid_size[0] // (2**i)
            W = grid_size[1] // (2**i)
            if H * W != L:
                raise ValueError(
                    f"Cannot reshape [B, L, C]=[{B},{L},{C}] to [B,C,H,W]=[{B},{C},{H},{W}] because H*W!=L."
                )
            feat = feat.permute(0, 2, 1).reshape(B, C, H, W)
            feat = self.norms[i](feat)
            reshaped_features.append(feat)
        return reshaped_features
```
Right now your pixel_AUROC is close to 0.89 while pixel_F1 is only about 0.34. That says the problem is not "can anomalies be found at all", but that the predicted boundary/area of the anomalous regions is not accurate enough (threshold + heatmap shape + small-object consistency).
Your early stopping currently monitors image_AUROC, which pushes the model toward "is this image anomalous?" rather than "are the pixel regions segmented accurately?".

So change the config first: set early_stopping.metric to pixel_AUPRO (preferred) or pixel_F1Score. This single step often lifts pixel_F1 by a clear margin, because the checkpoint selected during training ends up completely different.
You previously swept global_steps = 16/20; the current code defaults to global_steps = max(16, flow_steps).

I suggest two comparison runs that change only global_steps (16 vs. 20); see the one-line sketch below.

If pixel_F1 stays low, the global map is not helping the pixel boundaries, and the next move is to weaken the global branch (e.g. a 0.95/0.05 fusion).
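For that comparison you only need to touch one line in `FastflowModel.__init__` from the file above, fixing it per run so everything else stays identical:

```python
# In FastflowModel.__init__, replace the default
#     global_steps = max(16, flow_steps)
# with a fixed value for each run of the comparison:
global_steps = 16   # run A
# global_steps = 20  # run B
```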
You are currently on swinv2_tiny_window16_256, and window16 is often unfriendly to small obstacles. The "safer" change is swinv2_tiny_window8_256 (usually more sensitive to small objects).

Your threshold is adaptive, and the low pixel_F1 is very likely the threshold not lining up.
I suggest an "upper bound" experiment: switch to a manual pixel threshold and sweep manual_pixel (e.g. over 0.05-0.95), recording the best pixel_F1; a config sketch follows below.
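A minimal sketch of one point of that sweep, assuming the threshold block from your config (under `metrics`); 0.35 is just an example value:

```yaml
metrics:
  threshold:
    method: manual      # options: [adaptive, manual]
    manual_image: null
    manual_pixel: 0.35  # sweep this value over 0.05-0.95 and record pixel_F1Score
```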
Also, if you post a few anomaly_map visualizations from your current validation set (a few normal and abnormal images, plus masks), I can judge much faster whether this is a heatmap-shape problem (fix the map generation/fusion) or a threshold/mask-size-alignment problem (mask resize/crop). The two call for completely different fixes.
Following your code, my config file is:

```yaml
model:
  name: fastflow
  backbone: swinv2_tiny_window8_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
  pre_trained: true
  flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
  hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
  conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: image_AUROC
    mode: max
  normalization_method: min_max # options: [null, min_max, cdf]
```
The results are as follows:

image_AUROC 0.6359753608703613
image_F1Score 0.590243935585022
pixel_AUPRO 0.3344220221042633
pixel_AUROC 0.7628674507141113
pixel_F1Score 0.043394044041633606
And with this config:

```yaml
model:
  name: fastflow
  backbone: swinv2_tiny_window16_256 # options: [resnet18, swinv2_tiny_window16_256, wide_resnet50_2, cait_m48_448, deit_base_distilled_patch16_384]
  pre_trained: true
  flow_steps: 12 # options: [8, 8, 20, 20] - for each supported backbone
  hidden_ratio: 1.0 # options: [1.0, 1.0, 0.16, 0.16] - for each supported backbone
  conv3x3_only: True # options: [True, False, False, False] - for each supported backbone
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: image_AUROC
    mode: max
  normalization_method: min_max # options: [null, min_max, cdf]
```

the results are:

image_AUROC 0.6401253938674927
image_F1Score 0.6085918545722961
pixel_AUPRO 0.41742947697639465
pixel_AUROC 0.8104718923568726
pixel_F1Score 0.03814755380153656
In both of these runs (window8 / window16), pixel_F1 drops to ~0.04 while pixel_AUROC is still 0.76-0.81. That almost certainly means the problem is **not "the heatmap carries no information", but "the threshold / normalization / checkpoint-selection objective broke the pixel binarization"**. In other words: the model still ranks pixels reasonably well, but the weights picked by your threshold and early-stopping strategy produce predicted masks that are almost empty or pure noise, so F1 collapses toward 0.

Below is the most robust set of next steps, the ones most likely to gain you points immediately, ordered by priority; each one changes as little as possible.

1. With early stopping watching image_AUROC, it is easy to select a checkpoint that classifies images well but thresholds pixels badly, so pixel_F1 collapses outright.
Change the config to:

```yaml
early_stopping:
  patience: 10
  metric: pixel_AUPRO
  mode: max
```

If you can try only one change, try this one first. In many projects this single step brings pixel_F1 from 0.0x back to 0.2-0.3+.
2. In anomaly detection, min_max normalization is easily stretched/squashed by a handful of extreme values, so a default strategy like "threshold = 0.5" becomes completely inappropriate and the pixel F1 looks terrible.

Change:

```yaml
normalization_method: min_max
```

to:

```yaml
normalization_method: cdf
```

(Or first set it to null as a control, to confirm whether normalization is what breaks the threshold.)
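A toy illustration of the min_max failure mode (the numbers are made up):

```python
import torch

# 1000 "normal" pixel scores near 0.1 plus one extreme outlier.
scores = torch.cat([torch.full((1000,), 0.1), torch.tensor([10.0])])
normed = (scores - scores.min()) / (scores.max() - scores.min())
print(normed[:1000].max())  # tensor(0.): every ordinary pixel is squashed to ~0,
                            # so a default threshold like 0.5 selects nothing
```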
3. From your numbers, window8 is worse overall than window16, and its pixel F1 collapses just the same. So keep swinv2_tiny_window16_256 as the main line for the next round; window8 only helps small objects when paired with finer post-processing/threshold strategies, otherwise the noise gets heavier.
4. The version you posted as your "best result" actually has the right structure. On the code side I suggest only two changes next, so the structure stays intact.

4.1 Your local path computes fused_feature = features[i] + gate * aux_feat_aligned, but the global path still consumes features. Switch it to fused_features so both branches see the same input distribution; that usually helps pixel AUPRO/F1.

The minimal change (forward only):

```python
fused_features = []
...
fused_feature = features[i] + gate * aux_feat_aligned
fused_features.append(fused_feature)
...
# global:
target_size = fused_features[-1].shape[2:]
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(down_feat)
```
4.2 In your global coupling, exp(s) is unconstrained; as soon as training wobbles, the heatmap goes "all flat or all spikes" and the pixel threshold collapses.

The most conservative patch, barely touching your original logic:

```python
s = torch.clamp(s, -3.0, 3.0)
B_transformed = torch.exp(s) * B + t
```

This usually makes the pixel metrics (especially F1) more stable.
5. You currently have:

```yaml
conv3x3_only: True
```

I suggest changing it to:

```yaml
conv3x3_only: False
```

so the flow blocks alternate 1×1/3×3 kernels; in many scenarios this generalizes better to complex backgrounds (especially after your dataset expansion).
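For reference, this is the kernel schedule that the create_fast_flow_block shown earlier produces with flow_steps: 12:

```python
# Mirrors the kernel-size rule inside create_fast_flow_block.
flow_steps, conv3x3_only = 12, False
kernels = [1 if (i % 2 == 1 and not conv3x3_only) else 3 for i in range(flow_steps)]
print(kernels)  # [3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1]; with conv3x3_only=True it is all 3s
```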
6. This one does not change training at all; it only verifies the root cause: sweep the pixel threshold offline and record the best pixel_F1 (see the sketch below). If the best swept pixel_F1 is clearly above the current 0.04, the heatmap is fine and only the default threshold strategy is mismatched; from then on, threshold learning/selection should be driven by pixels rather than images.
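A minimal offline sweep, assuming you have dumped min-max-normalized anomaly maps and ground-truth masks to arrays (best_pixel_f1 is a hypothetical helper, not an anomalib API):

```python
import numpy as np

def best_pixel_f1(anomaly_maps: np.ndarray, masks: np.ndarray, num_steps: int = 19) -> tuple[float, float]:
    """Sweep thresholds over normalized anomaly maps; return (best_f1, best_threshold).

    anomaly_maps: float array [N, H, W], normalized to [0, 1].
    masks:        binary array [N, H, W], 1 = anomalous pixel.
    """
    best_f1, best_thr = 0.0, 0.0
    for thr in np.linspace(0.05, 0.95, num_steps):
        pred = anomaly_maps >= thr
        tp = np.logical_and(pred, masks == 1).sum()
        fp = np.logical_and(pred, masks == 0).sum()
        fn = np.logical_and(~pred, masks == 1).sum()
        f1 = 2 * tp / (2 * tp + fp + fn + 1e-8)
        if f1 > best_f1:
            best_f1, best_thr = float(f1), float(thr)
    return best_f1, best_thr
```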
7. Use window16 as the main line and first fix the objective used to select checkpoints:

```yaml
model:
  backbone: swinv2_tiny_window16_256
  flow_steps: 12
  hidden_ratio: 1.0
  conv3x3_only: False
  lr: 0.001
  weight_decay: 0.00001
  early_stopping:
    patience: 10
    metric: pixel_AUPRO
    mode: max
  normalization_method: cdf
```

combined with the two code fixes from 4.1 + 4.2 above (global path on fused_features + clamping s).
If you run this set and post the five new metrics (especially pixel_AUPRO / pixel_F1), the next step will be a precise one-or-two-line change, depending on whether "pixel_AUPRO went up but F1 is still low" or "AUROC went up but AUPRO is low" (e.g. only tune global_steps, only tune the 0.95/0.05 fusion weight, or only change the energy form of the anomaly map).
I'm now using the following code:

```python
"""FastFlow Anomaly Map Generator Implementation."""
from __future__ import annotations

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn


class AnomalyMapGenerator(nn.Module):
    """Generate Anomaly Heatmap."""

    def __init__(self, input_size: ListConfig | tuple) -> None:
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        """Generate Anomaly Heatmap.

        This implementation generates the heatmap based on the flow maps
        computed from the normalizing flow (NF) FastFlow blocks. Each block
        yields a flow map, which overall is stacked and averaged to an
        anomaly map.

        Args:
            hidden_variables (list[Tensor]): List of hidden variables from each NF FastFlow block.

        Returns:
            Tensor: Anomaly Map.
        """
        flow_maps: list[Tensor] = []
        for hidden_variable in hidden_variables:
            log_prob = -0.5 * torch.mean(hidden_variable**2, dim=1, keepdim=True)
            prob = torch.exp(log_prob)
            flow_map = F.interpolate(
                input=-prob,
                size=self.input_size,
                mode="bilinear",
                align_corners=False,
            )
            flow_maps.append(flow_map)
        flow_maps = torch.stack(flow_maps, dim=-1)
        anomaly_map = torch.mean(flow_maps, dim=-1)
        return anomaly_map
```
The other file is as follows:

```python
"""FastFlow Torch Model Implementation."""
from __future__ import annotations

from typing import Callable

import timm
import torch
import torch.nn.functional as F
from FrEIA.framework import SequenceINN
from timm.models.cait import Cait
from timm.models.swin_transformer_v2 import SwinTransformerV2
from timm.models.vision_transformer import VisionTransformer
from torch import Tensor, nn

from anomalib.models.components.flow import AllInOneBlock
from anomalib.models.fastflow.anomaly_map import AnomalyMapGenerator

SWINV2_BACKBONES = (
    "swinv2_small_window8_256",
    "swinv2_small_window16_256",
    "swinv2_tiny_window8_256",
    "swinv2_tiny_window16_256",
    "swinv2_base_window8_256",
    "swinv2_base_window16_256",
)


def subnet_conv_func(kernel_size: int, hidden_ratio: float) -> Callable:
    """Subnet Convolutional Function.

    Callable class or function ``f``, called as ``f(channels_in, channels_out)`` and
    should return a torch.nn.Module. Predicts coupling coefficients :math:`s, t`.

    Args:
        kernel_size (int): Kernel Size
        hidden_ratio (float): Hidden ratio to compute number of hidden channels.

    Returns:
        Callable: Sequential for the subnet constructor.
    """

    def subnet_conv(in_channels: int, out_channels: int) -> nn.Sequential:
        hidden_channels = int(in_channels * hidden_ratio)
        # NOTE: setting padding="same" in nn.Conv2d breaks the onnx export so manual padding required.
        # TODO: Use padding="same" in nn.Conv2d once PyTorch v2.1 is released
        padding = 2 * (kernel_size // 2 - ((1 + kernel_size) % 2), kernel_size // 2)
        return nn.Sequential(
            nn.ZeroPad2d(padding),
            nn.Conv2d(in_channels, hidden_channels, kernel_size),
            nn.LeakyReLU(),
            nn.ZeroPad2d(padding),
            nn.Conv2d(hidden_channels, out_channels, kernel_size),
        )

    return subnet_conv


def create_fast_flow_block(
    input_dimensions: list[int],
    conv3x3_only: bool,
    hidden_ratio: float,
    flow_steps: int,
    clamp: float = 2.0,
) -> SequenceINN:
    """Create NF Fast Flow Block.

    This is to create Normalizing Flow (NF) Fast Flow model block based on
    Figure 2 and Section 3.3 in the paper.

    Args:
        input_dimensions (list[int]): Input dimensions (Channel, Height, Width)
        conv3x3_only (bool): Boolean whether to use conv3x3 only or conv3x3 and conv1x1.
        hidden_ratio (float): Ratio for the hidden layer channels.
        flow_steps (int): Flow steps.
        clamp (float, optional): Clamp. Defaults to 2.0.

    Returns:
        SequenceINN: FastFlow Block.
    """
    nodes = SequenceINN(*input_dimensions)
    for i in range(flow_steps):
        if i % 2 == 1 and not conv3x3_only:
            kernel_size = 1
        else:
            kernel_size = 3
        nodes.append(
            AllInOneBlock,
            subnet_constructor=subnet_conv_func(kernel_size, hidden_ratio),
            affine_clamping=clamp,
            permute_soft=False,
        )
    return nodes


class FastflowModel(nn.Module):
    """FastFlow.

    Unsupervised Anomaly Detection and Localization via 2D Normalizing Flows.

    Args:
        input_size (tuple[int, int]): Model input size.
        backbone (str): Backbone CNN network
        pre_trained (bool, optional): Boolean to check whether to use a pre_trained backbone.
        flow_steps (int, optional): Flow steps.
        conv3x3_only (bool, optional): Use only conv3x3 in fast_flow model. Defaults to False.
        hidden_ratio (float, optional): Ratio to calculate hidden var channels. Defaults to 1.0.

    Raises:
        ValueError: When the backbone is not supported.
    """

    def __init__(
        self,
        input_size: tuple[int, int],
        backbone: str,
        pre_trained: bool = True,
        flow_steps: int = 8,
        conv3x3_only: bool = False,
        hidden_ratio: float = 1.0,
    ) -> None:
        super().__init__()
        self.input_size = input_size
        self.use_aux_branch = False

        if backbone in ("cait_m48_448", "deit_base_distilled_patch16_384"):
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [768]
            scales = [16]
        elif backbone in ("resnet18", "wide_resnet50_2"):
            self.feature_extractor = timm.create_model(
                backbone,
                pretrained=pre_trained,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            channels = self.feature_extractor.feature_info.channels()
            scales = self.feature_extractor.feature_info.reduction()
            # for transformers, use their pretrained norm w/o grad
            # for resnets, self.norms are trainable LayerNorm
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=True,
                    )
                )
        elif backbone in SWINV2_BACKBONES:
            self.feature_extractor = timm.create_model(backbone, pretrained=pre_trained)
            channels = [96, 192, 384]
            scales = [4, 8, 16]
            self.use_aux_branch = True
            self.norms = nn.ModuleList()
            for channel, scale in zip(channels, scales):
                self.norms.append(
                    nn.LayerNorm(
                        [channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                        elementwise_affine=False,
                    )
                )
        else:
            raise ValueError(
                f"Backbone {backbone} is not supported. List of available backbones are "
                "[cait_m48_448, deit_base_distilled_patch16_384, resnet18, wide_resnet50_2]."
            )

        self.channels = channels
        self.scales = scales

        for parameter in self.feature_extractor.parameters():
            parameter.requires_grad = False

        self.fast_flow_blocks = nn.ModuleList()
        for channel, scale in zip(channels, scales):
            self.fast_flow_blocks.append(
                create_fast_flow_block(
                    input_dimensions=[channel, int(input_size[0] / scale), int(input_size[1] / scale)],
                    conv3x3_only=conv3x3_only,
                    hidden_ratio=hidden_ratio,
                    flow_steps=flow_steps,
                )
            )

        if self.use_aux_branch:
            self.aux_extractor = timm.create_model(
                "resnet18",
                pretrained=True,
                features_only=True,
                out_indices=[1, 2, 3],
            )
            for param in self.aux_extractor.parameters():
                param.requires_grad = False

            resnet_channels = [64, 128, 256]
            self.resnet_align = nn.ModuleList(
                [nn.Conv2d(resnet_channels[i], self.channels[i], 1) for i in range(3)]
            )
            self.aux_norms = nn.ModuleList([nn.GroupNorm(8, ch) for ch in self.channels])
            self.aux_gates = nn.Parameter(torch.full((3,), -3.5))

            self.align_convs_global = nn.ModuleList()
            fused_ch_global = 96
            for ch in channels:
                self.align_convs_global.append(nn.Conv2d(ch, fused_ch_global, kernel_size=1, bias=True))
            concatenated_channel = fused_ch_global

            h_global = input_size[0] // scales[-1]
            w_global = input_size[1] // scales[-1]
            self.global_flow_block = create_fast_flow_block(
                input_dimensions=[fused_ch_global, h_global, w_global],
                conv3x3_only=conv3x3_only,
                hidden_ratio=hidden_ratio,
                flow_steps=flow_steps,
            )

            self.global_scale_logits = nn.Parameter(torch.zeros(len(channels)))
            self.channel_weighting_module = nn.Sequential(
                nn.AdaptiveAvgPool2d(1),
                nn.Conv2d(concatenated_channel, concatenated_channel // 4, 1),
                nn.ReLU(),
                nn.Conv2d(concatenated_channel // 4, len(channels), 1),
                nn.Softmax(dim=1),
            )
            self.subnet = nn.Sequential(
                nn.Conv2d(fused_ch_global // 2, fused_ch_global // 2, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.Conv2d(fused_ch_global // 2, fused_ch_global, kernel_size=3, padding=1),
            )

        self.anomaly_map_generator = AnomalyMapGenerator(input_size=input_size)

    def forward(self, image: Tensor) -> Tensor | list[Tensor] | tuple[list[Tensor]]:
        """Forward-Pass for FastFlow Model."""
        return_val: Tensor | list[Tensor] | tuple[list[Tensor]]

        self.feature_extractor.eval()
        if isinstance(self.feature_extractor, VisionTransformer):
            features = self._get_vit_features(image)
        elif isinstance(self.feature_extractor, Cait):
            features = self._get_cait_features(image)
        elif isinstance(self.feature_extractor, SwinTransformerV2):
            features = self._get_swint_features(image)
        else:
            features = self.feature_extractor(image)
            features = [self.norms[i](feat) for i, feat in enumerate(features)]

        if not self.use_aux_branch:
            hidden_variables: list[Tensor] = []
            log_jacobians: list[Tensor] = []
            for fast_flow_block, feature in zip(self.fast_flow_blocks, features):
                hidden_variable, log_jacobian = fast_flow_block(feature)
                hidden_variables.append(hidden_variable)
                log_jacobians.append(log_jacobian)
            return_val = (hidden_variables, log_jacobians)
            if not self.training:
                return_val = self.anomaly_map_generator(hidden_variables)
            return return_val

        self.aux_extractor.eval()
        aux_feats = self.aux_extractor(image)

        local_hidden_variables = []
        local_log_jacobians = []
        for i in range(len(features)):
            aux_feat_aligned = self.resnet_align[i](aux_feats[i])
            if aux_feat_aligned.shape[-2:] != features[i].shape[-2:]:
                aux_feat_aligned = F.interpolate(
                    aux_feat_aligned,
                    size=features[i].shape[-2:],
                    mode="bilinear",
                    align_corners=False,
                )
            aux_feat_aligned = self.aux_norms[i](aux_feat_aligned)
            gate = torch.sigmoid(self.aux_gates[i])
            fused_feature = features[i] + gate * aux_feat_aligned
            hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
            local_hidden_variables.append(hidden_variable)
            local_log_jacobians.append(log_jacobian)
        local_anomaly_map = self.anomaly_map_generator(local_hidden_variables)

        aligned_features = []
        target_size = features[-1].shape[2:]
        for i, feat in enumerate(features):
            feat = self.align_convs_global[i](feat)
            down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
            aligned_features.append(down_feat)

        w = torch.softmax(self.global_scale_logits, dim=0)
        weighted_feats = aligned_features[0] * w[0]
        for i in range(1, len(aligned_features)):
            weighted_feats = weighted_feats + aligned_features[i] * w[i]

        c = weighted_feats.size(1)
        a, b = torch.split(weighted_feats, c // 2, dim=1)
        st = self.subnet(a)
        s, t = torch.chunk(st, 2, dim=1)
        b_transformed = torch.exp(s) * b + t
        conditioned_feature = torch.cat([a, b_transformed], dim=1)
        global_hidden_variable, global_log_jacobian = self.global_flow_block(conditioned_feature)
        global_anomaly_map = self.anomaly_map_generator([global_hidden_variable])

        anomaly_map = 0.9 * local_anomaly_map + 0.1 * global_anomaly_map

        hidden_variables: list[Tensor] = local_hidden_variables + [global_hidden_variable]
        log_jacobians: list[Tensor] = local_log_jacobians + [global_log_jacobian]

        return_val = (hidden_variables, log_jacobians)
        if not self.training:
            return_val = anomaly_map
        return return_val

    def _get_cnn_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get CNN-based features."""
        features = self.feature_extractor(input_tensor)
        features = [self.norms[i](feature) for i, feature in enumerate(features)]
        return features

    def _get_cait_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Class-Attention-Image-Transformers (CaiT) features."""
        feature = self.feature_extractor.patch_embed(input_tensor)
        feature = feature + self.feature_extractor.pos_embed
        feature = self.feature_extractor.pos_drop(feature)
        for i in range(41):  # paper Table 6. Block Index = 40
            feature = self.feature_extractor.blocks[i](feature)
        batch_size, _, num_channels = feature.shape
        feature = self.feature_extractor.norm(feature)
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_vit_features(self, input_tensor: Tensor) -> list[Tensor]:
        """Get Vision Transformers (ViT) features."""
        feature = self.feature_extractor.patch_embed(input_tensor)
        cls_token = self.feature_extractor.cls_token.expand(feature.shape[0], -1, -1)
        if self.feature_extractor.dist_token is None:
            feature = torch.cat((cls_token, feature), dim=1)
        else:
            feature = torch.cat(
                (
                    cls_token,
                    self.feature_extractor.dist_token.expand(feature.shape[0], -1, -1),
                    feature,
                ),
                dim=1,
            )
        feature = self.feature_extractor.pos_drop(feature + self.feature_extractor.pos_embed)
        for i in range(8):  # paper Table 6. Block Index = 7
            feature = self.feature_extractor.blocks[i](feature)
        feature = self.feature_extractor.norm(feature)
        feature = feature[:, 2:, :]
        batch_size, _, num_channels = feature.shape
        feature = feature.permute(0, 2, 1)
        feature = feature.reshape(batch_size, num_channels, self.input_size[0] // 16, self.input_size[1] // 16)
        return [feature]

    def _get_swint_features(self, input_tensor: Tensor) -> list[Tensor]:
        features = self.feature_extractor.forward_features(input_tensor)
        features = features[:3]
        reshaped_features = []
        grid_size = self.feature_extractor.patch_embed.grid_size
        for i, feat in enumerate(features):
            b, l, c = feat.shape
            h = grid_size[0] // (2**i)
            w = grid_size[1] // (2**i)
            if h * w != l:
                raise ValueError(
                    f"Cannot reshape [B, L, C]=[{b}, {l}, {c}] into [B, C, H, W]=[{b}, {c}, {h}, {w}]."
                )
            feat = feat.permute(0, 2, 1).reshape(b, c, h, w)
            feat = self.norms[i](feat)
            reshaped_features.append(feat)
        return reshaped_features
```
This code works well. The input size is 256, and the results are:

image_AUROC 0.6637790203094482
image_F1Score 0.6151961088180542
pixel_AUPRO 0.513035237789154
pixel_AUROC 0.8913142681121826
pixel_F1Score 0.3465515673160553

How can I improve it further?
Your current results (image_AUROC ≈ 0.664, pixel_AUPRO ≈ 0.513, pixel_F1 ≈ 0.347) put you at the stage where "the heatmap carries information, but the contrast/threshold/boundary shape are not fully exploited yet". The most cost-effective gains are generally not in stacking more branches, but in the anomaly-map form, the normalization/threshold, and the checkpoint-selection objective.

Below, ordered by "small change, large gain", is exactly what to change (with drop-in code).
1. Replace exp(log_prob) with an energy term (strongly recommended). You currently have:

```python
log_prob = -0.5 * mean(z^2)
prob = exp(log_prob)
flow_map = -prob
```

The typical problem: exp saturates quickly, so the contrast between weak and strong anomalies is squashed, the pixel threshold becomes very hard to choose, and AUPRO/F1 stall.

The more robust choice is to use the energy (≈ -log p) directly: score = 0.5 * mean(z^2), which stays "more linearly separable" in anomalous regions.
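A quick numeric illustration of that saturation:

```python
import torch

m = torch.tensor([1.0, 4.0, 9.0, 16.0])  # mean(z^2) for increasingly anomalous pixels
print(torch.exp(-0.5 * m))  # tensor([0.6065, 0.1353, 0.0111, 0.0003]) -> saturates toward 0
print(0.5 * m)              # tensor([0.5000, 2.0000, 4.5000, 8.0000]) -> stays separable
```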
✅ Replace your anomaly_map.py directly with this version:

```python
"""FastFlow Anomaly Map Generator Implementation."""
from __future__ import annotations

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor, nn


class AnomalyMapGenerator(nn.Module):
    """Generate Anomaly Heatmap."""

    def __init__(self, input_size: ListConfig | tuple) -> None:
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        flow_maps: list[Tensor] = []
        for hidden_variable in hidden_variables:
            # energy ~= -log p, avoids exp saturation
            score = 0.5 * torch.mean(hidden_variable**2, dim=1, keepdim=True)  # [B,1,h,w]
            flow_map = F.interpolate(
                input=score,
                size=self.input_size,
                mode="bilinear",
                align_corners=False,
            )
            flow_maps.append(flow_map)
        flow_maps = torch.stack(flow_maps, dim=-1)   # [B,1,H,W,N]
        anomaly_map = torch.mean(flow_maps, dim=-1)  # [B,1,H,W]
        # optional: a tiny smoothing often helps pixel-F1 (keep very mild)
        anomaly_map = F.avg_pool2d(anomaly_map, kernel_size=3, stride=1, padding=1)
        return anomaly_map
```
This step alone often improves pixel_AUPRO / pixel_F1 directly (especially in your case, where the heatmap ranks well but the threshold is hard to pick).

2. Your global path still builds aligned_features from features:

```python
target_size = features[-1].shape[2:]
for i, feat in enumerate(features):
    ...
```

but the local path already computes fused_feature = features[i] + gate * aux_feat_aligned. Global and local thus see different input distributions, pull against each other during fusion, and the pixel metrics suffer most.
Change the local part to:

```python
fused_features = []
...
fused_feature = features[i] + gate * aux_feat_aligned
fused_features.append(fused_feature)
hidden_variable, log_jacobian = self.fast_flow_blocks[i](fused_feature)
...
```

and the global part to:

```python
aligned_features = []
target_size = fused_features[-1].shape[2:]  # use fused_features
for i, feat in enumerate(fused_features):
    feat = self.align_convs_global[i](feat)
    down_feat = F.interpolate(feat, size=target_size, mode="bilinear", align_corners=False)
    aligned_features.append(down_feat)
```
3. You currently have:

```python
b_transformed = torch.exp(s) * b + t
```

Change it to (a very conservative clamp that changes little of the expressiveness but drastically reduces blow-ups):

```python
s = torch.clamp(s, -3.0, 3.0)
b_transformed = torch.exp(s) * b + t
```
4. You have already seen the pattern "pixel_AUROC decent but pixel_F1 collapses"; that is usually early stopping picking the wrong checkpoint.

Change:

```yaml
early_stopping:
  metric: image_AUROC
```

to:

```yaml
early_stopping:
  metric: pixel_AUPRO
  mode: max
```

Since you will be comparing against other algorithms, and those comparisons are usually about localization (AUPRO/F1), this step matters.
5. You currently fuse with a fixed 0.9/0.1. Sweep only these four combinations (do not sweep broadly): w_global = 0.05 and 0.10, crossed with global_flow_steps = 16 and 20 (set the global branch separately; do not tie it to the local 12). See the grid sketch after this paragraph.

Empirically, when pixel F1 is low the global weight usually needs to be smaller (0.05-0.1), otherwise the low-resolution global response smears the boundaries.
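A sketch of that 2×2 grid (run_experiment is a hypothetical wrapper around your training entry point; w_global stands for the global weight in anomaly_map = (1 - w_global) * local + w_global * global):

```python
for w_global in (0.05, 0.10):
    for global_steps in (16, 20):
        # w_global replaces the hard-coded 0.9/0.1 fusion weights;
        # global_steps is passed to create_fast_flow_block for the global branch.
        run_experiment(w_global=w_global, global_steps=global_steps)
```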
6. You are on min_max, which is sensitive to extreme values, so the pixel threshold drifts.

Run one controlled comparison with:

```yaml
normalization_method: cdf
```

Once you have run these, post the five new metrics (especially pixel_AUPRO / pixel_F1). Depending on whether "AUPRO rose but F1 did not" or "F1 rose but image AUROC dropped", the next step will again be a precise one-or-two-line change (e.g. changing the fusion, or switching to a top-quantile image score).
With everything applied except the cdf swap, my results are:

image_AUROC 0.5986433029174805
image_F1Score 0.5886363387107849
pixel_AUPRO 0.4152340590953827
pixel_AUROC 0.8632453083992004
pixel_F1Score 0.3275681734085083

What should I do next to improve the algorithm?
After this round of changes (energy anomaly map + fused_features + clamp(s)), pixel_AUROC is still 0.86, so the heatmap still discriminates; but pixel_AUPRO / image_AUROC dropped noticeably. The typical cause: the energy map is much more long-tailed than the old -exp(log_prob) map, while you are still using min_max normalization plus the original threshold/early-stopping strategy, so the "usable range" for thresholding and ranking got destroyed. Below is the recommended route for the next step, ordered by "most likely to gain / smallest change", with each step made concrete.

1. Switch normalization_method to cdf (you have not done this yet, and it is the most likely culprit for this drop). Your energy map now has a long tail; min_max gets dragged by the extremes, most pixels are squeezed into a narrow band, and both AUPRO and F1 suffer.
Change the config directly:

```yaml
normalization_method: cdf
```

This single step is very likely to recover the image_AUROC / pixel_AUPRO you just lost, possibly beyond your earlier 0.513 run.
2. You have repeatedly seen "image OK but pixels unstable". Keeping image_AUROC as the early-stopping metric will keep selecting pixel-poor checkpoints.

Change it to:

```yaml
early_stopping:
  patience: 10
  metric: pixel_AUPRO
  mode: max
```

(If you care most about the final pixel_F1, pixel_F1Score also works.)
3. Your current energy = 0.5*mean(z^2) is too long-tailed, and AUPRO often pays for it. The fix is not to fall back to exp, but to compress the tail with log1p (preserving the ranking while making the threshold easier to find). Change your AnomalyMapGenerator to this version (only one file changes):

```python
class AnomalyMapGenerator(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.input_size = input_size if isinstance(input_size, tuple) else tuple(input_size)

    def forward(self, hidden_variables: list[Tensor]) -> Tensor:
        flow_maps = []
        for z in hidden_variables:
            # energy
            score = 0.5 * torch.mean(z**2, dim=1, keepdim=True)  # [B,1,h,w]
            score = torch.log1p(score)  # compress the long tail
            score = F.interpolate(score, size=self.input_size, mode="bilinear", align_corners=False)

            # robust per-image normalization: scale by quantiles so extreme values
            # cannot dominate
            B = score.shape[0]
            flat = score.view(B, -1)
            q_low = torch.quantile(flat, 0.01, dim=1).view(B, 1, 1, 1)
            q_high = torch.quantile(flat, 0.99, dim=1).view(B, 1, 1, 1)
            score = (score - q_low) / (q_high - q_low + 1e-6)
            score = score.clamp(0.0, 1.0)

            flow_maps.append(score)

        flow_maps = torch.stack(flow_maps, dim=-1)   # [B,1,H,W,N]
        anomaly_map = torch.mean(flow_maps, dim=-1)  # [B,1,H,W]
        anomaly_map = F.avg_pool2d(anomaly_map, 3, 1, 1)  # mild smoothing
        return anomaly_map
```

This trick is usually very friendly to pixel_AUPRO, because it directly improves the separability of the PR curve, and it also makes pixel_F1 more stable.
4. Your earlier best pixel numbers were based on the old heatmap form. With an energy-style heatmap, the low-resolution global branch smears boundaries more easily, so a drop in AUPRO/F1 is common.

For the next round, sweep only four points (no broad sweeps): w_global: 0.05 / 0.10 crossed with global_steps: 16 / 20 (set the global branch independently, not equal to the local 12).

Rule of thumb: when AUPRO/F1 lag, shrinking the global weight beats adding complex modules.
5. If you are currently using s = clamp(s, -3, 3), it may cut the global branch's expressiveness too hard, dragging down both image_AUROC and pixel_AUROC.

Two more robust alternatives (pick one):

A. Loosen the clamp:

```python
s = torch.clamp(s, -5.0, 5.0)
```

B. tanh-clamp (smoother):

```python
s = torch.tanh(s) * 2.5  # anywhere in 2.0-3.0 is worth trying
```
So, in order: normalization_method: cdf, then early_stopping.metric: pixel_AUPRO, then w_global=0.05 with global_steps=16 (run this single point first). Once it has run, post the five metrics, and the next step will depend on: