Introduction to the SimAM Attention Mechanism
Created on: December 25, 2024
What is the idea behind the SimAM attention mechanism? Where can it be applied? At which layers of a model does it work best?
SimAM (Simple Attention Module) is a simple yet efficient attention mechanism designed to improve the representational power of neural networks without adding any parameters. A detailed introduction follows.

The core idea of SimAM is inspired by the behavior of individual neurons in neuroscience. It evaluates the importance of each neuron through an energy function and uses that score to assign a weight to every position in the feature map. Concretely, SimAM works in the following steps:

Energy function design: SimAM defines an energy function over the input features that measures how important each neuron is to the overall representation, taking into account the neuron's activation and the distribution of activations in its local region.

Parameter-free: unlike most other attention mechanisms, SimAM relies on no extra parameters or heavy computation, so it improves model performance while staying lightweight.

Weight computation: by minimizing the energy function, which admits a closed-form solution, SimAM assigns each neuron a weight that reflects its importance in the feature representation.
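In code, the whole mechanism reduces to a handful of tensor operations. Below is a minimal sketch of the per-channel weighting in TensorFlow, following the commonly used open-source formulation; the helper name simam_weights and the default lambda of 1e-4 are illustrative assumptions, not part of any library API:

import tensorflow as tf

def simam_weights(x: tf.Tensor, lam: float = 1e-4) -> tf.Tensor:
  """x: a feature map of shape [batch, height, width, channels]."""
  n = tf.cast(tf.shape(x)[1] * tf.shape(x)[2] - 1, x.dtype)
  mu = tf.reduce_mean(x, axis=[1, 2], keepdims=True)      # spatial mean per channel
  d = tf.square(x - mu)                                   # squared deviation
  var = tf.reduce_sum(d, axis=[1, 2], keepdims=True) / n  # spatial variance per channel
  e_inv = d / (4.0 * (var + lam)) + 0.5                   # inverse energy per neuron
  return tf.sigmoid(e_inv)                                # weights in (0, 1)

# Usage: x_weighted = x * simam_weights(x)

The sigmoid keeps the weights bounded, and since every quantity is derived from the input itself, no trainable parameters are introduced.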
Thanks to its simplicity and efficiency, SimAM suits a wide range of deep learning tasks and performs particularly well in the following areas:

Medical image segmentation: SimAM has been applied to 3D medical image segmentation, where it proved effective on high-dimensional data and complex anatomical structures.

Computer vision: in image classification, object detection, and semantic segmentation, SimAM works as an effective attention module that sharpens the model's focus on the most informative features.

Natural language processing: although SimAM is used mainly in vision, the same idea extends to sequence data, where it can strengthen a model's attention to important tokens or features.
SimAM usually works best when applied in the middle or upper layers of a network, for the following reasons:

Middle layers: introducing SimAM in the middle layers helps the model integrate low-level features, enriching the representation and improving its discriminative power.

Upper layers: applying SimAM in the upper layers helps the model focus on more abstract, semantic features, improving performance on the final task.

Multi-level application: some studies report that applying SimAM at several levels at once improves overall performance further, since features at different levels differ in semantic and spatial importance.
In short, SimAM is a simple, parameter-free attention mechanism that uses an energy function to score and reweight the neurons in a feature map, improving representational power and task performance. It applies broadly to medical imaging, computer vision, and other domains, and is especially effective in the middle and upper layers of a network.
I want to apply this to a MoViNet model that uses causal convolution and streaming inference. How do I make sure the SimAM idea does not interfere with causal convolution or streaming inference, i.e., that the current inference step never depends on future frames, during training as well?
"""Contains common building blocks for MoViNets.
Reference: https://arxiv.org/pdf/2103.11511.pdf
"""
from typing import Any, Mapping, Optional, Sequence, Tuple, Union
import tensorflow as tf, tf_keras
from official.modeling import tf_utils
from official.vision.modeling.layers import nn_layers
KERNEL_WEIGHT_DECAY = 1.5e-5
def normalize_tuple(value: Union[int, Tuple[int, ...]], size: int, name: str):
  """Transforms a single integer or iterable of integers into an integer tuple.

  Arguments:
    value: The value to validate and convert. Could be an int, or any iterable
      of ints.
    size: The size of the tuple to be returned.
    name: The name of the argument being validated, e.g. "strides" or
      "kernel_size". This is only used to format error messages.

  Returns:
    A tuple of `size` integers.

  Raises:
    ValueError: If something other than an int/long or iterable thereof was
      passed.
  """
  if isinstance(value, int):
    return (value,) * size
  else:
    try:
      value_tuple = tuple(value)
    except TypeError:
      raise ValueError('The ' + name + ' argument must be a tuple of ' +
                       str(size) + ' integers. Received: ' + str(value))
    if len(value_tuple) != size:
      raise ValueError('The ' + name + ' argument must be a tuple of ' +
                       str(size) + ' integers. Received: ' + str(value))
    for single_value in value_tuple:
      try:
        int(single_value)
      except (ValueError, TypeError):
        raise ValueError('The ' + name + ' argument must be a tuple of ' +
                         str(size) + ' integers. Received: ' + str(value) +
                         ' including element ' + str(single_value) +
                         ' of type ' + str(type(single_value)))
    return value_tuple
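For reference, the expected behavior of this helper; the values are illustrative:

assert normalize_tuple(3, 2, 'kernel_size') == (3, 3)
assert normalize_tuple((1, 3, 3), 3, 'strides') == (1, 3, 3)
# normalize_tuple('abc', 2, 'strides') raises ValueError (wrong length),
# and normalize_tuple(('a', 'b'), 2, 'strides') raises ValueError (non-ints).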
@tf_keras.utils.register_keras_serializable(package='Vision')
class Squeeze3D(tf_keras.layers.Layer):
"""Squeeze3D layer to remove singular dimensions."""
def call(self, inputs):
"""Calls the layer with the given inputs."""
return tf.squeeze(inputs, axis=(1, 2, 3))
@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileConv2D(tf_keras.layers.Layer):
"""Conv2D layer with extra options to support mobile devices.
Reshapes 5D video tensor inputs to 4D, allowing Conv2D to run across
dimensions (2, 3) or (3, 4). Reshapes tensors back to 5D when returning the
output.
"""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = (1, 1),
padding: str = 'valid',
data_format: Optional[str] = None,
dilation_rate: Union[int, Sequence[int]] = (1, 1),
groups: int = 1,
use_bias: bool = True,
kernel_initializer: str = 'glorot_uniform',
bias_initializer: str = 'zeros',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
bias_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
activity_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
kernel_constraint: Optional[tf_keras.constraints.Constraint] = None,
bias_constraint: Optional[tf_keras.constraints.Constraint] = None,
use_depthwise: bool = False,
use_temporal: bool = False,
use_buffered_input: bool = False, # pytype: disable=annotation-type-mismatch # typed-keras
batch_norm_op: Optional[Any] = None,
activation_op: Optional[Any] = None,
**kwargs): # pylint: disable=g-doc-args
"""Initializes mobile conv2d.
For the majority of arguments, see tf_keras.layers.Conv2D.

Args:
  use_depthwise: if True, use DepthwiseConv2D instead of Conv2D.
  use_temporal: if True, apply Conv2D starting from the temporal dimension
    instead of the spatial dimensions.
  use_buffered_input: if True, the input is expected to be padded beforehand.
    In effect, calling this layer will use 'valid' padding on the temporal
    dimension to simulate 'causal' padding.
  batch_norm_op: A callable object of batch norm layer. If None, no batch
    norm will be applied after the convolution.
  activation_op: A callable object of activation layer. If None, no
    activation will be applied after the convolution.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the MobileConv2D operation.
"""
super(MobileConv2D, self).__init__(**kwargs)
self._filters = filters
self._kernel_size = kernel_size
self._strides = strides
self._padding = padding
self._data_format = data_format
self._dilation_rate = dilation_rate
self._groups = groups
self._use_bias = use_bias
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._activity_regularizer = activity_regularizer
self._kernel_constraint = kernel_constraint
self._bias_constraint = bias_constraint
self._use_depthwise = use_depthwise
self._use_temporal = use_temporal
self._use_buffered_input = use_buffered_input
self._batch_norm_op = batch_norm_op
self._activation_op = activation_op

kernel_size = normalize_tuple(kernel_size, 2, 'kernel_size')

if self._use_temporal and kernel_size[1] > 1:
  raise ValueError('Temporal conv with spatial kernel is not supported.')

if use_depthwise:
  self._conv = nn_layers.DepthwiseConv2D(
      kernel_size=kernel_size,
      strides=strides,
      padding=padding,
      depth_multiplier=1,
      data_format=data_format,
      dilation_rate=dilation_rate,
      use_bias=use_bias,
      depthwise_initializer=kernel_initializer,
      bias_initializer=bias_initializer,
      depthwise_regularizer=kernel_regularizer,
      bias_regularizer=bias_regularizer,
      activity_regularizer=activity_regularizer,
      depthwise_constraint=kernel_constraint,
      bias_constraint=bias_constraint,
      use_buffered_input=use_buffered_input)
else:
  self._conv = nn_layers.Conv2D(
      filters=filters,
      kernel_size=kernel_size,
      strides=strides,
      padding=padding,
      data_format=data_format,
      dilation_rate=dilation_rate,
      groups=groups,
      use_bias=use_bias,
      kernel_initializer=kernel_initializer,
      bias_initializer=bias_initializer,
      kernel_regularizer=kernel_regularizer,
      bias_regularizer=bias_regularizer,
      activity_regularizer=activity_regularizer,
      kernel_constraint=kernel_constraint,
      bias_constraint=bias_constraint,
      use_buffered_input=use_buffered_input)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'filters': self._filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'padding': self._padding,
'data_format': self._data_format,
'dilation_rate': self._dilation_rate,
'groups': self._groups,
'use_bias': self._use_bias,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'activity_regularizer': self._activity_regularizer,
'kernel_constraint': self._kernel_constraint,
'bias_constraint': self._bias_constraint,
'use_depthwise': self._use_depthwise,
'use_temporal': self._use_temporal,
'use_buffered_input': self._use_buffered_input,
}
base_config = super(MobileConv2D, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs):
"""Calls the layer with the given inputs."""
if self._use_temporal:
input_shape = [
tf.shape(inputs)[0],
tf.shape(inputs)[1],
tf.shape(inputs)[2] * tf.shape(inputs)[3],
inputs.shape[4]]
else:
input_shape = [
tf.shape(inputs)[0] * tf.shape(inputs)[1],
tf.shape(inputs)[2],
tf.shape(inputs)[3],
inputs.shape[4]]
x = tf.reshape(inputs, input_shape)
x = self._conv(x)

if self._batch_norm_op is not None:
  x = self._batch_norm_op(x)
if self._activation_op is not None:
  x = self._activation_op(x)

if self._use_temporal:
  output_shape = [
      tf.shape(x)[0],
      tf.shape(x)[1],
      tf.shape(inputs)[2],
      tf.shape(inputs)[3],
      x.shape[3]]
else:
  output_shape = [
      tf.shape(inputs)[0],
      tf.shape(inputs)[1],
      tf.shape(x)[1],
      tf.shape(x)[2],
      x.shape[3]]
x = tf.reshape(x, output_shape)

return x
@tf_keras.utils.register_keras_serializable(package='Vision')
class ConvBlock(tf_keras.layers.Layer):
"""A Conv followed by optional BatchNorm and Activation."""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = 1,
depthwise: bool = False,
causal: bool = False,
use_bias: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_batch_norm: bool = True,
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
activation: Optional[Any] = None,
conv_type: str = '3d',
use_buffered_input: bool = False, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Initializes a conv block.
Args:
  filters: filters for the conv operation.
  kernel_size: kernel size for the conv operation.
  strides: strides for the conv operation.
  depthwise: if True, use DepthwiseConv2D instead of Conv2D.
  causal: if True, use causal mode for the conv operation.
  use_bias: use bias for the conv operation.
  kernel_initializer: kernel initializer for the conv operation.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_batch_norm: if True, apply batch norm after the conv operation.
  batch_norm_layer: class to use for batch norm, if applied.
  batch_norm_momentum: momentum of the batch norm operation, if applied.
  batch_norm_epsilon: epsilon of the batch norm operation, if applied.
  use_sync_bn: if True, use synchronized batch normalization.
  activation: activation after the conv and batch norm operations.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  use_buffered_input: if True, the input is expected to be padded beforehand.
    In effect, calling this layer will use 'valid' padding on the temporal
    dimension to simulate 'causal' padding.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the ConvBlock operation.
"""
super(ConvBlock, self).__init__(**kwargs)

kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
strides = normalize_tuple(strides, 3, 'strides')

self._filters = filters
self._kernel_size = kernel_size
self._strides = strides
self._depthwise = depthwise
self._causal = causal
self._use_bias = use_bias
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._use_batch_norm = use_batch_norm
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn
self._activation = activation
self._conv_type = conv_type
self._use_buffered_input = use_buffered_input

if activation is not None:
  self._activation_layer = tf_utils.get_activation(
      activation, use_keras_layer=True)
else:
  self._activation_layer = None

self._groups = None
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'filters': self._filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'depthwise': self._depthwise,
'causal': self._causal,
'use_bias': self._use_bias,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'use_batch_norm': self._use_batch_norm,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn,
'activation': self._activation,
'conv_type': self._conv_type,
'use_buffered_input': self._use_buffered_input,
}
base_config = super(ConvBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def build(self, input_shape):
"""Builds the layer with the given input shape."""
padding = 'causal' if self._causal else 'same'
self._groups = input_shape[-1] if self._depthwise else 1
self._batch_norm = None
self._batch_norm_temporal = None

if self._use_batch_norm:
  self._batch_norm = self._batch_norm_layer(
      momentum=self._batch_norm_momentum,
      epsilon=self._batch_norm_epsilon,
      synchronized=self._use_sync_bn,
      name='bn')
  if self._conv_type != '3d' and self._kernel_size[0] > 1:
    self._batch_norm_temporal = self._batch_norm_layer(
        momentum=self._batch_norm_momentum,
        epsilon=self._batch_norm_epsilon,
        synchronized=self._use_sync_bn,
        name='bn_temporal')

self._conv_temporal = None
if self._conv_type == '3d_2plus1d' and self._kernel_size[0] > 1:
  self._conv = nn_layers.Conv3D(
      self._filters,
      (1, self._kernel_size[1], self._kernel_size[2]),
      strides=(1, self._strides[1], self._strides[2]),
      padding='same',
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=False,
      name='conv3d')
  self._conv_temporal = nn_layers.Conv3D(
      self._filters,
      (self._kernel_size[0], 1, 1),
      strides=(self._strides[0], 1, 1),
      padding=padding,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=self._use_buffered_input,
      name='conv3d_temporal')
elif self._conv_type == '2plus1d':
  self._conv = MobileConv2D(
      self._filters,
      (self._kernel_size[1], self._kernel_size[2]),
      strides=(self._strides[1], self._strides[2]),
      padding='same',
      use_depthwise=self._depthwise,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=False,
      batch_norm_op=self._batch_norm,
      activation_op=self._activation_layer,
      name='conv2d')
  if self._kernel_size[0] > 1:
    self._conv_temporal = MobileConv2D(
        self._filters,
        (self._kernel_size[0], 1),
        strides=(self._strides[0], 1),
        padding=padding,
        use_temporal=True,
        use_depthwise=self._depthwise,
        groups=self._groups,
        use_bias=self._use_bias,
        kernel_initializer=self._kernel_initializer,
        kernel_regularizer=self._kernel_regularizer,
        use_buffered_input=self._use_buffered_input,
        batch_norm_op=self._batch_norm_temporal,
        activation_op=self._activation_layer,
        name='conv2d_temporal')
else:
  self._conv = nn_layers.Conv3D(
      self._filters,
      self._kernel_size,
      strides=self._strides,
      padding=padding,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=self._use_buffered_input,
      name='conv3d')

super(ConvBlock, self).build(input_shape)
def call(self, inputs):
"""Calls the layer with the given inputs."""
x = inputs
# bn_op and activation_op are folded into the '2plus1d' conv layer so that
# we do not explicitly call them here.
# TODO(lzyuan): clean the conv layers api once the models are re-trained.
x = self._conv(x)

if self._batch_norm is not None and self._conv_type != '2plus1d':
  x = self._batch_norm(x)
if self._activation_layer is not None and self._conv_type != '2plus1d':
  x = self._activation_layer(x)

if self._conv_temporal is not None:
  x = self._conv_temporal(x)

  if self._batch_norm_temporal is not None and self._conv_type != '2plus1d':
    x = self._batch_norm_temporal(x)
  if self._activation_layer is not None and self._conv_type != '2plus1d':
    x = self._activation_layer(x)

return x
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamBuffer(tf_keras.layers.Layer):
"""Stream buffer wrapper which caches activations of previous frames."""
def __init__(self,
buffer_size: int,
state_prefix: Optional[str] = None,
**kwargs):
"""Initializes a stream buffer.
Args:
  buffer_size: the number of input frames to cache.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the StreamBuffer operation.
"""
super(StreamBuffer, self).__init__(**kwargs)

state_prefix = state_prefix if state_prefix is not None else ''
self._state_prefix = state_prefix
self._state_name = f'{state_prefix}_stream_buffer'
self._buffer_size = buffer_size
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'buffer_size': self._buffer_size,
'state_prefix': self._state_prefix,
}
base_config = super(StreamBuffer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(
self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None,
) -> Tuple[Any, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s). Expected keys
    include `state_prefix + '_stream_buffer'`.

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}
buffer = states.get(self._state_name, None)

# Create the buffer if it does not exist in the states.
# Output buffer shape:
# [batch_size, buffer_size, input_height, input_width, num_channels]
if buffer is None:
  shape = tf.shape(inputs)
  buffer = tf.zeros(
      [shape[0], self._buffer_size, shape[2], shape[3], shape[4]],
      dtype=inputs.dtype)

# tf.pad has limited support for tf lite, so use tf.concat instead.
full_inputs = tf.concat([buffer, inputs], axis=1)

# Cache the last b frames of the input where b is the buffer size and f
# is the number of input frames. If b > f, then we will cache the last
# b - f frames from the previous buffer concatenated with the current f
# input frames.
new_buffer = full_inputs[:, -self._buffer_size:]
states[self._state_name] = new_buffer

return full_inputs, states
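To make the state threading concrete, here is a minimal streaming sketch; the shapes and the 'block0' prefix are illustrative assumptions:

buffer_layer = StreamBuffer(buffer_size=2, state_prefix='block0')
states = {}
for _ in range(4):  # process a video one frame at a time
  frame = tf.random.normal([1, 1, 8, 8, 16])  # [batch, frames=1, H, W, C]
  padded, states = buffer_layer(frame, states=states)
  # `padded` is the 2 cached past frames plus the current frame, so a
  # temporal conv with kernel size 3 sees only past and present context.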
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamConvBlock(ConvBlock):
"""ConvBlock with StreamBuffer."""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = 1,
depthwise: bool = False,
causal: bool = False,
use_bias: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_batch_norm: bool = True,
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
activation: Optional[Any] = None,
conv_type: str = '3d',
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Initializes a stream conv block.
Args:
  filters: filters for the conv operation.
  kernel_size: kernel size for the conv operation.
  strides: strides for the conv operation.
  depthwise: if True, use DepthwiseConv2D instead of Conv2D.
  causal: if True, use causal mode for the conv operation.
  use_bias: use bias for the conv operation.
  kernel_initializer: kernel initializer for the conv operation.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_batch_norm: if True, apply batch norm after the conv operation.
  batch_norm_layer: class to use for batch norm, if applied.
  batch_norm_momentum: momentum of the batch norm operation, if applied.
  batch_norm_epsilon: epsilon of the batch norm operation, if applied.
  use_sync_bn: if True, use synchronized batch normalization.
  activation: activation after the conv and batch norm operations.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the StreamConvBlock operation.
"""
kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
buffer_size = kernel_size[0] - 1
use_buffer = buffer_size > 0 and causal

self._state_prefix = state_prefix

super(StreamConvBlock, self).__init__(
    filters,
    kernel_size,
    strides=strides,
    depthwise=depthwise,
    causal=causal,
    use_bias=use_bias,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=use_batch_norm,
    batch_norm_layer=batch_norm_layer,
    batch_norm_momentum=batch_norm_momentum,
    batch_norm_epsilon=batch_norm_epsilon,
    use_sync_bn=use_sync_bn,
    activation=activation,
    conv_type=conv_type,
    use_buffered_input=use_buffer,
    **kwargs)

self._stream_buffer = None
if use_buffer:
  self._stream_buffer = StreamBuffer(
      buffer_size=buffer_size, state_prefix=state_prefix)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {'state_prefix': self._state_prefix}
base_config = super(StreamConvBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

x = inputs

# If we have no separate temporal conv, use the buffer before the 3D conv.
if self._conv_temporal is None and self._stream_buffer is not None:
  x, states = self._stream_buffer(x, states=states)

# bn_op and activation_op are folded into the '2plus1d' conv layer so that
# we do not explicitly call them here.
# TODO(lzyuan): clean the conv layers api once the models are re-trained.
x = self._conv(x)

if self._batch_norm is not None and self._conv_type != '2plus1d':
  x = self._batch_norm(x)
if self._activation_layer is not None and self._conv_type != '2plus1d':
  x = self._activation_layer(x)

if self._conv_temporal is not None:
  if self._stream_buffer is not None:
    # If we have a separate temporal conv, use the buffer before the
    # 1D conv instead (otherwise, we may waste computation on the 2D conv).
    x, states = self._stream_buffer(x, states=states)
  x = self._conv_temporal(x)

  if self._batch_norm_temporal is not None and self._conv_type != '2plus1d':
    x = self._batch_norm_temporal(x)
  if self._activation_layer is not None and self._conv_type != '2plus1d':
    x = self._activation_layer(x)

return x, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamSqueezeExcitation(tf_keras.layers.Layer):
"""Squeeze and excitation layer with causal mode.
Reference: https://arxiv.org/pdf/1709.01507.pdf
"""
def __init__(
self,
hidden_filters: int,
se_type: str = '3d',
activation: nn_layers.Activation = 'swish',
gating_activation: nn_layers.Activation = 'sigmoid',
causal: bool = False,
conv_type: str = '3d',
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_positional_encoding: bool = False,
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for squeeze and excitation.
Args:
  hidden_filters: The hidden filters of squeeze excite.
  se_type: '3d', '2d', or '2plus3d'. '3d' uses the default 3D spatiotemporal
    global average pooling for squeeze excitation. '2d' uses 2D spatial
    global average pooling on each frame. '2plus3d' concatenates both 3D and
    2D global average pooling.
  activation: name of the activation function.
  gating_activation: name of the activation function for gating.
  causal: if True, use causal mode in the global average pool.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_positional_encoding: add a positional encoding after the (cumulative)
    global average pooling layer.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(StreamSqueezeExcitation, self).__init__(**kwargs)

self._hidden_filters = hidden_filters
self._se_type = se_type
self._activation = activation
self._gating_activation = gating_activation
self._causal = causal
self._conv_type = conv_type
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._use_positional_encoding = use_positional_encoding
self._state_prefix = state_prefix

self._spatiotemporal_pool = nn_layers.GlobalAveragePool3D(
    keepdims=True, causal=causal, state_prefix=state_prefix)
self._spatial_pool = nn_layers.SpatialAveragePool3D(keepdims=True)

self._pos_encoding = None
if use_positional_encoding:
  self._pos_encoding = nn_layers.PositionalEncoding(
      initializer='zeros', state_prefix=state_prefix)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'hidden_filters': self._hidden_filters,
'se_type': self._se_type,
'activation': self._activation,
'gating_activation': self._gating_activation,
'causal': self._causal,
'conv_type': self._conv_type,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'use_positional_encoding': self._use_positional_encoding,
'state_prefix': self._state_prefix,
}
base_config = super(StreamSqueezeExcitation, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def build(self, input_shape):
"""Builds the layer with the given input shape."""
self._se_reduce = ConvBlock(
filters=self._hidden_filters,
kernel_size=1,
causal=self._causal,
use_bias=True,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
use_batch_norm=False,
activation=self._activation,
conv_type=self._conv_type,
name='se_reduce')
self._se_expand = ConvBlock(
    filters=input_shape[-1],
    kernel_size=1,
    causal=self._causal,
    use_bias=True,
    kernel_initializer=self._kernel_initializer,
    kernel_regularizer=self._kernel_regularizer,
    use_batch_norm=False,
    activation=self._gating_activation,
    conv_type=self._conv_type,
    name='se_expand')

super(StreamSqueezeExcitation, self).build(input_shape)
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

if self._se_type == '3d':
  x, states = self._spatiotemporal_pool(
      inputs, states=states, output_states=True)
elif self._se_type == '2d':
  x = self._spatial_pool(inputs)
elif self._se_type == '2plus3d':
  x_space = self._spatial_pool(inputs)
  x, states = self._spatiotemporal_pool(
      x_space, states=states, output_states=True)

  if not self._causal:
    x = tf.tile(x, [1, tf.shape(inputs)[1], 1, 1, 1])

  # print("x Shape:", x.shape, "x_space Shape:", x_space.shape)
  x = tf.concat([x, x_space], axis=-1)
  # print("x Shape:", x.shape, "x_space Shape:", x_space.shape)
else:
  raise ValueError('Unknown Squeeze Excitation type {}'.format(
      self._se_type))

if self._pos_encoding is not None:
  x, states = self._pos_encoding(x, states=states)

x = self._se_reduce(x)
x = self._se_expand(x)

return x * inputs, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf_keras.layers.Layer):
"""A depthwise inverted bottleneck block.
Uses dependency injection to allow flexible definition of different layers
within this block.
"""
def __init__(self,
expansion_layer: tf_keras.layers.Layer,
feature_layer: tf_keras.layers.Layer,
projection_layer: tf_keras.layers.Layer,
attention_layer: Optional[tf_keras.layers.Layer] = None,
skip_layer: Optional[tf_keras.layers.Layer] = None,
stochastic_depth_drop_rate: Optional[float] = None,
**kwargs):
"""Implementation for mobile bottleneck.
Args:
  expansion_layer: initial layer used for pointwise expansion.
  feature_layer: main layer used for computing 3D features.
  projection_layer: layer used for pointwise projection.
  attention_layer: optional layer used for attention-like operations (e.g.,
    squeeze excite).
  skip_layer: optional skip layer used to project the input before summing
    with the output for the residual connection.
  stochastic_depth_drop_rate: optional drop rate for stochastic depth.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(MobileBottleneck, self).__init__(**kwargs)

self._projection_layer = projection_layer
self._attention_layer = attention_layer
self._skip_layer = skip_layer
self._stochastic_depth_drop_rate = stochastic_depth_drop_rate
self._identity = tf_keras.layers.Activation(tf.identity)
self._rezero = nn_layers.Scale(initializer='zeros', name='rezero')

if stochastic_depth_drop_rate:
  self._stochastic_depth = nn_layers.StochasticDepth(
      stochastic_depth_drop_rate, name='stochastic_depth')
else:
  self._stochastic_depth = None

self._feature_layer = feature_layer
self._expansion_layer = expansion_layer
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'stochastic_depth_drop_rate': self._stochastic_depth_drop_rate,
}
base_config = super(MobileBottleneck, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

x = self._expansion_layer(inputs)
x, states = self._feature_layer(x, states=states)
if self._attention_layer is not None:
  x, states = self._attention_layer(x, states=states)
x = self._projection_layer(x)

# Add identity so that the ops are ordered as written. This is useful for,
# e.g., quantization.
x = self._identity(x)
x = self._rezero(x)

if self._stochastic_depth is not None:
  x = self._stochastic_depth(x)

if self._skip_layer is not None:
  skip = self._skip_layer(inputs)
else:
  skip = inputs

return x + skip, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class SkipBlock(tf_keras.layers.Layer):
"""Skip block for bottleneck blocks."""
def __init__(
self,
out_filters: int,
downsample: bool = False,
conv_type: str = '3d',
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3, # pytype: disable=annotation-type-mismatch # typed-keras
use_sync_bn: bool = False,
**kwargs):
"""Implementation for skip block.
Args:
  out_filters: the number of projected output filters.
  downsample: if True, downsamples the input by a factor of 2 by applying
    average pooling with a 3x3 kernel size on the spatial dimensions.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv projection.
  batch_norm_layer: class to use for batch norm.
  batch_norm_momentum: momentum of the batch norm operation.
  batch_norm_epsilon: epsilon of the batch norm operation.
  use_sync_bn: if True, use synchronized batch normalization.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(SkipBlock, self).__init__(**kwargs)

self._out_filters = out_filters
self._downsample = downsample
self._conv_type = conv_type
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn

self._projection = ConvBlock(
    filters=self._out_filters,
    kernel_size=1,
    conv_type=conv_type,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    name='skip_project')

if downsample:
  if self._conv_type == '2plus1d':
    self._pool = tf_keras.layers.AveragePooling2D(
        pool_size=(3, 3),
        strides=(2, 2),
        padding='same',
        name='skip_pool')
  else:
    self._pool = tf_keras.layers.AveragePooling3D(
        pool_size=(1, 3, 3),
        strides=(1, 2, 2),
        padding='same',
        name='skip_pool')
else:
  self._pool = None
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'out_filters': self._out_filters,
'downsample': self._downsample,
'conv_type': self._conv_type,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn
}
base_config = super(SkipBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs):
"""Calls the layer with the given inputs."""
x = inputs
if self._pool is not None:
if self._conv_type == '2plus1d':
x = tf.reshape(x, [-1, tf.shape(x)[2], tf.shape(x)[3], x.shape[4]])
x = self._pool(x)
if self._conv_type == '2plus1d':
  x = tf.reshape(
      x,
      [tf.shape(inputs)[0], -1, tf.shape(x)[1], tf.shape(x)[2],
       x.shape[3]])
return self._projection(x)
@tf_keras.utils.register_keras_serializable(package='Vision')
class MovinetBlock(tf_keras.layers.Layer):
"""A basic block for MoViNets.
Applies a mobile inverted bottleneck with pointwise expansion, 3D depthwise
convolution, 3D squeeze excite, pointwise projection, and residual connection.
"""
def __init__(
self,
out_filters: int,
expand_filters: int,
kernel_size: Union[int, Sequence[int]] = (3, 3, 3),
strides: Union[int, Sequence[int]] = (1, 1, 1),
causal: bool = False,
activation: nn_layers.Activation = 'swish',
gating_activation: nn_layers.Activation = 'sigmoid',
se_ratio: float = 0.25,
stochastic_depth_drop_rate: float = 0.,
conv_type: str = '3d',
se_type: str = '3d',
use_positional_encoding: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for MoViNet block.
Args:
  out_filters: number of output filters for the final projection.
  expand_filters: number of expansion filters after the input.
  kernel_size: kernel size of the main depthwise convolution.
  strides: strides of the main depthwise convolution.
  causal: if True, run the temporal convolutions in causal mode.
  activation: activation to use across all conv operations.
  gating_activation: gating activation to use in squeeze excitation layers.
  se_ratio: squeeze excite filters ratio.
  stochastic_depth_drop_rate: optional drop rate for stochastic depth.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  se_type: '3d', '2d', or '2plus3d'. '3d' uses the default 3D spatiotemporal
    global average pooling for squeeze excitation. '2d' uses 2D spatial
    global average pooling on each frame. '2plus3d' concatenates both 3D and
    2D global average pooling.
  use_positional_encoding: add a positional encoding after the (cumulative)
    global average pooling layer in the squeeze excite layer.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operations.
  batch_norm_layer: class to use for batch norm.
  batch_norm_momentum: momentum of the batch norm operation.
  batch_norm_epsilon: epsilon of the batch norm operation.
  use_sync_bn: if True, use synchronized batch normalization.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(MovinetBlock, self).__init__(**kwargs)

self._kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
self._strides = normalize_tuple(strides, 3, 'strides')

# Use a multiplier of 2 if concatenating multiple features
se_multiplier = 2 if se_type == '2plus3d' else 1
se_hidden_filters = nn_layers.make_divisible(
    se_ratio * expand_filters * se_multiplier, divisor=8)

self._out_filters = out_filters
self._expand_filters = expand_filters
self._causal = causal
self._activation = activation
self._gating_activation = gating_activation
self._se_ratio = se_ratio
self._downsample = any(s > 1 for s in self._strides)
self._stochastic_depth_drop_rate = stochastic_depth_drop_rate
self._conv_type = conv_type
self._se_type = se_type
self._use_positional_encoding = use_positional_encoding
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn
self._state_prefix = state_prefix

self._expansion = ConvBlock(
    expand_filters,
    (1, 1, 1),
    activation=activation,
    conv_type=conv_type,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    name='expansion')
self._feature = StreamConvBlock(
    expand_filters,
    self._kernel_size,
    strides=self._strides,
    depthwise=True,
    causal=self._causal,
    activation=activation,
    conv_type=conv_type,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    state_prefix=state_prefix,
    name='feature')
self._projection = ConvBlock(
    out_filters,
    (1, 1, 1),
    activation=None,
    conv_type=conv_type,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    name='projection')

self._attention = None
if se_type != 'none':
  self._attention = StreamSqueezeExcitation(
      se_hidden_filters,
      se_type=se_type,
      activation=activation,
      gating_activation=gating_activation,
      causal=self._causal,
      conv_type=conv_type,
      use_positional_encoding=use_positional_encoding,
      kernel_initializer=kernel_initializer,
      kernel_regularizer=kernel_regularizer,
      state_prefix=state_prefix,
      name='se')
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'out_filters': self._out_filters,
'expand_filters': self._expand_filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'causal': self._causal,
'activation': self._activation,
'gating_activation': self._gating_activation,
'se_ratio': self._se_ratio,
'stochastic_depth_drop_rate': self._stochastic_depth_drop_rate,
'conv_type': self._conv_type,
'se_type': self._se_type,
'use_positional_encoding': self._use_positional_encoding,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn,
'state_prefix': self._state_prefix,
}
base_config = super(MovinetBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def build(self, input_shape):
"""Builds the layer with the given input shape."""
if input_shape[-1] == self._out_filters and not self._downsample:
self._skip = None
else:
self._skip = SkipBlock(
self._out_filters,
downsample=self._downsample,
conv_type=self._conv_type,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
name='skip')
self._mobile_bottleneck = MobileBottleneck(
    self._expansion,
    self._feature,
    self._projection,
    attention_layer=self._attention,
    skip_layer=self._skip,
    stochastic_depth_drop_rate=self._stochastic_depth_drop_rate,
    name='bneck')

super(MovinetBlock, self).build(input_shape)
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}
return self._mobile_bottleneck(inputs, states=states)
@tf_keras.utils.register_keras_serializable(package='Vision')
class Stem(tf_keras.layers.Layer):
"""Stem layer for video networks.
Applies an initial convolution block operation.
"""
def __init__(
self,
out_filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = (1, 1, 1),
causal: bool = False,
conv_type: str = '3d',
activation: nn_layers.Activation = 'swish',
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for video model stem.
Args:
  out_filters: number of output filters.
  kernel_size: kernel size of the convolution.
  strides: strides of the convolution.
  causal: if True, run the temporal convolutions in causal mode.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  activation: the input activation name.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operations.
  batch_norm_layer: class to use for batch norm.
  batch_norm_momentum: momentum of the batch norm operation.
  batch_norm_epsilon: epsilon of the batch norm operation.
  use_sync_bn: if True, use synchronized batch normalization.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(Stem, self).__init__(**kwargs)

self._out_filters = out_filters
self._kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
self._strides = normalize_tuple(strides, 3, 'strides')
self._causal = causal
self._conv_type = conv_type
self._activation = activation
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn
self._state_prefix = state_prefix

self._stem = StreamConvBlock(
    filters=self._out_filters,
    kernel_size=self._kernel_size,
    strides=self._strides,
    causal=self._causal,
    activation=self._activation,
    conv_type=self._conv_type,
    kernel_initializer=self._kernel_initializer,
    kernel_regularizer=self._kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    state_prefix=self._state_prefix,
    name='stem')
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'out_filters': self._out_filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'causal': self._causal,
'activation': self._activation,
'conv_type': self._conv_type,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn,
'state_prefix': self._state_prefix,
}
base_config = super(Stem, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}
return self._stem(inputs, states=states)
@tf_keras.utils.register_keras_serializable(package='Vision')
class Head(tf_keras.layers.Layer):
"""Head layer for video networks.
Applies pointwise projection and global pooling.
"""
def __init__(
self,
project_filters: int,
conv_type: str = '3d',
activation: nn_layers.Activation = 'swish',
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
average_pooling_type: str = '3d',
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for video model head.
Args:
  project_filters: number of pointwise projection filters.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  activation: the input activation name.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operations.
  batch_norm_layer: class to use for batch norm.
  batch_norm_momentum: momentum of the batch norm operation.
  batch_norm_epsilon: epsilon of the batch norm operation.
  use_sync_bn: if True, use synchronized batch normalization.
  average_pooling_type: The average pooling type. Currently supporting
    ['3d', '2d', 'none'].
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(Head, self).__init__(**kwargs)

self._project_filters = project_filters
self._conv_type = conv_type
self._activation = activation
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn
self._state_prefix = state_prefix

self._project = ConvBlock(
    filters=project_filters,
    kernel_size=1,
    activation=activation,
    conv_type=conv_type,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=True,
    batch_norm_layer=self._batch_norm_layer,
    batch_norm_momentum=self._batch_norm_momentum,
    batch_norm_epsilon=self._batch_norm_epsilon,
    use_sync_bn=self._use_sync_bn,
    name='project')

if average_pooling_type.lower() == '3d':
  self._pool = nn_layers.GlobalAveragePool3D(
      keepdims=True, causal=False, state_prefix=state_prefix)
elif average_pooling_type.lower() == '2d':
  self._pool = nn_layers.SpatialAveragePool3D(keepdims=True)
elif average_pooling_type == 'none':
  self._pool = None
else:
  raise ValueError(
      '%s average_pooling_type is not supported.' % average_pooling_type)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'project_filters': self._project_filters,
'conv_type': self._conv_type,
'activation': self._activation,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn,
'state_prefix': self._state_prefix,
}
base_config = super(Head, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(
self,
inputs: Union[tf.Tensor, Mapping[str, tf.Tensor]],
states: Optional[nn_layers.States] = None,
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor or dict of endpoints.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

x = self._project(inputs)

if self._pool is not None:
  outputs = self._pool(x, states=states, output_states=True)
else:
  outputs = (x, states)

return outputs
@tf_keras.utils.register_keras_serializable(package='Vision')
class ClassifierHead(tf_keras.layers.Layer):
"""Head layer for video networks.
Applies dense projection, dropout, and classifier projection. Expects input
to be pooled vector with shape [batch_size, 1, 1, 1, num_channels]
"""
def __init__(
self,
head_filters: int,
num_classes: int,
dropout_rate: float = 0.,
conv_type: str = '3d',
activation: nn_layers.Activation = 'swish',
output_activation: Optional[nn_layers.Activation] = None,
max_pool_predictions: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY), # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for video model classifier head.
Args:
  head_filters: number of dense head projection filters.
  num_classes: number of output classes for the final logits.
  dropout_rate: the dropout rate applied to the head projection.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D ops.
    '2plus1d' splits any 3D ops into two sequential 2D ops with their own
    batch norm and activation. '3d_2plus1d' is like '2plus1d', but uses two
    sequential 3D ops instead.
  activation: the input activation name.
  output_activation: optional final activation (e.g., 'softmax').
  max_pool_predictions: apply temporal softmax pooling to predictions.
    Intended for multi-label prediction, where multiple labels are
    distributed across the video. Currently only supports single clips.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operations.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(ClassifierHead, self).__init__(**kwargs)

self._head_filters = head_filters
self._num_classes = num_classes
self._dropout_rate = dropout_rate
self._conv_type = conv_type
self._activation = activation
self._output_activation = output_activation
self._max_pool_predictions = max_pool_predictions
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer

self._dropout = tf_keras.layers.Dropout(dropout_rate)

self._head = ConvBlock(
    filters=head_filters,
    kernel_size=1,
    activation=activation,
    use_bias=True,
    use_batch_norm=False,
    conv_type=conv_type,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    name='head')
self._classifier = ConvBlock(
    filters=num_classes,
    kernel_size=1,
    kernel_initializer=tf_keras.initializers.random_normal(stddev=0.01),
    kernel_regularizer=None,
    use_bias=True,
    use_batch_norm=False,
    conv_type=conv_type,
    name='classifier')
self._max_pool = nn_layers.TemporalSoftmaxPool()
self._squeeze = Squeeze3D()

output_activation = output_activation if output_activation else 'linear'
self._cast = tf_keras.layers.Activation(
    output_activation, dtype='float32', name='cast')
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'head_filters': self._head_filters,
'num_classes': self._num_classes,
'dropout_rate': self._dropout_rate,
'conv_type': self._conv_type,
'activation': self._activation,
'output_activation': self._output_activation,
'max_pool_predictions': self._max_pool_predictions,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
}
base_config = super(ClassifierHead, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs: tf.Tensor) -> tf.Tensor:
"""Calls the layer with the given inputs."""
# Input Shape: [batch_size, 1, 1, 1, input_channels]
x = inputs
x = self._head(x)

if self._dropout_rate and self._dropout_rate > 0:
  x = self._dropout(x)

x = self._classifier(x)

if self._max_pool_predictions:
  x = self._max_pool(x)

x = self._squeeze(x)
x = self._cast(x)

return x
@tf_keras.utils.register_keras_serializable(package='Vision')
class SimAM(tf_keras.layers.Layer):
def __init__(self, e_lambda=1e-4, **kwargs):
  super(SimAM, self).__init__(**kwargs)
self.e_lambda = e_lambda
self.sigmoid = tf_keras.activations.sigmoid
def call(self, inputs: tf.Tensor) -> tf.Tensor:
  # Input shape: (batch, time, height, width, channels).
  input_shape = tf.shape(inputs)
  height, width = input_shape[2], input_shape[3]
  n = tf.cast(height * width - 1, dtype=inputs.dtype)

  # Spatial mean per time step and per channel: (batch, time, 1, 1, channels).
  mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
  # Squared deviation from the mean: (batch, time, height, width, channels).
  x_mu_square = tf.square(inputs - mu)

  # Energy function y.
  sum_x_mu_square = tf.reduce_sum(
      x_mu_square, axis=[2, 3], keepdims=True)  # (batch, time, 1, 1, channels)
  y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5

  # Sigmoid gating: (batch, time, height, width, channels).
  activation = self.sigmoid(y)

  # Reweight the input.
  return inputs * activation

def get_config(self):
  config = super(SimAM, self).get_config()
  config.update({'e_lambda': self.e_lambda})
  return config
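A quick smoke test for the layer on a dummy clip; the shapes here are illustrative assumptions:

video = tf.random.normal([2, 8, 16, 16, 24])  # batch, time, H, W, C
simam = SimAM(e_lambda=1e-4)
out = simam(video)
assert out.shape == video.shape  # SimAM only reweights; it never changes shape.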
class FTFC_ExpandSubNet(tf.keras.layers.Layer):
def __init__(self, filters=15, kernel_size=(3, 3, 3), strides=(1, 1, 1),
             weight_decay=0.005, dropout_rate=0.5):
  super(FTFC_ExpandSubNet, self).__init__()
self.filters = filters
self.kernel_size = kernel_size
self.strides = strides
self.weight_decay = weight_decay
self.dropout_rate = dropout_rate
self.conv3d_1 = tf.keras.layers.Conv3D(
    filters=filters * 2,
    kernel_size=self.kernel_size,
    strides=self.strides,
    padding='same',
    kernel_initializer='he_normal',
    kernel_regularizer=tf.keras.regularizers.l2(self.weight_decay))
self.bn_1 = tf.keras.layers.BatchNormalization()

self.conv3d_3 = tf.keras.layers.Conv3D(
    filters=filters,
    kernel_size=self.kernel_size,
    strides=self.strides,
    padding='same',
    kernel_initializer='he_normal',
    kernel_regularizer=tf.keras.regularizers.l2(self.weight_decay))
self.bn_3 = tf.keras.layers.BatchNormalization()

def mish(self, tensor):
  return tensor * tf.nn.tanh(tf.nn.softplus(tensor))

def call(self, input_tensor, training=True):
  x = self.conv3d_1(input_tensor)
  x = self.bn_1(x, training=training)
  x = self.mish(x)
  x = self.conv3d_3(x)
  x = self.bn_3(x, training=training)
  x = self.mish(x)
  return x
class FTCF_Block(tf.keras.layers.Layer):
def __init__(self, filters=32, kernel_size=(3, 3, 3), strides=(1, 1, 1),
             weight_decay=0.005):
  super(FTCF_Block, self).__init__()
self.filters = filters
self.kernel_size = kernel_size
self.strides = strides
self.weight_decay = weight_decay
def build(self, input_shape):
  self.conv3d_up = tf.keras.layers.Conv3D(
      filters=self.filters,
      kernel_size=(self.kernel_size[0], self.kernel_size[1], 1),
      strides=self.strides,
      padding='same',
      kernel_initializer='he_normal',
      kernel_regularizer=tf.keras.regularizers.l2(self.weight_decay))
  self.bn_up = tf.keras.layers.BatchNormalization()
  self.ftfc_expand = FTFC_ExpandSubNet(
      filters=self.filters,
      kernel_size=self.kernel_size,
      strides=self.strides,
      weight_decay=self.weight_decay)
  self.conv3d_cp = tf.keras.layers.Conv3D(
      filters=self.filters,
      kernel_size=(1, 1, 1),
      strides=self.strides,
      padding='same',
      kernel_initializer='he_normal',
      kernel_regularizer=tf.keras.regularizers.l2(self.weight_decay))
  self.bn_cp = tf.keras.layers.BatchNormalization()

def mish(self, tensor):
  return tensor * tf.nn.tanh(tf.nn.softplus(tensor))

def call(self, input_tensor, training=True):
  # Upsampling (shortcut) path.
  x_up = self.conv3d_up(input_tensor)
  x_up = self.bn_up(x_up, training=training)
  # tf.print("x_up shape after conv3d_up:", tf.shape(x_up))  # debug, disabled
  # Main path.
  x = self.ftfc_expand(input_tensor, training=training)
  x = self.conv3d_cp(x)
  x = self.bn_cp(x, training=training)
  # tf.print("x shape after conv3d_cp:", tf.shape(x))  # debug, disabled
  # Residual connection.
  x = tf.keras.layers.add([x, x_up])
  x = self.mish(x)
  return x
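A minimal usage sketch for the block above (hypothetical input sizes; with padding='same' and unit strides the block only changes the channel count):

import tensorflow as tf

block = FTCF_Block(filters=32)
x = tf.random.normal([1, 8, 28, 28, 16])   # (batch, time, H, W, C), hypothetical
y = block(x, training=False)
print(y.shape)                              # expected: (1, 8, 28, 28, 32)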
Integrating the SimAM attention mechanism into the MoViNet model while keeping it compatible with causal convolutions and streaming inference requires careful design, so that SimAM does not introduce any dependency on future frames. The following steps and recommendations will help you achieve this.

SimAM (Simple Attention Module) is a parameter-free attention mechanism that strengthens feature representations by estimating the importance of each neuron. Because of its simplicity, SimAM does not rely on extra parameters or heavy computation, which makes it easy to embed into existing architectures. To use SimAM in a causal, streaming setting, however, the following points must hold.

In the MoViNet architecture, each basic block (such as MovinetBlock) typically contains several convolution layers and, optionally, an attention mechanism. To integrate SimAM, it is best inserted at the positions described below.

A default SimAM implementation may use information from the entire feature map when computing attention weights. To guarantee causality, the computation must be restricted to the current (and past) spatial information only. The modification looks like this:
@tf_keras.utils.register_keras_serializable(package='Vision')
class CausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, **kwargs):
    super(CausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf_keras.activations.sigmoid

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Input shape: (batch, time, height, width, channels).
    batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
    n = tf.cast(height * width - 1, dtype=inputs.dtype)
    # Per-time-step, per-channel mean, based on the current frame only.
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)  # (batch, time, 1, 1, channels)
    x_mu_square = tf.square(inputs - mu)
    # Energy function y.
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    # Sigmoid activation as the attention map.
    activation = self.sigmoid(y)
    return inputs * activation

  def get_config(self):
    config = super(CausalSimAM, self).get_config()
    config.update({'e_lambda': self.e_lambda})
    return config
Notes: the CausalSimAM class is analogous to the original SimAM, but guarantees that the attention weights are computed from the current frame's information only. Suppose you want to add SimAM after the MobileBottleneck in each MovinetBlock; proceed as follows.

Modify the MobileBottleneck class: in its call method, add SimAM right after the projection layer:
@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf_keras.layers.Layer):
  # ... (existing code unchanged)

  def call(self,
           inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    states = dict(states) if states is not None else {}
    x = self._expansion_layer(inputs)
    x, states = self._feature_layer(x, states=states)
    if self._attention_layer is not None:
      x, states = self._attention_layer(x, states=states)
    x = self._projection_layer(x)

    # Add SimAM. (Note: in Keras it is better to create this layer once in
    # __init__ so it is tracked and serialized; shown inline here for brevity.)
    simam = CausalSimAM()
    x = simam(x)

    # Residual connection.
    x = self._identity(x)
    x = self._rezero(x)
    if self._stochastic_depth is not None:
      x = self._stochastic_depth(x)
    if self._skip_layer is not None:
      skip = self._skip_layer(inputs)
    else:
      skip = inputs
    return x + skip, states
Call the modified MobileBottleneck from MovinetBlock: make sure MovinetBlock instantiates the modified MobileBottleneck, so that every basic block applies SimAM.
In streaming-inference mode the model processes its input frame by frame. In this setting it is essential that SimAM does not introduce any dependency on future frames. To this end, in CausalSimAM the attention weights at each time step are based solely on the features of the current frame.

If desired, you can also integrate SimAM into other parts of the network (such as the Stem or Head) to further enrich the feature representation. For example, adding SimAM after the output of the Stem layer:
@tf_keras.utils.register_keras_serializable(package='Vision')
class Stem(tf_keras.layers.Layer):
  # ... (existing code unchanged)

  def call(self,
           inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    states = dict(states) if states is not None else {}
    x, states = self._stem(inputs, states=states)

    # Add SimAM after the stem output (again, ideally created in __init__).
    simam = CausalSimAM()
    x = simam(x)
    return x, states
After completing the modifications above, be sure to test the model thoroughly to confirm that causality is preserved, streaming inference still works, and accuracy actually improves.

By integrating SimAM into MoViNet's basic blocks appropriately, and by making sure its computation depends only on the current and past frames, you can use SimAM to strengthen the model's feature representations without interfering with causal convolutions or streaming inference. This keeps the advantages of causality and streaming processing while still exploiting the attention mechanism's capacity for richer feature expression.
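One concrete way to run such a test is a prefix check: if the layer is truly causal, running it on a truncated clip must reproduce the corresponding prefix of the full-clip output exactly. A minimal sketch, assuming the CausalSimAM defined above is in scope:

import tensorflow as tf

layer = CausalSimAM()
clip = tf.random.normal([1, 10, 14, 14, 8])   # hypothetical 10-frame clip

full = layer(clip)             # attention computed on all 10 frames
prefix = layer(clip[:, :6])    # attention computed on the first 6 frames only

# If the layer is causal, frames 0..5 must be identical in both runs:
tf.debugging.assert_near(full[:, :6], prefix, atol=1e-5)

Because CausalSimAM computes its statistics per frame, this assertion holds exactly; any variant that pooled over the whole time axis would fail it.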
I want to use this as the improvement over the original model in a short paper. How should I describe it? What does it achieve? Please explain it from the perspectives of deep learning, artificial intelligence, and mathematics.
Below is a fairly complete, systematic sample description covering the deep-learning and AI perspective, the mathematical principles, and the model improvement itself. You can trim and adapt it as needed.

In the original MoViNet model, we introduce the SimAM (Simple Attention Module) attention mechanism to strengthen the network's ability to capture spatiotemporal features. Unlike conventional attention modules that require extra learnable parameters (e.g., channel or spatial attention), SimAM adopts a parameter-free, energy-function-based form of attention, improving feature expressiveness without noticeably increasing computation or model complexity.

More importantly, to match MoViNet's causal convolutions and streaming inference, we impose a causality constraint on SimAM, guaranteeing that the features at each time step depend only on the current and past frames and leak no future information. As a result, the model can run frame-by-frame real-time inference in practical deployments while gaining accuracy.

- Feature enhancement for deep convolutional networks
- A lightweight attention mechanism
- A guarantee of causality
- A parameter-free energy-function design
At its core, SimAM is based on the following energy-function idea (written here to match the implementation above):

y = (x - mu)^2 / (4 * (sigma^2 + lambda)) + 0.5,  with  sigma^2 = (1/n) * sum_i (x_i - mu)^2  and  n = H * W - 1,

and the output is x * sigmoid(y). Here x denotes a feature value (a pixel or activation), mu is its mean over the spatial (or spatiotemporal) positions, sigma^2 denotes the variance, and lambda (the code's e_lambda) is a small constant for numerical stability. The function measures how strongly a feature deviates from its local mean; the sigmoid maps this into [0, 1], producing an attention weight that amplifies or suppresses the corresponding position.
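To tie the formula directly to the code, a small NumPy recomputation (a sketch; it assumes the CausalSimAM layer defined earlier in this thread is in scope) can confirm that the layer output equals x * sigmoid(y):

import numpy as np
import tensorflow as tf

lam = 1e-4
x = np.random.rand(1, 2, 4, 4, 3).astype(np.float32)       # (batch, time, H, W, C)
mu = x.mean(axis=(2, 3), keepdims=True)
d2 = (x - mu) ** 2
n = x.shape[2] * x.shape[3] - 1                             # H * W - 1
y = d2 / (4.0 * (d2.sum(axis=(2, 3), keepdims=True) / n + lam)) + 0.5
expected = x / (1.0 + np.exp(-y))                           # x * sigmoid(y)

got = CausalSimAM(e_lambda=lam)(tf.constant(x)).numpy()
np.testing.assert_allclose(got, expected, rtol=1e-5)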
- Implementation of the causal-convolution constraint: causal padding and a buffering strategy (stream buffer) ensure that the model's processing of frame t depends only on frames up to and including t.
- Numerical stability and smooth differentiability
- Performance gains and stable training
- Lightweight and portable
- Value for AI and industrial applications

In summary, this work introduces the parameter-free SimAM attention mechanism into the MoViNet model and adapts its computation to preserve causal convolution and streaming inference, thereby further improving accuracy and feature expressiveness on video-analysis tasks. The approach carries theoretical and practical significance for deep learning, AI applications, and the underlying mathematics, and offers a new direction for follow-up research and industrial use.
import torch
from torch import nn


class simam_module(torch.nn.Module):
    def __init__(self, channels=None, e_lambda=1e-4):
        super(simam_module, self).__init__()
        self.activaton = nn.Sigmoid()
        self.e_lambda = e_lambda

    def __repr__(self):
        s = self.__class__.__name__ + '('
        s += ('lambda=%f)' % self.e_lambda)
        return s

    @staticmethod
    def get_module_name():
        return "simam"

    def forward(self, x):
        b, c, h, w = x.size()
        n = w * h - 1
        x_minus_mu_square = (x - x.mean(dim=[2, 3], keepdim=True)).pow(2)
        y = x_minus_mu_square / (4 * (x_minus_mu_square.sum(dim=[2, 3], keepdim=True) / n + self.e_lambda)) + 0.5
        return x * self.activaton(y)
Compare this with the code below:
"""Contains common building blocks for MoViNets.
Reference: https://arxiv.org/pdf/2103.11511.pdf
"""
from typing import Any, Mapping, Optional, Sequence, Tuple, Union
import tensorflow as tf, tf_keras
from official.modeling import tf_utils
from official.vision.modeling.layers import nn_layers
KERNEL_WEIGHT_DECAY = 1.5e-5
def normalize_tuple(value: Union[int, Tuple[int, ...]], size: int, name: str):
"""Transforms a single integer or iterable of integers into an integer tuple.
Arguments:
value: The value to validate and convert. Could be an int, or any iterable of
ints.
size: The size of the tuple to be returned.
name: The name of the argument being validated, e.g. "strides" or
"kernel_size". This is only used to format error messages.
Returns:
A tuple of size integers.
Raises:
ValueError: If something else than an int/long or iterable thereof was
passed.
"""
if isinstance(value, int):
return (value,) * size
else:
try:
value_tuple = tuple(value)
except TypeError:
raise ValueError('The ' + name + ' argument must be a tuple of ' +
                 str(size) + ' integers. Received: ' + str(value))
if len(value_tuple) != size:
raise ValueError('The ' + name + ' argument must be a tuple of ' +
                 str(size) + ' integers. Received: ' + str(value))
for single_value in value_tuple:
try:
int(single_value)
except (ValueError, TypeError):
raise ValueError('The ' + name + ' argument must be a tuple of ' +
                 str(size) + ' integers. Received: ' + str(value) + ' '
                 'including element ' + str(single_value) + ' of type' +
                 ' ' + str(type(single_value)))
return value_tuple
@tf_keras.utils.register_keras_serializable(package='Vision')
class Squeeze3D(tf_keras.layers.Layer):
"""Squeeze3D layer to remove singular dimensions."""
def call(self, inputs):
"""Calls the layer with the given inputs."""
return tf.squeeze(inputs, axis=(1, 2, 3))
@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileConv2D(tf_keras.layers.Layer):
"""Conv2D layer with extra options to support mobile devices.
Reshapes 5D video tensor inputs to 4D, allowing Conv2D to run across
dimensions (2, 3) or (3, 4). Reshapes tensors back to 5D when returning the
output.
"""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = (1, 1),
padding: str = 'valid',
data_format: Optional[str] = None,
dilation_rate: Union[int, Sequence[int]] = (1, 1),
groups: int = 1,
use_bias: bool = True,
kernel_initializer: str = 'glorot_uniform',
bias_initializer: str = 'zeros',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
bias_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
activity_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
kernel_constraint: Optional[tf_keras.constraints.Constraint] = None,
bias_constraint: Optional[tf_keras.constraints.Constraint] = None,
use_depthwise: bool = False,
use_temporal: bool = False,
use_buffered_input: bool = False, # pytype: disable=annotation-type-mismatch # typed-keras
batch_norm_op: Optional[Any] = None,
activation_op: Optional[Any] = None,
**kwargs): # pylint: disable=g-doc-args
"""Initializes mobile conv2d.
For the majority of arguments, see tf_keras.layers.Conv2D.

Args:
  use_depthwise: if True, use DepthwiseConv2D instead of Conv2D
  use_temporal: if True, apply Conv2D starting from the temporal dimension
    instead of the spatial dimensions.
  use_buffered_input: if True, the input is expected to be padded
    beforehand. In effect, calling this layer will use 'valid' padding on
    the temporal dimension to simulate 'causal' padding.
  batch_norm_op: A callable object of batch norm layer. If None, no batch
    norm will be applied after the convolution.
  activation_op: A callable object of activation layer. If None, no
    activation will be applied after the convolution.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the MobileConv2D operation.
"""
super(MobileConv2D, self).__init__(**kwargs)
self._filters = filters
self._kernel_size = kernel_size
self._strides = strides
self._padding = padding
self._data_format = data_format
self._dilation_rate = dilation_rate
self._groups = groups
self._use_bias = use_bias
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._activity_regularizer = activity_regularizer
self._kernel_constraint = kernel_constraint
self._bias_constraint = bias_constraint
self._use_depthwise = use_depthwise
self._use_temporal = use_temporal
self._use_buffered_input = use_buffered_input
self._batch_norm_op = batch_norm_op
self._activation_op = activation_op

kernel_size = normalize_tuple(kernel_size, 2, 'kernel_size')

if self._use_temporal and kernel_size[1] > 1:
  raise ValueError('Temporal conv with spatial kernel is not supported.')

if use_depthwise:
  self._conv = nn_layers.DepthwiseConv2D(
      kernel_size=kernel_size,
      strides=strides,
      padding=padding,
      depth_multiplier=1,
      data_format=data_format,
      dilation_rate=dilation_rate,
      use_bias=use_bias,
      depthwise_initializer=kernel_initializer,
      bias_initializer=bias_initializer,
      depthwise_regularizer=kernel_regularizer,
      bias_regularizer=bias_regularizer,
      activity_regularizer=activity_regularizer,
      depthwise_constraint=kernel_constraint,
      bias_constraint=bias_constraint,
      use_buffered_input=use_buffered_input)
else:
  self._conv = nn_layers.Conv2D(
      filters=filters,
      kernel_size=kernel_size,
      strides=strides,
      padding=padding,
      data_format=data_format,
      dilation_rate=dilation_rate,
      groups=groups,
      use_bias=use_bias,
      kernel_initializer=kernel_initializer,
      bias_initializer=bias_initializer,
      kernel_regularizer=kernel_regularizer,
      bias_regularizer=bias_regularizer,
      activity_regularizer=activity_regularizer,
      kernel_constraint=kernel_constraint,
      bias_constraint=bias_constraint,
      use_buffered_input=use_buffered_input)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'filters': self._filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'padding': self._padding,
'data_format': self._data_format,
'dilation_rate': self._dilation_rate,
'groups': self._groups,
'use_bias': self._use_bias,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'activity_regularizer': self._activity_regularizer,
'kernel_constraint': self._kernel_constraint,
'bias_constraint': self._bias_constraint,
'use_depthwise': self._use_depthwise,
'use_temporal': self._use_temporal,
'use_buffered_input': self._use_buffered_input,
}
base_config = super(MobileConv2D, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs):
"""Calls the layer with the given inputs."""
if self._use_temporal:
input_shape = [
tf.shape(inputs)[0],
tf.shape(inputs)[1],
tf.shape(inputs)[2] * tf.shape(inputs)[3],
inputs.shape[4]]
else:
input_shape = [
tf.shape(inputs)[0] * tf.shape(inputs)[1],
tf.shape(inputs)[2],
tf.shape(inputs)[3],
inputs.shape[4]]
x = tf.reshape(inputs, input_shape)
x = self._conv(x)

if self._batch_norm_op is not None:
  x = self._batch_norm_op(x)
if self._activation_op is not None:
  x = self._activation_op(x)

if self._use_temporal:
  output_shape = [
      tf.shape(x)[0],
      tf.shape(x)[1],
      tf.shape(inputs)[2],
      tf.shape(inputs)[3],
      x.shape[3]]
else:
  output_shape = [
      tf.shape(inputs)[0],
      tf.shape(inputs)[1],
      tf.shape(x)[1],
      tf.shape(x)[2],
      x.shape[3]]
x = tf.reshape(x, output_shape)
return x
@tf_keras.utils.register_keras_serializable(package='Vision')
class ConvBlock(tf_keras.layers.Layer):
"""A Conv followed by optional BatchNorm and Activation."""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = 1,
depthwise: bool = False,
causal: bool = False,
use_bias: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] =
tf_keras.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_batch_norm: bool = True,
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
activation: Optional[Any] = None,
conv_type: str = '3d',
use_buffered_input: bool = False, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Initializes a conv block.
Args:
  filters: filters for the conv operation.
  kernel_size: kernel size for the conv operation.
  strides: strides for the conv operation.
  depthwise: if True, use DepthwiseConv2D instead of Conv2D
  causal: if True, use causal mode for the conv operation.
  use_bias: use bias for the conv operation.
  kernel_initializer: kernel initializer for the conv operation.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_batch_norm: if True, apply batch norm after the conv operation.
  batch_norm_layer: class to use for batch norm, if applied.
  batch_norm_momentum: momentum of the batch norm operation, if applied.
  batch_norm_epsilon: epsilon of the batch norm operation, if applied.
  use_sync_bn: if True, use synchronized batch normalization.
  activation: activation after the conv and batch norm operations.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D
    ops. '2plus1d' splits any 3D ops into two sequential 2D ops with
    their own batch norm and activation. '3d_2plus1d' is like '2plus1d',
    but uses two sequential 3D ops instead.
  use_buffered_input: if True, the input is expected to be padded
    beforehand. In effect, calling this layer will use 'valid' padding on
    the temporal dimension to simulate 'causal' padding.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the ConvBlock operation.
"""
super(ConvBlock, self).__init__(**kwargs)

kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
strides = normalize_tuple(strides, 3, 'strides')

self._filters = filters
self._kernel_size = kernel_size
self._strides = strides
self._depthwise = depthwise
self._causal = causal
self._use_bias = use_bias
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._use_batch_norm = use_batch_norm
self._batch_norm_layer = batch_norm_layer
self._batch_norm_momentum = batch_norm_momentum
self._batch_norm_epsilon = batch_norm_epsilon
self._use_sync_bn = use_sync_bn
self._activation = activation
self._conv_type = conv_type
self._use_buffered_input = use_buffered_input

if activation is not None:
  self._activation_layer = tf_utils.get_activation(
      activation, use_keras_layer=True)
else:
  self._activation_layer = None

self._groups = None
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'filters': self._filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'depthwise': self._depthwise,
'causal': self._causal,
'use_bias': self._use_bias,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'use_batch_norm': self._use_batch_norm,
'batch_norm_momentum': self._batch_norm_momentum,
'batch_norm_epsilon': self._batch_norm_epsilon,
'use_sync_bn': self._use_sync_bn,
'activation': self._activation,
'conv_type': self._conv_type,
'use_buffered_input': self._use_buffered_input,
}
base_config = super(ConvBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def build(self, input_shape):
"""Builds the layer with the given input shape."""
padding = 'causal' if self._causal else 'same'
self._groups = input_shape[-1] if self._depthwise else 1
self._batch_norm = None
self._batch_norm_temporal = None

if self._use_batch_norm:
  self._batch_norm = self._batch_norm_layer(
      momentum=self._batch_norm_momentum,
      epsilon=self._batch_norm_epsilon,
      synchronized=self._use_sync_bn,
      name='bn')
  if self._conv_type != '3d' and self._kernel_size[0] > 1:
    self._batch_norm_temporal = self._batch_norm_layer(
        momentum=self._batch_norm_momentum,
        epsilon=self._batch_norm_epsilon,
        synchronized=self._use_sync_bn,
        name='bn_temporal')

self._conv_temporal = None
if self._conv_type == '3d_2plus1d' and self._kernel_size[0] > 1:
  self._conv = nn_layers.Conv3D(
      self._filters,
      (1, self._kernel_size[1], self._kernel_size[2]),
      strides=(1, self._strides[1], self._strides[2]),
      padding='same',
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=False,
      name='conv3d')
  self._conv_temporal = nn_layers.Conv3D(
      self._filters,
      (self._kernel_size[0], 1, 1),
      strides=(self._strides[0], 1, 1),
      padding=padding,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=self._use_buffered_input,
      name='conv3d_temporal')
elif self._conv_type == '2plus1d':
  self._conv = MobileConv2D(
      self._filters,
      (self._kernel_size[1], self._kernel_size[2]),
      strides=(self._strides[1], self._strides[2]),
      padding='same',
      use_depthwise=self._depthwise,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=False,
      batch_norm_op=self._batch_norm,
      activation_op=self._activation_layer,
      name='conv2d')
  if self._kernel_size[0] > 1:
    self._conv_temporal = MobileConv2D(
        self._filters,
        (self._kernel_size[0], 1),
        strides=(self._strides[0], 1),
        padding=padding,
        use_temporal=True,
        use_depthwise=self._depthwise,
        groups=self._groups,
        use_bias=self._use_bias,
        kernel_initializer=self._kernel_initializer,
        kernel_regularizer=self._kernel_regularizer,
        use_buffered_input=self._use_buffered_input,
        batch_norm_op=self._batch_norm_temporal,
        activation_op=self._activation_layer,
        name='conv2d_temporal')
else:
  self._conv = nn_layers.Conv3D(
      self._filters,
      self._kernel_size,
      strides=self._strides,
      padding=padding,
      groups=self._groups,
      use_bias=self._use_bias,
      kernel_initializer=self._kernel_initializer,
      kernel_regularizer=self._kernel_regularizer,
      use_buffered_input=self._use_buffered_input,
      name='conv3d')

super(ConvBlock, self).build(input_shape)
def call(self, inputs):
"""Calls the layer with the given inputs."""
x = inputs
# bn_op and activation_op are folded into the '2plus1d' conv layer so that
# we do not explicitly call them here.
# TODO(lzyuan): clean the conv layers api once the models are re-trained.
x = self._conv(x)
if self._batch_norm is not None and self._conv_type != '2plus1d':
  x = self._batch_norm(x)
if self._activation_layer is not None and self._conv_type != '2plus1d':
  x = self._activation_layer(x)

if self._conv_temporal is not None:
  x = self._conv_temporal(x)
  if self._batch_norm_temporal is not None and self._conv_type != '2plus1d':
    x = self._batch_norm_temporal(x)
  if self._activation_layer is not None and self._conv_type != '2plus1d':
    x = self._activation_layer(x)

return x
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamBuffer(tf_keras.layers.Layer):
"""Stream buffer wrapper which caches activations of previous frames."""
def __init__(self,
buffer_size: int,
state_prefix: Optional[str] = None,
**kwargs):
"""Initializes a stream buffer.
Args:
  buffer_size: the number of input frames to cache.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the StreamBuffer operation.
"""
super(StreamBuffer, self).__init__(**kwargs)

state_prefix = state_prefix if state_prefix is not None else ''
self._state_prefix = state_prefix
self._state_name = f'{state_prefix}_stream_buffer'
self._buffer_size = buffer_size
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'buffer_size': self._buffer_size,
'state_prefix': self._state_prefix,
}
base_config = super(StreamBuffer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(
self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None,
) -> Tuple[Any, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s). Expected keys
    include `state_prefix + '_stream_buffer'`.

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}
buffer = states.get(self._state_name, None)

# Create the buffer if it does not exist in the states.
# Output buffer shape:
# [batch_size, buffer_size, input_height, input_width, num_channels]
if buffer is None:
  shape = tf.shape(inputs)
  buffer = tf.zeros(
      [shape[0], self._buffer_size, shape[2], shape[3], shape[4]],
      dtype=inputs.dtype)

# tf.pad has limited support for tf lite, so use tf.concat instead.
full_inputs = tf.concat([buffer, inputs], axis=1)

# Cache the last b frames of the input where b is the buffer size and f
# is the number of input frames. If b > f, then we will cache the last
# b - f frames from the previous buffer concatenated with the current f
# input frames.
new_buffer = full_inputs[:, -self._buffer_size:]
states[self._state_name] = new_buffer

return full_inputs, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamConvBlock(ConvBlock):
"""ConvBlock with StreamBuffer."""
def __init__(
self,
filters: int,
kernel_size: Union[int, Sequence[int]],
strides: Union[int, Sequence[int]] = 1,
depthwise: bool = False,
causal: bool = False,
use_bias: bool = False,
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = tf.keras
.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_batch_norm: bool = True,
batch_norm_layer: tf_keras.layers.Layer =
tf_keras.layers.BatchNormalization,
batch_norm_momentum: float = 0.99,
batch_norm_epsilon: float = 1e-3,
use_sync_bn: bool = False,
activation: Optional[Any] = None,
conv_type: str = '3d',
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Initializes a stream conv block.
Args:
  filters: filters for the conv operation.
  kernel_size: kernel size for the conv operation.
  strides: strides for the conv operation.
  depthwise: if True, use DepthwiseConv2D instead of Conv2D
  causal: if True, use causal mode for the conv operation.
  use_bias: use bias for the conv operation.
  kernel_initializer: kernel initializer for the conv operation.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_batch_norm: if True, apply batch norm after the conv operation.
  batch_norm_layer: class to use for batch norm, if applied.
  batch_norm_momentum: momentum of the batch norm operation, if applied.
  batch_norm_epsilon: epsilon of the batch norm operation, if applied.
  use_sync_bn: if True, use synchronized batch normalization.
  activation: activation after the conv and batch norm operations.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D
    ops. '2plus1d' splits any 3D ops into two sequential 2D ops with
    their own batch norm and activation. '3d_2plus1d' is like '2plus1d',
    but uses two sequential 3D ops instead.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.

Returns:
  An output tensor of the StreamConvBlock operation.
"""
kernel_size = normalize_tuple(kernel_size, 3, 'kernel_size')
buffer_size = kernel_size[0] - 1
use_buffer = buffer_size > 0 and causal

self._state_prefix = state_prefix

super(StreamConvBlock, self).__init__(
    filters,
    kernel_size,
    strides=strides,
    depthwise=depthwise,
    causal=causal,
    use_bias=use_bias,
    kernel_initializer=kernel_initializer,
    kernel_regularizer=kernel_regularizer,
    use_batch_norm=use_batch_norm,
    batch_norm_layer=batch_norm_layer,
    batch_norm_momentum=batch_norm_momentum,
    batch_norm_epsilon=batch_norm_epsilon,
    use_sync_bn=use_sync_bn,
    activation=activation,
    conv_type=conv_type,
    use_buffered_input=use_buffer,
    **kwargs)

self._stream_buffer = None
if use_buffer:
  self._stream_buffer = StreamBuffer(
      buffer_size=buffer_size, state_prefix=state_prefix)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {'state_prefix': self._state_prefix}
base_config = super(StreamConvBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

x = inputs

# If we have no separate temporal conv, use the buffer before the 3D conv.
if self._conv_temporal is None and self._stream_buffer is not None:
  x, states = self._stream_buffer(x, states=states)

# bn_op and activation_op are folded into the '2plus1d' conv layer so that
# we do not explicitly call them here.
# TODO(lzyuan): clean the conv layers api once the models are re-trained.
x = self._conv(x)
if self._batch_norm is not None and self._conv_type != '2plus1d':
  x = self._batch_norm(x)
if self._activation_layer is not None and self._conv_type != '2plus1d':
  x = self._activation_layer(x)

if self._conv_temporal is not None:
  if self._stream_buffer is not None:
    # If we have a separate temporal conv, use the buffer before the
    # 1D conv instead (otherwise, we may waste computation on the 2D conv).
    x, states = self._stream_buffer(x, states=states)
  x = self._conv_temporal(x)
  if self._batch_norm_temporal is not None and self._conv_type != '2plus1d':
    x = self._batch_norm_temporal(x)
  if self._activation_layer is not None and self._conv_type != '2plus1d':
    x = self._activation_layer(x)

return x, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class StreamSqueezeExcitation(tf_keras.layers.Layer):
"""Squeeze and excitation layer with causal mode.
Reference: https://arxiv.org/pdf/1709.01507.pdf
"""
def __init__(
self,
hidden_filters: int,
se_type: str = '3d',
activation: nn_layers.Activation = 'swish',
gating_activation: nn_layers.Activation = 'sigmoid',
causal: bool = False,
conv_type: str = '3d',
kernel_initializer: tf_keras.initializers.Initializer = 'HeNormal',
kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = tf.keras
.regularizers.L2(KERNEL_WEIGHT_DECAY),
use_positional_encoding: bool = False,
state_prefix: Optional[str] = None, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Implementation for squeeze and excitation.
Args:
  hidden_filters: The hidden filters of squeeze excite.
  se_type: '3d', '2d', or '2plus3d'. '3d' uses the default 3D
    spatiotemporal global average pooling for squeeze excitation. '2d'
    uses 2D spatial global average pooling on each frame. '2plus3d'
    concatenates both 3D and 2D global average pooling.
  activation: name of the activation function.
  gating_activation: name of the activation function for gating.
  causal: if True, use causal mode in the global average pool.
  conv_type: '3d', '2plus1d', or '3d_2plus1d'. '3d' uses the default 3D
    ops. '2plus1d' splits any 3D ops into two sequential 2D ops with
    their own batch norm and activation. '3d_2plus1d' is like '2plus1d',
    but uses two sequential 3D ops instead.
  kernel_initializer: kernel initializer for the conv operations.
  kernel_regularizer: kernel regularizer for the conv operation.
  use_positional_encoding: add a positional encoding after the
    (cumulative) global average pooling layer.
  state_prefix: a prefix string to identify states.
  **kwargs: keyword arguments to be passed to this layer.
"""
super(StreamSqueezeExcitation, self).__init__(**kwargs)

self._hidden_filters = hidden_filters
self._se_type = se_type
self._activation = activation
self._gating_activation = gating_activation
self._causal = causal
self._conv_type = conv_type
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._use_positional_encoding = use_positional_encoding
self._state_prefix = state_prefix

self._spatiotemporal_pool = nn_layers.GlobalAveragePool3D(
    keepdims=True, causal=causal, state_prefix=state_prefix)
self._spatial_pool = nn_layers.SpatialAveragePool3D(keepdims=True)

self._pos_encoding = None
if use_positional_encoding:
  self._pos_encoding = nn_layers.PositionalEncoding(
      initializer='zeros', state_prefix=state_prefix)
def get_config(self):
"""Returns a dictionary containing the config used for initialization."""
config = {
'hidden_filters': self._hidden_filters,
'se_type': self._se_type,
'activation': self._activation,
'gating_activation': self._gating_activation,
'causal': self._causal,
'conv_type': self._conv_type,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'use_positional_encoding': self._use_positional_encoding,
'state_prefix': self._state_prefix,
}
base_config = super(StreamSqueezeExcitation, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def build(self, input_shape):
"""Builds the layer with the given input shape."""
self._se_reduce = ConvBlock(
filters=self._hidden_filters,
kernel_size=1,
causal=self._causal,
use_bias=True,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
use_batch_norm=False,
activation=self._activation,
conv_type=self._conv_type,
name='se_reduce')
self._se_expand = ConvBlock(
    filters=input_shape[-1],
    kernel_size=1,
    causal=self._causal,
    use_bias=True,
    kernel_initializer=self._kernel_initializer,
    kernel_regularizer=self._kernel_regularizer,
    use_batch_norm=False,
    activation=self._gating_activation,
    conv_type=self._conv_type,
    name='se_expand')

super(StreamSqueezeExcitation, self).build(input_shape)
def call(self,
inputs: tf.Tensor,
states: Optional[nn_layers.States] = None
) -> Tuple[tf.Tensor, nn_layers.States]:
"""Calls the layer with the given inputs.
Args:
  inputs: the input tensor.
  states: a dict of states such that, if any of the keys match for this
    layer, will overwrite the contents of the buffer(s).

Returns:
  the output tensor and states
"""
states = dict(states) if states is not None else {}

if self._se_type == '3d':
  x, states = self._spatiotemporal_pool(
      inputs, states=states, output_states=True)
elif self._se_type == '2d':
  x = self._spatial_pool(inputs)
elif self._se_type == '2plus3d':
  x_space = self._spatial_pool(inputs)
  x, states = self._spatiotemporal_pool(
      x_space, states=states, output_states=True)
  if not self._causal:
    x = tf.tile(x, [1, tf.shape(inputs)[1], 1, 1, 1])
  # print("x Shape:", x.shape, "x_space Shape:", x_space.shape)
  x = tf.concat([x, x_space], axis=-1)
  # print("x Shape:", x.shape, "x_space Shape:", x_space.shape)
else:
  raise ValueError('Unknown Squeeze Excitation type {}'.format(
      self._se_type))

if self._pos_encoding is not None:
  x, states = self._pos_encoding(x, states=states)

x = self._se_reduce(x)
x = self._se_expand(x)

return x * inputs, states
@tf_keras.utils.register_keras_serializable(package='Vision')
class CausalSimAM(tf_keras.layers.Layer):
def __init__(self, e_lambda=1e-4, **kwargs):
super(CausalSimAM, self).__init__(**kwargs)
self.e_lambda = e_lambda
self.sigmoid = tf_keras.activations.sigmoid
def call(self, inputs: tf.Tensor) -> tf.Tensor:
  # Input shape: (batch, time, height, width, channels).
  batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
  n = tf.cast(height * width - 1, dtype=inputs.dtype)
  # Per-time-step, per-channel mean, based on the current frame only.
  mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
  x_mu_square = tf.square(inputs - mu)
  # Energy function y.
  sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
  y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
  # Sigmoid activation as the attention map.
  activation = self.sigmoid(y)
  return inputs * activation

def get_config(self):
  config = super(CausalSimAM, self).get_config()
  config.update({'e_lambda': self.e_lambda})
  return config
Do they have the same effect?
The two code snippets both implement SimAM (Simple Attention Module), but in different deep-learning frameworks (PyTorch and TensorFlow) and for different input formats. Although the core idea and the math coincide, there are some key differences in implementation details and applicable scenarios. A detailed comparison follows.

Input formats:
- simam_module (PyTorch): (batch, channels, height, width)
- CausalSimAM (TensorFlow): (batch, time, height, width, channels)

The shared core idea is to re-weight each neuron according to the "importance" of its activation relative to the local mean and variance, in the following steps:
- Compute the mean: x.mean(dim=[2,3], keepdim=True) in PyTorch vs. tf.reduce_mean(inputs, axis=[2, 3], keepdims=True) in TensorFlow.
- Compute the squared deviation: (x - mu).pow(2) vs. tf.square(inputs - mu).
- Compute the energy function y: x_minus_mu_square / (4 * (x_minus_mu_square.sum(dim=[2,3], keepdim=True) / n + self.e_lambda)) + 0.5 vs. x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5.
- Apply the sigmoid activation: self.activaton(y) (i.e., Sigmoid(y)) vs. self.sigmoid(y).
- Rescale the input: x * self.activaton(y) vs. inputs * activation.
- simam_module: handles only the spatial dimensions (height and width); suited to static images.
- CausalSimAM: additionally handles the time dimension; suited to video data. It ensures the attention computation relies only on the current and earlier frames, preventing leakage of future information, which is crucial for real-time or streaming inference.
- PyTorch implementation: a plain nn.Module that operates directly on the input tensor.
- TensorFlow implementation: CausalSimAM is embedded in a larger model, such as MoViNet's basic block (MovinetBlock), and cooperates with other modules (ConvBlock, StreamConvBlock, etc.) that support temporal processing and streaming inference.

simam_module (PyTorch):
class simam_module(torch.nn.Module):
    def __init__(self, channels=None, e_lambda=1e-4):
        super(simam_module, self).__init__()
        self.activaton = nn.Sigmoid()
        self.e_lambda = e_lambda

    def forward(self, x):
        b, c, h, w = x.size()
        n = w * h - 1
        x_minus_mu_square = (x - x.mean(dim=[2, 3], keepdim=True)).pow(2)
        y = x_minus_mu_square / (4 * (x_minus_mu_square.sum(dim=[2, 3], keepdim=True) / n + self.e_lambda)) + 0.5
        return x * self.activaton(y)
CausalSimAM (TensorFlow):
class CausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, **kwargs):
    super(CausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf_keras.activations.sigmoid

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Input shape: (batch, time, height, width, channels).
    batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
    n = tf.cast(height * width - 1, dtype=inputs.dtype)
    # Per-time-step, per-channel mean, based on the current frame only.
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    # Energy function y.
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    # Sigmoid activation.
    activation = self.sigmoid(y)
    return inputs * activation

  def get_config(self):
    config = super(CausalSimAM, self).get_config()
    config.update({'e_lambda': self.e_lambda})
    return config
It must cooperate with other modules (such as StreamConvBlock and ConvBlock) to fit a complex architecture like MoViNet. Both implementations achieve a similar attention-enhancement effect for their respective data formats and tasks. However, because the input formats and application scenarios differ, the implementations diverge noticeably in detail and functionality. They can therefore be said to realize the same core effect within their own frameworks and settings, with different concrete implementations and scopes.

If your goal is to integrate SimAM into MoViNet (TensorFlow) while staying compatible with causal convolutions and streaming inference, you need to:
- make sure CausalSimAM handles 5D tensors correctly and plugs seamlessly into MoViNet's temporal data flow;
- verify CausalSimAM's causality along the time dimension, so that it introduces no information from future frames.

This analysis confirms that the two implementations, despite differing in framework and application, are equivalent in their core function. Depending on your needs, you can further optimize CausalSimAM in TensorFlow to exploit its strengths on temporal data.
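To back the "same core effect" claim numerically, the sketch below feeds one identical random frame through both implementations, transposing between the PyTorch (B, C, H, W) layout and a single time step of the TensorFlow (B, T, H, W, C) layout. It assumes both classes from this thread are importable and that PyTorch and TensorFlow are installed side by side:

import numpy as np
import tensorflow as tf
import torch

x = np.random.rand(1, 8, 16, 16).astype(np.float32)          # (B, C, H, W)

out_pt = simam_module(e_lambda=1e-4)(torch.from_numpy(x)).detach().numpy()

x_tf = np.transpose(x, (0, 2, 3, 1))[:, np.newaxis]           # (B, T=1, H, W, C)
out_tf = CausalSimAM(e_lambda=1e-4)(tf.constant(x_tf)).numpy()
out_tf = np.transpose(out_tf[:, 0], (0, 3, 1, 2))             # back to (B, C, H, W)

np.testing.assert_allclose(out_pt, out_tf, rtol=1e-4)          # should pass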
Where do causality and temporal operations show up in the TensorFlow implementation?
In the TensorFlow implementation of CausalSimAM, causality and temporal operations are realized through several components and design strategies. The following explains in detail where these properties appear in the code.

Causality means that the model, when processing the current frame, relies only on the current and earlier frames, never on future ones. This is vital for real-time or streaming inference, as it prevents information leakage and inconsistent predictions.
The StreamBuffer class

StreamBuffer caches previous frames, so that processing the current frame can access the necessary history without ever touching future frames. It achieves this as follows:
- StreamBuffer caches a fixed number of past frames (set by buffer_size) and updates the buffer on every call, keeping only the latest buffer_size frames.
- The states dictionary stores and updates the buffer state, so each inference step depends only on the current and past frames.

class StreamBuffer(tf_keras.layers.Layer):
  def __init__(self, buffer_size: int, state_prefix: Optional[str] = None,
               **kwargs):
    super(StreamBuffer, self).__init__(**kwargs)
    self._buffer_size = buffer_size
    self._state_prefix = state_prefix if state_prefix is not None else ''
    self._state_name = f'{state_prefix}_stream_buffer'

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[Any, nn_layers.States]:
    states = dict(states) if states is not None else {}
    buffer = states.get(self._state_name, None)
    if buffer is None:
      shape = tf.shape(inputs)
      buffer = tf.zeros(
          [shape[0], self._buffer_size, shape[2], shape[3], shape[4]],
          dtype=inputs.dtype)
    full_inputs = tf.concat([buffer, inputs], axis=1)
    new_buffer = full_inputs[:, -self._buffer_size:]
    states[self._state_name] = new_buffer
    return full_inputs, states
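To make the state contract concrete, here is a sketch of how the buffer is threaded across consecutive streaming calls (shapes are hypothetical):

import tensorflow as tf

buf = StreamBuffer(buffer_size=2, state_prefix='demo')
states = {}
for step in range(3):
  frame = tf.random.normal([1, 1, 7, 7, 4])    # one new frame per step
  padded, states = buf(frame, states=states)
  # Each call returns buffer_size past frames concatenated before the new one:
  print(step, padded.shape)                     # (1, 3, 7, 7, 4) on every step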
The StreamConvBlock class

StreamConvBlock inherits from ConvBlock and integrates a StreamBuffer to manage causality along the time dimension.
- When causal=True, ConvBlock uses 'causal' padding, so only past information is used along the time dimension.
- Through the StreamBuffer, StreamConvBlock consumes only the buffered past frames plus the current frames on each call, avoiding any dependence on future frames.

class StreamConvBlock(ConvBlock):
  def __init__(self, ..., causal: bool = False, ...):
    super(StreamConvBlock, self).__init__(..., causal=causal, ...)
    buffer_size = kernel_size[0] - 1
    use_buffer = buffer_size > 0 and causal
    if use_buffer:
      self._stream_buffer = StreamBuffer(
          buffer_size=buffer_size, state_prefix=state_prefix)

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    states = dict(states) if states is not None else {}
    x = inputs
    if self._conv_temporal is None and self._stream_buffer is not None:
      x, states = self._stream_buffer(x, states=states)
    # Subsequent convolution, normalization, and activation.
    return x, states
The CausalSimAM class

CausalSimAM itself is designed to process inputs that carry a time dimension and to compute the attention weights from the current frame's information only.
- CausalSimAM processes the input frame by frame; each time step's computation depends only on that time step's spatial information.
- CausalSimAM therefore introduces no dependence on future frames, preserving causality.

class CausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, **kwargs):
    super(CausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf_keras.activations.sigmoid

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Input shape: (batch, time, height, width, channels).
    batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
    n = tf.cast(height * width - 1, dtype=inputs.dtype)
    # Per-time-step, per-channel mean, based on the current frame only.
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    # Energy function y.
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    # Sigmoid activation.
    activation = self.sigmoid(y)
    return inputs * activation

  def get_config(self):
    config = super(CausalSimAM, self).get_config()
    config.update({'e_lambda': self.e_lambda})
    return config
Temporal operations concern the handling of the time dimension in sequence data, enabling the model to capture temporal information while remaining causal.
The MobileConv2D class

MobileConv2D extends the ordinary convolution to support efficient computation on mobile devices and to run convolutions along the time dimension.
- Via the use_temporal flag, MobileConv2D can perform the convolution along the time dimension, supporting temporal feature extraction.
- The use_buffered_input flag lets the layer consume pre-buffered input, simulating causal padding and thus preserving causality in time.

class MobileConv2D(tf_keras.layers.Layer):
  def __init__(self, ..., use_temporal: bool = False,
               use_buffered_input: bool = False, ..., **kwargs):
    super(MobileConv2D, self).__init__(**kwargs)
    # Parameter initialization and convolution configuration.
    # ...

  def call(self, inputs):
    if self._use_temporal:
      input_shape = [
          tf.shape(inputs)[0],
          tf.shape(inputs)[1],
          tf.shape(inputs)[2] * tf.shape(inputs)[3],
          inputs.shape[4]]
    else:
      input_shape = [
          tf.shape(inputs)[0] * tf.shape(inputs)[1],
          tf.shape(inputs)[2],
          tf.shape(inputs)[3],
          inputs.shape[4]]
    x = tf.reshape(inputs, input_shape)
    x = self._conv(x)
    # Subsequent batch normalization and activation.
    # ...
    return x
The ConvBlock and StreamConvBlock classes

These classes combine the convolution operations with time-dimension handling so the model can process temporal data effectively.
- The conv_type argument selects among convolution variants (3d, 2plus1d, 3d_2plus1d) for different temporal feature-extraction needs.
- With causal=True, causal padding is used, so the convolutions never depend on information from future frames.

class ConvBlock(tf_keras.layers.Layer):
  def __init__(self, ..., causal: bool = False, ..., **kwargs):
    super(ConvBlock, self).__init__(**kwargs)
    self._causal = causal
    # Parameter initialization and convolution configuration.
    # ...

  def build(self, input_shape):
    padding = 'causal' if self._causal else 'same'
    # Build the convolution and batch-norm layers.
    # ...

  def call(self, inputs):
    x = self._conv(inputs)
    if self._batch_norm is not None and self._conv_type != '2plus1d':
      x = self._batch_norm(x)
    if self._activation_layer is not None and self._conv_type != '2plus1d':
      x = self._activation_layer(x)
    # Subsequent convolution and normalization.
    # ...
    return x
Integration in StreamConvBlock

By combining StreamBuffer and ConvBlock, StreamConvBlock maintains causality and correct temporal operation throughout the model.
- Through the StreamBuffer, each StreamConvBlock processes only the current and past frames, keeping the convolutions causal.
- StreamConvBlock is used repeatedly, extracting temporal features layer by layer while preserving causality.

class StreamConvBlock(ConvBlock):
  def __init__(self, ..., causal: bool = False, ..., **kwargs):
    super(StreamConvBlock, self).__init__(..., causal=causal, ...)
    # Other initialization.
    if use_buffer:
      self._stream_buffer = StreamBuffer(
          buffer_size=buffer_size, state_prefix=state_prefix)

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    states = dict(states) if states is not None else {}
    x = inputs
    if self._conv_temporal is None and self._stream_buffer is not None:
      x, states = self._stream_buffer(x, states=states)
    # Convolution, normalization, and activation.
    return x, states
Integration of CausalSimAM

CausalSimAM is inserted at specific points in the model (for example, after the MobileBottleneck in each MovinetBlock) to enrich the spatiotemporal features while preserving causality.

class MobileBottleneck(tf_keras.layers.Layer):
  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    # Expansion, feature extraction, and projection.
    x = self._projection_layer(x)
    # Add SimAM.
    simam = CausalSimAM()
    x = simam(x)
    # Residual connection.
    x = self._identity(x)
    x = self._rezero(x)
    if self._stochastic_depth is not None:
      x = self._stochastic_depth(x)
    if self._skip_layer is not None:
      skip = self._skip_layer(inputs)
    else:
      skip = inputs
    return x + skip, states
In the TensorFlow implementation, causality and temporal operations are embodied mainly in:
- StreamBuffer: caches past frames, so the model depends only on current and past information.
- StreamConvBlock: combines the StreamBuffer with causally padded convolutions to preserve causality.
- CausalSimAM: computes attention weights independently at each time step, so no future-frame information can leak in.
- 'causal' padding: guarantees the convolutions never depend on future frames.

Through this design, the TensorFlow CausalSimAM not only strengthens the feature representation but also keeps temporal processing real-time and causal, making it well suited to online-inference tasks such as video classification and action recognition.
Where exactly in CausalSimAM is causality embodied? Please check again, and explain how it actually differs from the original SimAM.
class simam_module(torch.nn.Module):
def __init__(self, channels = None, e_lambda = 1e-4):
super(simam_module, self).__init__()
self.activaton = nn.Sigmoid()
self.e_lambda = e_lambda
def forward(self, x):
    b, c, h, w = x.size()
    n = w * h - 1
    x_minus_mu_square = (x - x.mean(dim=[2, 3], keepdim=True)).pow(2)
    y = x_minus_mu_square / (4 * (x_minus_mu_square.sum(dim=[2, 3], keepdim=True) / n + self.e_lambda)) + 0.5
    return x * self.activaton(y)
Your question is right on target: it concerns how causality is embodied in the TensorFlow CausalSimAM, and how it concretely differs from the original PyTorch simam_module. A detailed analysis follows.

How causality appears in CausalSimAM

In temporal data processing, causality means that the prediction or processing at the current moment depends only on current and past inputs, never on future ones. This matters most for real-time or streaming inference: the model must not "peek" at future data, which keeps inference real-time and consistent.

Analysis of the CausalSimAM implementation

Let us first revisit the implementation of CausalSimAM:
@tf_keras.utils.register_keras_serializable(package='Vision')
class CausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, **kwargs):
    super(CausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf_keras.activations.sigmoid

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Input shape: (batch, time, height, width, channels).
    batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
    n = tf.cast(height * width - 1, dtype=inputs.dtype)
    # Per-time-step, per-channel mean, based on the current frame only.
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    # Energy function y.
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    # Sigmoid activation.
    activation = self.sigmoid(y)
    return inputs * activation

  def get_config(self):
    config = super(CausalSimAM, self).get_config()
    config.update({'e_lambda': self.e_lambda})
    return config
In CausalSimAM, causality is not enforced by the layer itself; it is achieved indirectly, through the following aspects of the overall model architecture.

Handling of the time dimension:
- CausalSimAM accepts a 5D tensor (batch, time, height, width, channels), where the time axis represents the temporal sequence.
- Inside the CausalSimAM layer, each time step (each index along time) is processed independently. Concretely, mu and x_mu_square are computed over the spatial dimensions (height and width) of the current time step only, without touching any other time step.

Buffering in the model architecture:
- MoViNet uses the StreamBuffer class to cache past frames, and StreamConvBlock ensures the current frame is processed using only the current and past frames.
- CausalSimAM, as an attention layer, is embedded inside this causality-preserving architecture. Every frame it sees has already been guaranteed, via the buffering mechanism, to contain no future-frame information.

Causal padding in the convolution layers:
- ConvBlock and StreamConvBlock use 'causal' padding (padding='causal'), so convolutions never depend on future frames along the time dimension.
- Consequently, the inputs that reach the CausalSimAM layer are themselves causal.

In your code, causality shows up concretely in:

The StreamBuffer class: caches past frames and, on each forward pass, passes on only the current and past frames.
class StreamBuffer(tf_keras.layers.Layer):
  def __init__(self, buffer_size: int, state_prefix: Optional[str] = None,
               **kwargs):
    super(StreamBuffer, self).__init__(**kwargs)
    self._buffer_size = buffer_size
    self._state_prefix = state_prefix if state_prefix is not None else ''
    self._state_name = f'{state_prefix}_stream_buffer'

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[Any, nn_layers.States]:
    states = dict(states) if states is not None else {}
    buffer = states.get(self._state_name, None)
    if buffer is None:
      shape = tf.shape(inputs)
      buffer = tf.zeros(
          [shape[0], self._buffer_size, shape[2], shape[3], shape[4]],
          dtype=inputs.dtype)
    full_inputs = tf.concat([buffer, inputs], axis=1)
    new_buffer = full_inputs[:, -self._buffer_size:]
    states[self._state_name] = new_buffer
    return full_inputs, states
The StreamConvBlock class: by integrating a StreamBuffer, it ensures that each convolution block processes inputs containing only the current and past frames.
class StreamConvBlock(ConvBlock):
  def __init__(self, ..., causal: bool = False, ..., **kwargs):
    super(StreamConvBlock, self).__init__(..., causal=causal, ...)
    buffer_size = kernel_size[0] - 1
    use_buffer = buffer_size > 0 and causal
    if use_buffer:
      self._stream_buffer = StreamBuffer(
          buffer_size=buffer_size, state_prefix=state_prefix)

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    states = dict(states) if states is not None else {}
    x = inputs
    if self._conv_temporal is None and self._stream_buffer is not None:
      x, states = self._stream_buffer(x, states=states)
    # Subsequent convolution, normalization, and activation.
    return x, states
Invocation of CausalSimAM: it is called inside the MobileBottleneck class, so it always processes inputs whose causality has already been enforced.
class MobileBottleneck(tf_keras.layers.Layer):
  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    # Expansion, feature extraction, and projection.
    x = self._projection_layer(x)
    # Add SimAM.
    simam = CausalSimAM()
    x = simam(x)
    # Residual connection.
    x = self._identity(x)
    x = self._rezero(x)
    if self._stochastic_depth is not None:
      x = self._stochastic_depth(x)
    if self._skip_layer is not None:
      skip = self._skip_layer(inputs)
    else:
      skip = inputs
    return x + skip, states
Through these mechanisms, every frame that CausalSimAM processes is causal: it depends only on the current and earlier frames.
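A stricter check than eyeballing the code is to differentiate: in a causal layer, the output at frame t must have zero gradient with respect to any later input frame. A sketch, assuming the CausalSimAM above is in scope:

import tensorflow as tf

layer = CausalSimAM()
clip = tf.random.normal([1, 4, 8, 8, 3])

with tf.GradientTape() as tape:
  tape.watch(clip)
  out_frame0 = layer(clip)[:, 0]                  # output at time step 0
grad = tape.gradient(tf.reduce_sum(out_frame0), clip)

# For a causal layer the gradient w.r.t. every later frame must be zero:
tf.debugging.assert_near(grad[:, 1:], tf.zeros_like(grad[:, 1:]))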
Differences between CausalSimAM and the original SimAM

- PyTorch simam_module: input shape (batch, channels, height, width)
- TensorFlow CausalSimAM: input shape (batch, time, height, width, channels)
Time-dimension handling:
- simam_module: handles only the spatial dimensions; suited to static images.
- CausalSimAM: handles the time dimension by processing 5D tensors; suited to video sequences.

Causality guarantee:
- simam_module: causality does not apply, since it targets static images.
- CausalSimAM: causality is guaranteed indirectly through integration with components such as StreamBuffer and StreamConvBlock. The CausalSimAM layer itself does not implement causality directly; it relies on the architecture to ensure the data it receives is causal.

Integration style:
- simam_module can be dropped into any position of a convolutional network to enhance features.
- CausalSimAM usually works together with other temporal modules (such as StreamConvBlock and StreamBuffer) inside a more complex temporal architecture like MoViNet.

State management:
- simam_module: no state management; fits independent static-image processing.
- CausalSimAM: state is managed via mechanisms like StreamBuffer, fitting frame-by-frame streaming.

Despite the different frameworks and scenarios, CausalSimAM and the original simam_module share the same core math:
- Compute the mean: x.mean(dim=[2,3], keepdim=True) in PyTorch vs. tf.reduce_mean(inputs, axis=[2, 3], keepdims=True) in TensorFlow.
- Compute the squared deviation: (x - mu).pow(2) vs. tf.square(inputs - mu).
- Compute the energy function y: x_minus_mu_square / (4 * (x_minus_mu_square.sum(dim=[2,3], keepdim=True) / n + self.e_lambda)) + 0.5 vs. x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5.
- Apply the sigmoid activation: self.activaton(y) (i.e., Sigmoid(y)) vs. self.sigmoid(y).
- Rescale the input: x * self.activaton(y) vs. inputs * activation.
Summary of the differences:
- Input rank: simam_module processes 4D tensors (static images); CausalSimAM processes 5D tensors (video sequences).
- Causality guarantee: simam_module does not involve causality; CausalSimAM ensures its input data is causal through its integration with the model architecture (StreamBuffer, StreamConvBlock), although the layer itself does not implement causality directly.
- Integration complexity: simam_module drops straight into a simple convolutional network; CausalSimAM must cooperate with other temporal modules and suits complex temporal architectures.
中的实现在 CausalSimAM
类本身,并未直接通过层内操作来保障因果性(如时间步之间的依赖关系)。因果性是通过以下方式在整体模型架构中间接实现的:
CausalSimAM
逐帧处理,每个时间步的计算仅基于该时间步的空间信息。StreamBuffer
和 StreamConvBlock
等模块,确保每次前向传播时输入仅包含当前及过去的帧。适用范围:
simam_module
:静态图像。CausalSimAM
:时序视频数据。框架差异:
因果性保障:
simam_module
无因果性保障。CausalSimAM
通过模型架构间接保障因果性。输入维度:
simam_module
:4D。CausalSimAM
:5D。明确因果性保障:
CausalSimAM
层本身更加明确地保障因果性,可以在层内引入时间步的依赖管理。例如,使用掩码(mask)或限定时间步的信息流。层内时间依赖:
CausalSimAM
中加入时间步间的依赖,例如通过递归机制或时间步的历史信息整合,进一步提升因果性保障。测试与验证:
CausalSimAM
后,进行严格的因果性测试,确保模型在推理时不依赖于未来帧的信息。这可以通过分析中间特征或可视化注意力权重来验证。文档与注释:
CausalSimAM
和相关模块添加详细的注释和文档,明确因果性保障的实现机制,便于维护和扩展。通过上述分析,可以更清晰地理解 CausalSimAM
的实现方式及其与原版 SimAM
的区别,从而在实际应用中更有效地利用这些模块来提升模型的性能和特性。
CausalSimAM's gains are modest. What new ideas could you add to this module to get a simple but effective improvement?
Understood: if CausalSimAM only brings modest gains in its current form, some targeted, innovative changes can raise its performance while keeping the module simple and efficient. Below are concrete suggestions and new ideas that can be integrated into CausalSimAM.

Combine channel attention with spatial attention. In the original SimAM, the attention acts mainly on the spatial dimensions; introducing channel attention lets the module also re-weight each channel dynamically. SimAM's spatial attention can be kept as is, or further tuned to capture key regions more accurately. Combining the two gives a fuller enhancement of the feature representation. In the sketch below, the channel branch pools only over each frame's spatial dimensions, so the module remains causal:
@tf_keras.utils.register_keras_serializable(package='Vision')
class EnhancedCausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, reduction_ratio=16, **kwargs):
    super(EnhancedCausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf_keras.activations.sigmoid
    self.reduction_ratio = reduction_ratio

  def build(self, input_shape):
    self.filters = input_shape[-1]
    # The dense layers depend on the channel count, so create them in build().
    self.fc1 = tf.keras.layers.Dense(
        units=self.filters // self.reduction_ratio, activation='relu',
        use_bias=False)
    self.fc2 = tf.keras.layers.Dense(
        units=self.filters, activation='sigmoid', use_bias=False)
    super(EnhancedCausalSimAM, self).build(input_shape)

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Original SimAM computation (per frame, so no future-frame dependency).
    batch, time, height, width, channels = tf.unstack(tf.shape(inputs))
    n = tf.cast(height * width - 1, dtype=inputs.dtype)
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    spatial_attention = self.sigmoid(y)

    # Channel attention. Pool over the spatial dimensions only, per frame,
    # to avoid mixing information across time (keeps the module causal).
    channel_attention = tf.reduce_mean(inputs, axis=[2, 3])  # (batch, time, channels)
    channel_attention = self.fc1(channel_attention)
    channel_attention = self.fc2(channel_attention)
    channel_attention = channel_attention[:, :, tf.newaxis, tf.newaxis, :]  # (batch, time, 1, 1, channels)

    # Combine spatial and channel attention.
    combined_attention = spatial_attention * channel_attention
    return inputs * combined_attention

  def get_config(self):
    config = super(EnhancedCausalSimAM, self).get_config()
    config.update({
        'e_lambda': self.e_lambda,
        'reduction_ratio': self.reduction_ratio,
    })
    return config
Notes: the dense layers depend on the channel count and are therefore created in build; the channel branch averages per frame (never across time), so no future-frame information enters the attention; reduction_ratio controls the bottleneck of the two dense layers.
Multi-scale attention. Introducing multi-scale processing lets the attention mechanism judge feature importance at several resolutions; for example, apply SimAM at different resolutions and fuse the results. Add multi-scale branches inside the CausalSimAM module that process resized copies of the feature map and fuse the resulting attention weights. Note that tf.image.resize expects 4D tensors, so the time axis is folded into the batch axis before resizing:
@tf_keras.utils.register_keras_serializable(package='Vision')
class MultiScaleCausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, scales=(1.0, 0.5), **kwargs):
    super(MultiScaleCausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.scales = list(scales)
    self.sigmoid = tf.keras.activations.sigmoid

  def _resize(self, x, height, width):
    # tf.image.resize operates on 4D tensors, so fold time into the batch.
    shape = tf.shape(x)
    x4d = tf.reshape(x, [shape[0] * shape[1], shape[2], shape[3], shape[4]])
    x4d = tf.image.resize(x4d, size=[height, width], method='bilinear')
    return tf.reshape(x4d, [shape[0], shape[1], height, width, shape[4]])

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    in_shape = tf.shape(inputs)
    in_h, in_w = in_shape[2], in_shape[3]
    attentions = []
    for scale in self.scales:
      if scale != 1.0:
        h = tf.cast(tf.cast(in_h, tf.float32) * scale, tf.int32)
        w = tf.cast(tf.cast(in_w, tf.float32) * scale, tf.int32)
        scaled_inputs = self._resize(inputs, h, w)
      else:
        scaled_inputs = inputs
      mu = tf.reduce_mean(scaled_inputs, axis=[2, 3], keepdims=True)
      x_mu_square = tf.square(scaled_inputs - mu)
      n = tf.cast(
          tf.shape(scaled_inputs)[2] * tf.shape(scaled_inputs)[3] - 1,
          dtype=scaled_inputs.dtype)
      sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
      y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
      attention = self.sigmoid(y)
      if scale != 1.0:
        attention = self._resize(attention, in_h, in_w)
      attentions.append(attention)
    combined_attention = tf.reduce_mean(tf.stack(attentions, axis=-1), axis=-1)
    return inputs * combined_attention

  def get_config(self):
    config = super(MultiScaleCausalSimAM, self).get_config()
    config.update({
        'e_lambda': self.e_lambda,
        'scales': self.scales,
    })
    return config
Notes: each scale computes its attention on a per-frame basis, so causality is preserved; the resizing is purely spatial and never touches the time axis.
Residual connections. Adding a residual connection inside the CausalSimAM module helps mitigate vanishing gradients and supports deeper feature learning while keeping the module stable: the input features are added to the attention-adjusted features to form the residual connection.
@tf_keras.utils.register_keras_serializable(package='Vision')
class ResidualCausalSimAM(tf_keras.layers.Layer):
  def __init__(self, e_lambda=1e-4, **kwargs):
    super(ResidualCausalSimAM, self).__init__(**kwargs)
    self.e_lambda = e_lambda
    self.sigmoid = tf.keras.activations.sigmoid

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    # Original SimAM computation.
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                dtype=inputs.dtype)
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    attention = self.sigmoid(y)
    # Residual connection.
    return inputs + (inputs * attention)

  def get_config(self):
    config = super(ResidualCausalSimAM, self).get_config()
    config.update({'e_lambda': self.e_lambda})
    return config
Notes: the residual form x + x * a guarantees the block never suppresses the input below its original value, which tends to stabilize training in deeper stacks.

A dynamically adjustable e_lambda. Adjusting e_lambda per layer or per time step lets the model adapt the smoothness of the attention weights during training, which can improve performance. Make e_lambda a learnable parameter, or compute it dynamically from the input features:
@tf_keras.utils.register_keras_serializable(package='Vision')
class DynamicCausalSimAM(tf_keras.layers.Layer):
  def __init__(self, initial_e_lambda=1e-4, **kwargs):
    super(DynamicCausalSimAM, self).__init__(**kwargs)
    self.initial_e_lambda = initial_e_lambda
    self.sigmoid = tf.keras.activations.sigmoid

  def build(self, input_shape):
    # Make e_lambda a learnable parameter.
    self.e_lambda = self.add_weight(
        name='e_lambda',
        shape=(),
        initializer=tf.keras.initializers.Constant(self.initial_e_lambda),
        trainable=True)
    super(DynamicCausalSimAM, self).build(input_shape)

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
    x_mu_square = tf.square(inputs - mu)
    n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                dtype=inputs.dtype)
    sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
    y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
    attention = self.sigmoid(y)
    return inputs * attention

  def get_config(self):
    config = super(DynamicCausalSimAM, self).get_config()
    config.update({'initial_e_lambda': self.initial_e_lambda})
    return config
Notes: as a learnable parameter, e_lambda can adapt its value during training to the needs of different layers or time steps (one caveat is covered in the sketch below).

Choice of integration position. Where the attention module is placed has a large effect on performance. Try integrating CausalSimAM at different depths of the network (early, middle, or late layers) and determine the best position experimentally.
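One caveat with a freely learnable e_lambda: gradient descent could drive it toward zero or negative values, where the denominator of the energy function becomes ill-conditioned. A common safeguard (an assumption, not part of the original code) is to learn an unconstrained variable and map it through softplus; the subclass name below is hypothetical:

import numpy as np
import tensorflow as tf

class PositiveDynamicCausalSimAM(DynamicCausalSimAM):
  """Sketch: keeps the learnable e_lambda strictly positive via softplus."""

  def build(self, input_shape):
    # Initialize the raw variable so that softplus(raw) == initial_e_lambda.
    raw_init = float(np.log(np.expm1(self.initial_e_lambda)))
    self.raw_e_lambda = self.add_weight(
        name='raw_e_lambda', shape=(),
        initializer=tf.keras.initializers.Constant(raw_init),
        trainable=True)
    # Skip the parent build (it would create an unconstrained e_lambda weight).
    tf_keras.layers.Layer.build(self, input_shape)

  @property
  def e_lambda(self):
    return tf.nn.softplus(self.raw_e_lambda)  # > 0 for any raw value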
Integrating the attention module at several levels can strengthen the features layer by layer, but the extra computation must be weighed against the performance gain.
class MobileBottleneck(tf_keras.layers.Layer):
  def __init__(self, ..., use_simam=True, simam_params=None, **kwargs):
    super(MobileBottleneck, self).__init__(**kwargs)
    # Other initialization.
    self.use_simam = use_simam
    if self.use_simam:
      self.simam = (EnhancedCausalSimAM(**simam_params)
                    if simam_params else CausalSimAM())

  def call(self, inputs: tf.Tensor,
           states: Optional[nn_layers.States] = None
           ) -> Tuple[tf.Tensor, nn_layers.States]:
    # Expansion, feature extraction, and projection.
    x = self._projection_layer(x)
    # Add SimAM.
    if self.use_simam:
      x = self.simam(x)
    # Residual connection.
    x = self._identity(x)
    x = self._rezero(x)
    if self._stochastic_depth is not None:
      x = self._stochastic_depth(x)
    if self._skip_layer is not None:
      skip = self._skip_layer(inputs)
    else:
      skip = inputs
    return x + skip, states
Notes:
Grouped or depthwise-separable convolutions reduce computation while maintaining or improving accuracy. Streamlining the attention-weight computation removes unnecessary steps and improves efficiency.
```python
@tf_keras.utils.register_keras_serializable(package='Vision')
class EfficientCausalSimAM(tf_keras.layers.Layer):

    def __init__(self, e_lambda=1e-4, **kwargs):
        super(EfficientCausalSimAM, self).__init__(**kwargs)
        self.e_lambda = e_lambda
        self.sigmoid = tf.keras.activations.sigmoid

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        # Compute only the necessary intermediate values.
        mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
        x_mu = inputs - mu
        x_mu_square = tf.square(x_mu)
        sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
        n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                    dtype=inputs.dtype)
        y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
        attention = self.sigmoid(y)
        return inputs * attention

    def get_config(self):
        config = super(EfficientCausalSimAM, self).get_config()
        config.update({'e_lambda': self.e_lambda})
        return config
```
Notes:
Regularizing the attention-weight computation helps prevent overfitting and improves generalization: add an L1 or L2 penalty, or apply Dropout to the attention weights.
```python
@tf_keras.utils.register_keras_serializable(package='Vision')
class RegularizedCausalSimAM(tf_keras.layers.Layer):

    def __init__(self, e_lambda=1e-4, dropout_rate=0.1, **kwargs):
        super(RegularizedCausalSimAM, self).__init__(**kwargs)
        self.e_lambda = e_lambda
        self.sigmoid = tf.keras.activations.sigmoid
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, inputs: tf.Tensor, training=False) -> tf.Tensor:
        mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
        x_mu_square = tf.square(inputs - mu)
        n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                    dtype=inputs.dtype)
        sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
        y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
        attention = self.sigmoid(y)
        attention = self.dropout(attention, training=training)
        return inputs * attention

    def get_config(self):
        config = super(RegularizedCausalSimAM, self).get_config()
        config.update({
            'e_lambda': self.e_lambda,
            'dropout_rate': self.dropout.rate,
        })
        return config
```
Notes:
After integrating the improvements above, be sure to validate each change experimentally. Suggested steps (an ablation sketch follows this list):
Test each improvement in isolation.
Combine several improvements.
Weigh performance gains against efficiency costs.
Visualize and analyze the attention maps.
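As a concrete starting point for those steps, a minimal ablation sketch follows. The build_model factory, the datasets, and the epoch count are assumptions introduced here for illustration; the SimAM variants are the classes defined above.

```python
import tensorflow as tf

# Hypothetical ablation harness: train the same backbone with each SimAM
# variant switched in, then compare the best validation accuracy.
variants = {
    'baseline': None,
    'residual': ResidualCausalSimAM(),
    'dynamic': DynamicCausalSimAM(),
}
results = {}
for name, simam_layer in variants.items():
    model = build_model(simam_layer=simam_layer)  # assumed factory function
    model.compile(optimizer='adam', loss='categorical_crossentropy',
                  metrics=['accuracy'])
    history = model.fit(train_dataset, validation_data=val_dataset, epochs=5)
    results[name] = max(history.history['val_accuracy'])
print(results)
```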
Together, these improvements can noticeably strengthen the CausalSimAM module while preserving its simplicity and efficiency, and thus better support the MoViNet model on video-analysis tasks.
How do I use DynamicCausalSimAM?
DynamicCausalSimAM builds on CausalSimAM by making the e_lambda parameter learnable, which gives the attention mechanism more flexibility and adaptability: the model can adjust the smoothing of the attention weights to the data during training and thereby improve performance. The following covers how to use DynamicCausalSimAM in TensorFlow: its definition, the steps for integrating it into a model, and example code.
Defining DynamicCausalSimAM. First, make sure the DynamicCausalSimAM class is defined. Its implementation:
```python
@tf_keras.utils.register_keras_serializable(package='Vision')
class DynamicCausalSimAM(tf_keras.layers.Layer):

    def __init__(self, initial_e_lambda=1e-4, **kwargs):
        super(DynamicCausalSimAM, self).__init__(**kwargs)
        self.initial_e_lambda = initial_e_lambda
        self.sigmoid = tf.keras.activations.sigmoid

    def build(self, input_shape):
        # e_lambda is a learnable parameter.
        self.e_lambda = self.add_weight(
            name='e_lambda',
            shape=(),
            initializer=tf.keras.initializers.Constant(self.initial_e_lambda),
            trainable=True)
        super(DynamicCausalSimAM, self).build(input_shape)

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        # Input shape: (batch, time, height, width, channels).
        mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)      # (batch, time, 1, 1, channels)
        x_mu_square = tf.square(inputs - mu)                         # (batch, time, height, width, channels)
        n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                    dtype=inputs.dtype)
        sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3],
                                        keepdims=True)               # (batch, time, 1, 1, channels)
        y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
        attention = self.sigmoid(y)                                  # (batch, time, height, width, channels)
        return inputs * attention                                    # (batch, time, height, width, channels)

    def get_config(self):
        config = super(DynamicCausalSimAM, self).get_config()
        config.update({'initial_e_lambda': self.initial_e_lambda})
        return config
```
Notes:
e_lambda is now a learnable parameter created via self.add_weight, so it is adjusted automatically during training. DynamicCausalSimAM still operates on 5D tensors (batch, time, height, width, channels), and each time step's computation uses only the current and past frames.
Integrating DynamicCausalSimAM. Assuming you are using MoViNet or a similar architecture and want to insert the module at a specific point (for example, after each MobileBottleneck), the concrete steps and example code follow.
Updating the MobileBottleneck class. Add a DynamicCausalSimAM layer to the MobileBottleneck class, for example:
```python
@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf_keras.layers.Layer):

    def __init__(self, ..., use_simam=True, simam_params=None, **kwargs):
        super(MobileBottleneck, self).__init__(**kwargs)
        # Other initialization code.
        self.use_simam = use_simam
        if self.use_simam:
            # Use DynamicCausalSimAM.
            self.simam = (DynamicCausalSimAM(**simam_params)
                          if simam_params else DynamicCausalSimAM())

    def call(self, inputs: tf.Tensor,
             states: Optional[nn_layers.States] = None
             ) -> Tuple[tf.Tensor, nn_layers.States]:
        # Expansion, feature extraction and projection.
        x = self._projection_layer(inputs)
        # Apply DynamicCausalSimAM.
        if self.use_simam:
            x = self.simam(x)
        # Residual handling.
        x = self._identity(x)
        x = self._rezero(x)
        if self._stochastic_depth is not None:
            x = self._stochastic_depth(x)
        if self._skip_layer is not None:
            skip = self._skip_layer(inputs)
        else:
            skip = inputs
        return x + skip, states

    def get_config(self):
        config = super(MobileBottleneck, self).get_config()
        config.update({
            'use_simam': self.use_simam,
            'simam_params': self.simam.get_config() if self.use_simam else None,
        })
        return config
```
Notes:
The use_simam flag lets you control whether the SimAM module is integrated in this layer, which keeps the design flexible. DynamicCausalSimAM is inserted after the projection layer to enhance the projected features. Make sure every layer in your architecture that should carry DynamicCausalSimAM has been updated; if the module is integrated into several MobileBottleneck layers, apply the same change to each of them.
Integrating DynamicCausalSimAM into the MoViNet model. A simplified example follows.
```python
import tensorflow as tf
from tensorflow.keras import layers

# Assumes the MobileBottleneck class above (with DynamicCausalSimAM)
# and the other required components already exist.
class MoViNetModel(tf.keras.Model):

    def __init__(self, ..., **kwargs):
        super(MoViNetModel, self).__init__(**kwargs)
        # Define the remaining layers.
        self.stem = ...  # stem layer
        self.blocks = [
            MobileBottleneck(..., use_simam=True,
                             simam_params={'initial_e_lambda': 1e-4}),
            MobileBottleneck(..., use_simam=True,
                             simam_params={'initial_e_lambda': 1e-4}),
            # Add more MobileBottleneck layers as needed.
        ]
        self.head = ...  # head layer

    def call(self, inputs, training=False):
        x = self.stem(inputs)
        states = {}
        for block in self.blocks:
            x, states = block(x, states, training=training)
        x = self.head(x)
        return x

# Create the model instance.
model = MoViNetModel(...)

# Compile the model.
model.compile(optimizer='adam', loss='categorical_crossentropy',
              metrics=['accuracy'])

# Train the model.
model.fit(train_dataset, epochs=10, validation_data=val_dataset)
```
Notes:
Using MobileBottleneck: when defining blocks, instantiate each MobileBottleneck with use_simam=True and pass the required parameters.
Choosing e_lambda: the initial value of e_lambda affects performance. Try different initial values, such as 1e-4 or 1e-3, and pick the best one on the validation set.
Because e_lambda is now a learnable parameter, adjusting the learning rate appropriately may help the model learn the attention weights more reliably.
Consider adding a regularization term to the loss so that e_lambda does not drift too far from its initial value, or apply methods such as Dropout to improve generalization; a sketch of such a penalty follows.
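One way to realize that penalty is sketched below: subclass the layer and register a quadratic drift penalty via add_loss. The subclass name and the coefficient 1e-2 are assumptions to be tuned, not part of the original module.

```python
import tensorflow as tf

class PenalizedDynamicCausalSimAM(DynamicCausalSimAM):
    """DynamicCausalSimAM with a drift penalty on e_lambda (a sketch)."""

    def call(self, inputs):
        # Quadratic penalty keeps e_lambda close to its initial value;
        # the coefficient 1e-2 is an arbitrary assumption.
        self.add_loss(1e-2 * tf.square(self.e_lambda - self.initial_e_lambda))
        return super().call(inputs)
```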
Run controlled comparisons between models with and without DynamicCausalSimAM, tracking metrics such as accuracy and loss.
Visualize the attention weights to analyze whether DynamicCausalSimAM actually strengthens the features of the key regions.
Make sure the model does not depend on future frames at inference time. One way to verify this is to corrupt the future frames of an input clip and confirm that the outputs for earlier time steps do not change.
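A sketch of such a check, assuming causal_feature_model maps a 5D clip to per-time-step outputs (for the classification model below, wrap everything before the global pooling; if the network downsamples time, compare the correspondingly aligned earlier steps):

```python
import tensorflow as tf

# causal_feature_model is an assumed, already-built causal sub-model.
clip = tf.random.normal([1, 16, 64, 64, 3])
noise = tf.random.normal([1, 8, 64, 64, 3])
# Replace the last 8 frames with noise; the first 8 frames are untouched.
corrupted = tf.concat([clip[:, :8], noise], axis=1)

out_a = causal_feature_model(clip, training=False)
out_b = causal_feature_model(corrupted, training=False)

# Under strict causality, outputs for the untouched early steps must match.
tf.debugging.assert_near(out_a[:, :8], out_b[:, :8], atol=1e-5)
print('Causality check passed: early outputs ignore future frames.')
```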
Below is a complete example showing how to define DynamicCausalSimAM and integrate it into MobileBottleneck.
```python
import tensorflow as tf
from tensorflow.keras import layers

@tf_keras.utils.register_keras_serializable(package='Vision')
class DynamicCausalSimAM(tf.keras.layers.Layer):

    def __init__(self, initial_e_lambda=1e-4, **kwargs):
        super(DynamicCausalSimAM, self).__init__(**kwargs)
        self.initial_e_lambda = initial_e_lambda
        self.sigmoid = tf.keras.activations.sigmoid

    def build(self, input_shape):
        # e_lambda is a learnable parameter.
        self.e_lambda = self.add_weight(
            name='e_lambda',
            shape=(),
            initializer=tf.keras.initializers.Constant(self.initial_e_lambda),
            trainable=True)
        super(DynamicCausalSimAM, self).build(input_shape)

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        # Input shape: (batch, time, height, width, channels).
        mu = tf.reduce_mean(inputs, axis=[2, 3], keepdims=True)
        x_mu_square = tf.square(inputs - mu)
        n = tf.cast(tf.shape(inputs)[2] * tf.shape(inputs)[3] - 1,
                    dtype=inputs.dtype)
        sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3], keepdims=True)
        y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
        attention = self.sigmoid(y)
        return inputs * attention

    def get_config(self):
        config = super(DynamicCausalSimAM, self).get_config()
        config.update({'initial_e_lambda': self.initial_e_lambda})
        return config


@tf_keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf.keras.layers.Layer):

    def __init__(self, ..., use_simam=True, simam_params=None, **kwargs):
        super(MobileBottleneck, self).__init__(**kwargs)
        # Other initialization code.
        self.use_simam = use_simam
        if self.use_simam:
            self.simam = (DynamicCausalSimAM(**simam_params)
                          if simam_params else DynamicCausalSimAM())

    def build(self, input_shape):
        # Build the expansion, feature-extraction and projection layers, e.g.:
        self._projection_layer = layers.Conv3D(...)

    def call(self, inputs: tf.Tensor, states: Optional[dict] = None,
             training=False) -> Tuple[tf.Tensor, dict]:
        # Expansion, feature extraction and projection.
        x = self._projection_layer(inputs)
        # Apply DynamicCausalSimAM.
        if self.use_simam:
            x = self.simam(x)
        # Residual handling.
        x = self._identity(x)
        x = self._rezero(x)
        if self._stochastic_depth is not None:
            x = self._stochastic_depth(x, training=training)
        if self._skip_layer is not None:
            skip = self._skip_layer(inputs)
        else:
            skip = inputs
        return x + skip, states

    def get_config(self):
        config = super(MobileBottleneck, self).get_config()
        config.update({
            'use_simam': self.use_simam,
            'simam_params': self.simam.get_config() if self.use_simam else None,
        })
        return config


# Define the MoViNet model.
class MoViNetModel(tf.keras.Model):

    def __init__(self, ..., **kwargs):
        super(MoViNetModel, self).__init__(**kwargs)
        self.stem = layers.Conv3D(...)   # stem layer
        self.blocks = [
            MobileBottleneck(..., use_simam=True,
                             simam_params={'initial_e_lambda': 1e-4}),
            MobileBottleneck(..., use_simam=True,
                             simam_params={'initial_e_lambda': 1e-4}),
            # Add more MobileBottleneck layers as needed.
        ]
        self.head = layers.Dense(...)    # head layer

    def call(self, inputs, training=False):
        x = self.stem(inputs, training=training)
        states = {}
        for block in self.blocks:
            x, states = block(x, states, training=training)
        x = self.head(x)
        return x


model = MoViNetModel(...)
model.compile(optimizer='adam', loss='categorical_crossentropy',
              metrics=['accuracy'])
model.fit(train_dataset, epochs=10, validation_data=val_dataset)
```
Notes:
Instantiating DynamicCausalSimAM: create it inside MobileBottleneck and pass the required parameters (such as initial_e_lambda). Because e_lambda is a learnable parameter, it may need a suitable learning rate and initialization strategy to converge stably during training.
Choosing a good initial_e_lambda value matters for performance; try several initial values and select the best on the validation set.
Although DynamicCausalSimAM adds only a small number of parameters and little computation, in large models you should monitor the overall complexity so the model stays efficient in resource-constrained environments.
Depending on the task and dataset, you may need to tune hyperparameters such as reduction_ratio (if present), the learning rate, and the regularization strength for the best performance.
By integrating DynamicCausalSimAM at key positions in the model, you strengthen its feature representation while preserving causality and real-time inference. The key steps:
Define DynamicCausalSimAM as a learnable attention layer.
Insert it at the desired positions (for example, after each MobileBottleneck).
With these steps, DynamicCausalSimAM can improve MoViNet's performance on video-analysis tasks; a minimal smoke test follows.
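A minimal smoke test of the layer on random data (the shapes are arbitrary assumptions):

```python
import tensorflow as tf

layer = DynamicCausalSimAM(initial_e_lambda=1e-4)
x = tf.random.normal([2, 8, 32, 32, 16])  # (batch, time, height, width, channels)
y = layer(x)

assert y.shape == x.shape  # SimAM only re-weights; shapes are preserved
# e_lambda should appear among the trainable weights.
print([w.name for w in layer.trainable_weights])
```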
In your project, introducing multi-scale feature fusion (Multi-Scale Feature Fusion) into the CausalSimAM module can further strengthen the model's ability to capture features at different spatial scales and thus improve overall performance. The following explains the concept of multi-scale feature fusion, how to implement it, how to integrate it into CausalSimAM, and how to apply the result to the MoViNet model.
Multi-scale feature fusion extracts features at several spatial scales (i.e., resolutions) and fuses them to capture information at different granularities in an image or video. The model can then attend to local detail and global context simultaneously, improving recognition in complex scenes and across objects of varying size.
Integrating multi-scale feature fusion into CausalSimAM. To implement this, we extend the original attention mechanism so that it computes attention weights at several scales and fuses them. Concretely, we create a new class, MultiScaleCausalSimAM, that computes the attention weights at multiple scales and combines them. The implementation:
```python
import tensorflow as tf
from tensorflow.keras import layers

@tf.keras.utils.register_keras_serializable(package='Vision')
class MultiScaleCausalSimAM(tf.keras.layers.Layer):

    def __init__(self, e_lambda=1e-4, scales=[1.0, 0.5, 0.25], **kwargs):
        """Initializes the multi-scale causal SimAM module.

        Args:
            e_lambda: smoothing term that prevents division by zero.
            scales: list of scale ratios, e.g. [1.0, 0.5, 0.25] for full,
                half and quarter resolution.
        """
        super(MultiScaleCausalSimAM, self).__init__(**kwargs)
        self.e_lambda = e_lambda
        self.scales = scales
        self.sigmoid = tf.keras.activations.sigmoid

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """Forward pass.

        Args:
            inputs: tensor shaped (batch, time, height, width, channels).

        Returns:
            A tensor with the same shape as the input.
        """
        attentions = []
        for scale in self.scales:
            if scale != 1.0:
                # Rescale the input.
                scaled_height = tf.cast(tf.shape(inputs)[2] * scale, tf.int32)
                scaled_width = tf.cast(tf.shape(inputs)[3] * scale, tf.int32)
                scaled_inputs = tf.image.resize(
                    inputs, size=[scaled_height, scaled_width],
                    method='bilinear')
            else:
                scaled_inputs = inputs
            # Spatial mean.
            mu = tf.reduce_mean(scaled_inputs, axis=[2, 3], keepdims=True)
            # Squared deviations.
            x_mu_square = tf.square(scaled_inputs - mu)
            # n = H * W - 1.
            n = tf.cast(
                tf.shape(scaled_inputs)[2] * tf.shape(scaled_inputs)[3] - 1,
                dtype=inputs.dtype)
            # Energy function y.
            sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3],
                                            keepdims=True)
            y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
            # Attention weights.
            attention = self.sigmoid(y)
            if scale != 1.0:
                # Restore the original resolution.
                attention = tf.image.resize(
                    attention, size=[tf.shape(inputs)[2], tf.shape(inputs)[3]],
                    method='bilinear')
            attentions.append(attention)
        # Fuse the attention maps from all scales by averaging.
        combined_attention = tf.reduce_mean(tf.stack(attentions, axis=-1),
                                            axis=-1)
        return inputs * combined_attention

    def get_config(self):
        config = super(MultiScaleCausalSimAM, self).get_config()
        config.update({'e_lambda': self.e_lambda, 'scales': self.scales})
        return config
```
Scale adjustment: tf.image.resize rescales the input's height and width to each scale; for scale != 1.0 the input is downsampled, while scale == 1.0 keeps the original resolution.
Attention computation: compute the spatial mean mu, the per-position squared deviation x_mu_square, and from these the energy function y.
Scale restoration and fusion: resize each scale's attention map back to the original resolution and average the maps into combined_attention.
Feature modulation: multiply the input by the fused attention weights.
Integrating MultiScaleCausalSimAM into MobileBottleneck. Next, integrate MultiScaleCausalSimAM into the MobileBottleneck class; a modified example:
```python
@tf.keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf.keras.layers.Layer):

    def __init__(self, ..., use_simam=True, simam_params=None, **kwargs):
        super(MobileBottleneck, self).__init__(**kwargs)
        # Other initialization: expansion, feature extraction, projection, etc.
        self.use_simam = use_simam
        if self.use_simam:
            # Use MultiScaleCausalSimAM.
            self.simam = (MultiScaleCausalSimAM(**simam_params)
                          if simam_params else MultiScaleCausalSimAM())

    def build(self, input_shape):
        # Build the expansion, feature-extraction and projection layers.
        self._projection_layer = layers.Conv3D(
            filters=...,
            kernel_size=...,
            strides=...,
            padding='same',
            use_bias=False,
            activation=None,
            # other parameters
        )
        # Initialize the remaining layers.
        super(MobileBottleneck, self).build(input_shape)

    def call(self, inputs: tf.Tensor, states: Optional[dict] = None,
             training=False) -> Tuple[tf.Tensor, dict]:
        # Expansion, feature extraction and projection.
        x = self._projection_layer(inputs)
        # Apply MultiScaleCausalSimAM.
        if self.use_simam:
            x = self.simam(x)
        # Residual handling.
        x = self._identity(x)  # assumes an identity layer exists
        x = self._rezero(x)    # assumes a rezero layer exists
        if self._stochastic_depth is not None:
            x = self._stochastic_depth(x, training=training)
        if self._skip_layer is not None:
            skip = self._skip_layer(inputs)
        else:
            skip = inputs
        return x + skip, states

    def get_config(self):
        config = super(MobileBottleneck, self).get_config()
        config.update({
            'use_simam': self.use_simam,
            'simam_params': self.simam.get_config() if self.use_simam else None,
        })
        return config
```
When instantiating MultiScaleCausalSimAM you can pass:
e_lambda: the smoothing term; the default 1e-4 is a reasonable starting point and can be adjusted as needed.
scales: the list of scale ratios, e.g. [1.0, 0.5, 0.25] for full, half, and quarter resolution; adjust it to the dataset and the compute budget.
A simplified MoViNet example showing the integration of MultiScaleCausalSimAM follows:
```python
import tensorflow as tf
from tensorflow.keras import layers, models

# MobileBottleneck class (containing MultiScaleCausalSimAM).
@tf.keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf.keras.layers.Layer):

    def __init__(self, filters, kernel_size, strides=(1, 1, 1),
                 use_simam=True, simam_params=None, **kwargs):
        super(MobileBottleneck, self).__init__(**kwargs)
        self.use_simam = use_simam
        if self.use_simam:
            self.simam = (MultiScaleCausalSimAM(**simam_params)
                          if simam_params else MultiScaleCausalSimAM())
        # Expansion, feature-extraction and projection layers.
        self._projection_layer = layers.Conv3D(
            filters=filters, kernel_size=kernel_size, strides=strides,
            padding='same', use_bias=False, activation=None)
        self._identity = layers.Conv3D(
            filters=filters, kernel_size=(1, 1, 1), strides=(1, 1, 1),
            padding='same', use_bias=False, activation=None)
        # Identity placeholder; a true ReZero would multiply by a trainable
        # scalar initialized to zero.
        self._rezero = layers.Lambda(lambda x: x * 0.0 + x)
        self._stochastic_depth = None  # add stochastic depth if needed

    def call(self, inputs, states=None, training=False):
        # Projection.
        x = self._projection_layer(inputs)
        # Apply MultiScaleCausalSimAM.
        if self.use_simam:
            x = self.simam(x)
        # Residual handling.
        x = self._identity(x)
        x = self._rezero(x)
        if self._stochastic_depth is not None:
            x = self._stochastic_depth(x, training=training)
        if hasattr(self, '_skip_layer') and self._skip_layer is not None:
            skip = self._skip_layer(inputs)
        else:
            skip = inputs
        return x + skip, states

    def get_config(self):
        config = super(MobileBottleneck, self).get_config()
        config.update({
            'use_simam': self.use_simam,
            'simam_params': self.simam.get_config() if self.use_simam else None,
            'filters': self._projection_layer.filters,
            'kernel_size': self._projection_layer.kernel_size,
            'strides': self._projection_layer.strides,
        })
        return config


# MoViNet model.
class MoViNetModel(tf.keras.Model):

    def __init__(self, num_classes, **kwargs):
        super(MoViNetModel, self).__init__(**kwargs)
        # Stem.
        self.stem = layers.Conv3D(
            filters=32, kernel_size=(3, 7, 7), strides=(2, 2, 2),
            padding='same', use_bias=False, activation=None)
        self.stem_bn = layers.BatchNormalization()
        self.stem_act = layers.Activation('relu')
        # MobileBottleneck blocks.
        self.blocks = [
            MobileBottleneck(
                filters=64, kernel_size=(1, 3, 3), strides=(1, 1, 1),
                use_simam=True,
                simam_params={'e_lambda': 1e-4, 'scales': [1.0, 0.5, 0.25]}),
            MobileBottleneck(
                filters=128, kernel_size=(1, 3, 3), strides=(2, 2, 2),
                use_simam=True,
                simam_params={'e_lambda': 1e-4, 'scales': [1.0, 0.5, 0.25]}),
            # Add more MobileBottleneck layers as needed.
        ]
        # Head.
        self.global_pool = layers.GlobalAveragePooling3D()
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False):
        x = self.stem(inputs)
        x = self.stem_bn(x, training=training)
        x = self.stem_act(x)
        states = {}
        for block in self.blocks:
            x, states = block(x, states, training=training)
        x = self.global_pool(x)
        x = self.classifier(x)
        return x


num_classes = 100  # adjust to the task
model = MoViNetModel(num_classes=num_classes)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy'])

# Print the architecture (16-frame 224x224 RGB clips assumed).
model.build(input_shape=(None, 16, 224, 224, 3))
model.summary()
```
Using MultiScaleCausalSimAM: instantiate it inside MobileBottleneck and pass the required parameters such as e_lambda and scales. The scales parameter defines the scale ratios, e.g. [1.0, 0.5, 0.25] for full, half, and quarter resolution.
Placement: insert MultiScaleCausalSimAM after the projection layer of MobileBottleneck to strengthen the projected features.
Architecture: the example model is deliberately small; extend it to match the real network.
Training and validation: compile and train with model.compile and model.fit; inputs are shaped (batch_size, time_steps, height, width, channels).
e_lambda parameter: smooths the energy function and prevents division by zero; the initial value is typically 1e-4 and can be adjusted to the dataset.
scales parameter: defines the scales used; a common setting is [1.0, 0.5, 0.25], with more scales added or existing ones adjusted as needed.
Learning rate: a starting value such as 1e-4 is reasonable; adjust it based on convergence.
Regularization: if needed, add terms such as kernel_regularizer=tf.keras.regularizers.l2(1e-5) to the Conv3D layers.
Comparative experiments: compare multi-scale CausalSimAM against single-scale CausalSimAM and against no attention.
Attention-weight visualization: check whether the fused attention highlights the key regions.
Causality verification: confirm that inference never uses future frames.
Multi-scale effect analysis: measure what each scale contributes.
To make the model even more flexible, an adaptive scale-selection mechanism can let the model choose the scales to use based on the input.
Using depthwise-separable convolutions in the multi-scale attention computation reduces the amount of computation and improves efficiency.
Multi-scale attention can also be combined with other attention mechanisms (such as channel or spatial attention) to further enrich the feature representation.
Besides plain averaging, other fusion strategies such as weighted averaging or max fusion are worth trying; choose the best one empirically. A sketch of a learnable weighted fusion follows.
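For example, a learnable weighted fusion might look like the sketch below; the WeightedFusion name and the softmax parameterization are assumptions, not part of the module above. Inside MultiScaleCausalSimAM.call, the final tf.reduce_mean over the stacked maps would be replaced by this layer.

```python
import tensorflow as tf

class WeightedFusion(tf.keras.layers.Layer):
    """Softmax-weighted fusion of per-scale attention maps (a sketch)."""

    def __init__(self, num_scales, **kwargs):
        super().__init__(**kwargs)
        self.logits = self.add_weight(
            name='fusion_logits', shape=(num_scales,),
            initializer='zeros', trainable=True)

    def call(self, attentions):
        # attentions: list of tensors shaped (batch, time, H, W, C).
        stacked = tf.stack(attentions, axis=-1)   # (..., num_scales)
        weights = tf.nn.softmax(self.logits)      # sums to 1 over scales
        return tf.reduce_sum(stacked * weights, axis=-1)
```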
A complete code example follows, showing how to define MultiScaleCausalSimAM, integrate it into MobileBottleneck, and build the MoViNet model:
```python
import tensorflow as tf
from tensorflow.keras import layers, models

# Complete script: the MultiScaleCausalSimAM, MobileBottleneck and
# MoViNetModel classes exactly as defined in the snippets above, placed in
# one file, followed by building and compiling the model:

num_classes = 100  # adjust to the task
model = MoViNetModel(num_classes=num_classes)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy'])

# Print the architecture (16-frame 224x224 RGB clips assumed).
model.build(input_shape=(None, 16, 224, 224, 3))
model.summary()
```
Defining MultiScaleCausalSimAM: takes e_lambda and scales as parameters and initializes the sigmoid activation. The call method computes the energy function y at every scale and produces the attention weights; the get_config method keeps the layer serializable so the model can be saved and loaded.
Modifying the MobileBottleneck class: MultiScaleCausalSimAM is inserted after the projection layer with the required parameters; self._identity and self._rezero handle the residual path and keep the features stable; the use_simam flag controls whether the SimAM module is used.
Building the MoViNet model: stem, MobileBottleneck blocks, and head, as in the example.
Compiling and training: the standard Keras workflow.
Assuming you have a video dataset with prepared training and validation splits (train_dataset and val_dataset), training can look like this:
```python
# Assume train_dataset and val_dataset are defined and yield batches shaped
# (batch_size, time, height, width, channels).
model.fit(
    train_dataset,
    epochs=50,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('movinet_model.h5',
                                           save_best_only=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                             factor=0.1, patience=5),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,
                                         restore_best_weights=True),
    ])
```
Computational cost: adjust the scales list to suitable ratios for your needs, and consider cheaper resize methods such as nearest or area to reduce resource consumption.
Model complexity: monitor the parameters and FLOPs the extra scales add.
Training stability: watch convergence after inserting the module.
Hyperparameter tuning: tune the e_lambda and scales parameters for the best performance.
By integrating multi-scale feature fusion into the CausalSimAM module, you can markedly improve MoViNet's ability to capture features at different scales and thus its overall performance. The steps above covered how to define and implement MultiScaleCausalSimAM, how to integrate it into MobileBottleneck, and how to apply it to the MoViNet model; with suitable parameter tuning and validation you can adapt it to your task.
Summary of the key steps:
Define the MultiScaleCausalSimAM class, implementing multi-scale attention fusion.
Insert a MultiScaleCausalSimAM layer into the MobileBottleneck class to enrich the feature representation.
With these steps you can use multi-scale feature fusion effectively and improve MoViNet's performance on video-analysis tasks.
```text
Traceback (most recent call last):
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/train_stream.py", line 33, in <module>
    backbone = movinet.Movinet(
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet.py", line 413, in __init__
    inputs, outputs = self._build_network(input_specs, state_specs=state_specs)
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet.py", line 512, in _build_network
    x, states = layer_obj(x, states=states)
  File "/home/zhouyi/.local/lib/python3.10/site-packages/tf_keras/src/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/tmp/__autograph_generated_filevcmf5dk1.py", line 23, in tf__call
    retval_ = ag__.converted_call(ag__.ld(self)._mobile_bottleneck, (ag__.ld(inputs),), dict(states=ag__.ld(states)), fscope)
  File "/tmp/__autograph_generated_filenok9rby4.py", line 40, in tf__call
    x = ag__.converted_call(ag__.ld(self).simam, (ag__.ld(x),), None, fscope)
  File "/tmp/__autograph_generated_fileym7ctnd5.py", line 80, in tf__call
    ag__.for_stmt(ag__.ld(self).scales, None, loop_body, get_state_2, set_state_2, (), {'iterate_names': 'scale'})
  File "/tmp/__autograph_generated_fileym7ctnd5.py", line 46, in loop_body
    ag__.if_stmt(ag__.ld(scale) != 1.0, if_body, else_body, get_state, set_state, ('scaled_inputs',), 1)
  File "/tmp/__autograph_generated_fileym7ctnd5.py", line 39, in if_body
    scaled_height = ag__.converted_call(ag__.ld(tf).cast, (ag__.converted_call(ag__.ld(tf).shape, (ag__.ld(inputs),), None, fscope)[2] * ag__.ld(scale), ag__.ld(tf).int32), None, fscope)
TypeError: Exception encountered when calling layer "block0_layer0" (type MovinetBlock).

in user code:

    File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 1414, in call  *
        return self._mobile_bottleneck(inputs, states=states)
    File "/home/zhouyi/.local/lib/python3.10/site-packages/tf_keras/src/utils/traceback_utils.py", line 70, in error_handler  **
        raise e.with_traceback(filtered_tb) from None
    File "/tmp/__autograph_generated_filenok9rby4.py", line 40, in tf__call
        x = ag__.converted_call(ag__.ld(self).simam, (ag__.ld(x),), None, fscope)

    TypeError: Exception encountered when calling layer 'bneck' (type MobileBottleneck).

    in user code:

        File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 1074, in call  *
            x = self.simam(x)
        File "/home/zhouyi/.local/lib/python3.10/site-packages/tf_keras/src/utils/traceback_utils.py", line 70, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "/tmp/__autograph_generated_fileym7ctnd5.py", line 80, in tf__call
            ag__.for_stmt(ag__.ld(self).scales, None, loop_body, get_state_2, set_state_2, (), {'iterate_names': 'scale'})
        File "/tmp/__autograph_generated_fileym7ctnd5.py", line 46, in loop_body
            ag__.if_stmt(ag__.ld(scale) != 1.0, if_body, else_body, get_state, set_state, ('scaled_inputs',), 1)
        File "/tmp/__autograph_generated_fileym7ctnd5.py", line 39, in if_body
            scaled_height = ag__.converted_call(ag__.ld(tf).cast, (ag__.converted_call(ag__.ld(tf).shape, (ag__.ld(inputs),), None, fscope)[2] * ag__.ld(scale), ag__.ld(tf).int32), None, fscope)

        TypeError: Exception encountered when calling layer 'multi_scale_causal_sim_am' (type MultiScaleCausalSimAM).

        in user code:

            File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 961, in call  *
                scaled_height = tf.cast(tf.shape(inputs)[2] * scale, tf.int32)

            TypeError: Expected int32 passed to parameter 'y' of op 'Mul', got 0.5 of type 'float' instead. Error: Expected int32, but got 0.5 of type 'float'.

        Call arguments received by layer 'multi_scale_causal_sim_am' (type MultiScaleCausalSimAM):
          • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)

    Call arguments received by layer 'bneck' (type MobileBottleneck):
      • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)
      • states={}

Call arguments received by layer "block0_layer0" (type MovinetBlock):
  • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)
  • states={}
```
You hit a type error while integrating MultiScaleCausalSimAM into the MoViNet model:

```text
TypeError: Expected int32 passed to parameter 'y' of op 'Mul', got 0.5 of type 'float' instead. Error: Expected int32, but got 0.5 of type 'float'.
```

This error occurs when a float (such as 0.5) meets an operation that expects an integer (int32). It stems from a type mismatch between floats and integers while computing the resized shape. Specifically, the failure is on this line:
```python
scaled_height = tf.cast(tf.shape(inputs)[2] * scale, tf.int32)
```
Here tf.shape(inputs)[2] is an int32 tensor (the height), while scale is a Python float such as 0.5. TensorFlow does not promote dtypes automatically, so the multiplication itself fails: the Mul op tries to convert 0.5 to int32 and rejects it before tf.cast ever runs.
To solve this, make every operation and type conversion in the scale computation consistent. Modify the MultiScaleCausalSimAM class as follows:
Use tf.round to make the target sizes integers: cast the shape to float first, multiply by scale, round the product to the nearest integer with tf.round, and only then cast back to an integer type. This avoids both the dtype mismatch and surprises from the fractional part.
Make sure all involved types are correct: scale must be a float (such as tf.float32 or a Python float); convert it where necessary.
The updated MultiScaleCausalSimAM class with these changes:
```python
import tensorflow as tf
from tensorflow.keras import layers

@tf.keras.utils.register_keras_serializable(package='Vision')
class MultiScaleCausalSimAM(tf.keras.layers.Layer):

    def __init__(self, e_lambda=1e-4, scales=[1.0, 0.5, 0.25], **kwargs):
        """Initializes the multi-scale causal SimAM module.

        Args:
            e_lambda: smoothing term that prevents division by zero.
            scales: list of scale ratios, e.g. [1.0, 0.5, 0.25] for full,
                half and quarter resolution.
        """
        super(MultiScaleCausalSimAM, self).__init__(**kwargs)
        self.e_lambda = e_lambda
        self.scales = scales
        self.sigmoid = tf.keras.activations.sigmoid

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """Forward pass over a (batch, time, height, width, channels) tensor."""
        attentions = []
        for scale in self.scales:
            if scale != 1.0:
                # Rescale; make sure the target sizes are integers.
                scaled_height = tf.cast(
                    tf.round(tf.cast(tf.shape(inputs)[2], tf.float32) * scale),
                    tf.int32)
                scaled_width = tf.cast(
                    tf.round(tf.cast(tf.shape(inputs)[3], tf.float32) * scale),
                    tf.int32)
                scaled_inputs = tf.image.resize(
                    inputs, size=[scaled_height, scaled_width],
                    method='bilinear')
            else:
                scaled_inputs = inputs
            # Spatial mean, squared deviations, n = H * W - 1.
            mu = tf.reduce_mean(scaled_inputs, axis=[2, 3], keepdims=True)
            x_mu_square = tf.square(scaled_inputs - mu)
            n = tf.cast(
                tf.shape(scaled_inputs)[2] * tf.shape(scaled_inputs)[3] - 1,
                dtype=inputs.dtype)
            # Energy function y and attention weights.
            sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3],
                                            keepdims=True)
            y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
            attention = self.sigmoid(y)
            if scale != 1.0:
                # Restore the original resolution.
                attention = tf.image.resize(
                    attention, size=[tf.shape(inputs)[2], tf.shape(inputs)[3]],
                    method='bilinear')
            attentions.append(attention)
        # Fuse the attention maps from all scales by averaging.
        combined_attention = tf.reduce_mean(tf.stack(attentions, axis=-1),
                                            axis=-1)
        return inputs * combined_attention

    def get_config(self):
        config = super(MultiScaleCausalSimAM, self).get_config()
        config.update({'e_lambda': self.e_lambda, 'scales': self.scales})
        return config
```
Using tf.round with a double tf.cast:

```python
scaled_height = tf.cast(tf.round(tf.cast(tf.shape(inputs)[2], tf.float32) * scale), tf.int32)
scaled_width = tf.cast(tf.round(tf.cast(tf.shape(inputs)[3], tf.float32) * scale), tf.int32)
```

This casts the shape to float before multiplying by scale, rounds the product to the nearest integer, and finally converts the result to int32.
Keeping scale a float: if scale comes from a trainable parameter or is generated dynamically, make sure its dtype is a float (such as tf.float32).
The complete example below shows how to define MultiScaleCausalSimAM with this fix and integrate it into MobileBottleneck and the MoViNet model:
```python
import tensorflow as tf
from tensorflow.keras import layers, models

# Complete script: the tf.round-fixed MultiScaleCausalSimAM defined above,
# plus the MobileBottleneck and MoViNetModel classes exactly as in the
# earlier complete example, followed by building and compiling the model:

num_classes = 100  # adjust to the task
model = MoViNetModel(num_classes=num_classes)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy'])

model.build(input_shape=(None, 16, 224, 224, 3))  # 16-frame 224x224 RGB clips
model.summary()
```
MultiScaleCausalSimAM class: tf.round guarantees that the resized height and width are integers.
MobileBottleneck class: MultiScaleCausalSimAM is inserted after the projection layer; self._identity and self._rezero implement a simple residual path.
MoViNetModel class: stem, bottleneck blocks with SimAM, and head, as before.
Assuming the training and validation datasets (train_dataset and val_dataset) are ready, train the model with:
```python
# Train the model.
model.fit(
    train_dataset,
    epochs=50,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('movinet_model.h5',
                                           save_best_only=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                             factor=0.1, patience=5),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,
                                         restore_best_weights=True),
    ])
```
Computational cost: most of the cost sits at the full scale (1.0). Adjust the scales list to your actual needs, and consider cheaper resize methods such as nearest or area to save compute.
Model complexity: monitor the parameters and FLOPs that the extra scales add.
Training stability: watch convergence after inserting the module.
Hyperparameter tuning: tune the e_lambda and scales parameters for the best performance.
Visualization and verification: inspect the attention maps and re-check causality.
Possible extensions, as discussed above: adaptive scale selection, depthwise-separable convolutions, combination with other attention mechanisms, and dynamic fusion strategies.
By introducing tf.round and the explicit type casts into MultiScaleCausalSimAM, you resolve the current type error. Integrating the multi-scale attention mechanism correctly throughout the architecture can then noticeably improve MoViNet on video-analysis tasks. The key steps:
Fix the scale computation: use tf.round so the resized sizes are integers, and tf.cast so the dtypes are correct.
Integrate the multi-scale attention module: insert the MultiScaleCausalSimAM layer into the MobileBottleneck class, and update the MobileBottleneck and MoViNetModel classes to accommodate the new attention mechanism.
Train and validate the model.
Optimize for performance.
With these steps you can use multi-scale feature fusion effectively and improve MoViNet's performance on video-analysis tasks.
```text
Traceback (most recent call last):
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/train_stream.py", line 33, in <module>
    backbone = movinet.Movinet(
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet.py", line 413, in __init__
    inputs, outputs = self._build_network(input_specs, state_specs=state_specs)
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet.py", line 512, in _build_network
    x, states = layer_obj(x, states=states)
  File "/home/zhouyi/.local/lib/python3.10/site-packages/tf_keras/src/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/tmp/__autograph_generated_file3974x36x.py", line 23, in tf__call
    retval_ = ag__.converted_call(ag__.ld(self)._mobile_bottleneck, (ag__.ld(inputs),), dict(states=ag__.ld(states)), fscope)
  File "/tmp/__autograph_generated_file43ve48ha.py", line 40, in tf__call
    x = ag__.converted_call(ag__.ld(self).simam, (ag__.ld(x),), None, fscope)
  File "/opt/miniconda3/envs/tf2.18/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 963, in call
    scaled_inputs = tf.image.resize(inputs, size=[scaled_height, scaled_width], method='bilinear')
ValueError: Exception encountered when calling layer "block0_layer0" (type MovinetBlock).

in user code:

    File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 1413, in call  *
        return self._mobile_bottleneck(inputs, states=states)
    File "/home/zhouyi/.local/lib/python3.10/site-packages/tf_keras/src/utils/traceback_utils.py", line 70, in error_handler  **
        raise e.with_traceback(filtered_tb) from None
    File "/tmp/__autograph_generated_file43ve48ha.py", line 40, in tf__call
        x = ag__.converted_call(ag__.ld(self).simam, (ag__.ld(x),), None, fscope)
    File "/opt/miniconda3/envs/tf2.18/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 963, in call
        scaled_inputs = tf.image.resize(inputs, size=[scaled_height, scaled_width], method='bilinear')

    ValueError: Exception encountered when calling layer 'bneck' (type MobileBottleneck).

    in user code:

        File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 1073, in call  *
            x = self.simam(x)
        File "/opt/miniconda3/envs/tf2.18/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "/home/zhouyi/Documents/ActionTasks/models/official/projects/movinet/modeling/movinet_layers.py", line 963, in call
            scaled_inputs = tf.image.resize(inputs, size=[scaled_height, scaled_width], method='bilinear')

        ValueError: Exception encountered when calling MultiScaleCausalSimAM.call().

        'images' must have either 3 or 4 dimensions.

        Arguments received by MultiScaleCausalSimAM.call():
          • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)

    Call arguments received by layer 'bneck' (type MobileBottleneck):
      • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)
      • states={}

Call arguments received by layer "block0_layer0" (type MovinetBlock):
  • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)
  • states={}

Process finished with exit code 1
```
The error you hit while integrating MultiScaleCausalSimAM into the MoViNet model is caused by tf.image.resize expecting a 3D or 4D tensor, while your code passes it the 5D tensor (batch, time, height, width, channels). The message:

```text
ValueError: 'images' must have either 3 or 4 dimensions.
Arguments received by MultiScaleCausalSimAM.call():
  • inputs=tf.Tensor(shape=(None, None, None, None, 8), dtype=float32)
```
The signature of tf.image.resize is:

```python
tf.image.resize(images, size, method='bilinear', preserve_aspect_ratio=False, antialias=False, name=None)
```

The images argument must be a 3D or 4D tensor:
3D: (height, width, channels)
4D: (batch_size, height, width, channels)
In your implementation the input has shape (batch, time, height, width, channels), a 5D tensor, so tf.image.resize raises the error.
To handle a 5D tensor, convert it to 4D before calling tf.image.resize, resize it, and convert it back to 5D:
Reshape (batch, time, height, width, channels) into (batch * time, height, width, channels).
Apply tf.image.resize to the reshaped 4D tensor.
Reshape the result back to (batch, time, new_height, new_width, channels).
The fold-resize-unfold pattern is sketched as a standalone helper below, followed by the updated class.
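Here is that pattern in isolation, as a standalone sketch (resize_video is a helper name introduced here, not part of the original code):

```python
import tensorflow as tf

def resize_video(x, height, width, method='bilinear'):
    """Spatially resizes a 5D video tensor (batch, time, H, W, C).

    Folds time into the batch axis so tf.image.resize sees a 4D tensor,
    then unfolds it again; height/width may be Python ints or scalar
    int32 tensors.
    """
    shape = tf.shape(x)
    batch, time, channels = shape[0], shape[1], shape[4]
    x4d = tf.reshape(x, [-1, shape[2], shape[3], channels])
    x4d = tf.image.resize(x4d, size=[height, width], method=method)
    return tf.reshape(x4d, [batch, time, height, width, channels])
```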
The modified MultiScaleCausalSimAM class below resolves the dimension mismatch and keeps the multi-scale feature fusion correct:
```python
import tensorflow as tf
from tensorflow.keras import layers

@tf.keras.utils.register_keras_serializable(package='Vision')
class MultiScaleCausalSimAM(tf.keras.layers.Layer):

    def __init__(self, e_lambda=1e-4, scales=[1.0, 0.5, 0.25], **kwargs):
        """Initializes the multi-scale causal SimAM module.

        Args:
            e_lambda: smoothing term that prevents division by zero.
            scales: list of scale ratios, e.g. [1.0, 0.5, 0.25].
        """
        super(MultiScaleCausalSimAM, self).__init__(**kwargs)
        self.e_lambda = e_lambda
        self.scales = scales
        self.sigmoid = tf.keras.activations.sigmoid

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """Forward pass over a (batch, time, height, width, channels) tensor."""
        attentions = []
        batch_size = tf.shape(inputs)[0]
        time_steps = tf.shape(inputs)[1]
        channels = tf.shape(inputs)[4]
        for scale in self.scales:
            if scale != 1.0:
                # Integer target size.
                scaled_height = tf.cast(
                    tf.round(tf.cast(tf.shape(inputs)[2], tf.float32) * scale),
                    tf.int32)
                scaled_width = tf.cast(
                    tf.round(tf.cast(tf.shape(inputs)[3], tf.float32) * scale),
                    tf.int32)
                # Fold time into the batch axis: (batch * time, H, W, C).
                reshaped_inputs = tf.reshape(
                    inputs,
                    (-1, tf.shape(inputs)[2], tf.shape(inputs)[3],
                     tf.shape(inputs)[4]))
                scaled_inputs = tf.image.resize(
                    reshaped_inputs, size=[scaled_height, scaled_width],
                    method='bilinear')
                # Unfold back to (batch, time, scaled_H, scaled_W, C).
                scaled_inputs = tf.reshape(
                    scaled_inputs,
                    (batch_size, time_steps, scaled_height, scaled_width,
                     channels))
            else:
                scaled_inputs = inputs
            # Spatial mean, squared deviations, n = H * W - 1.
            mu = tf.reduce_mean(scaled_inputs, axis=[2, 3], keepdims=True)
            x_mu_square = tf.square(scaled_inputs - mu)
            n = tf.cast(
                tf.shape(scaled_inputs)[2] * tf.shape(scaled_inputs)[3] - 1,
                dtype=inputs.dtype)
            # Energy function y and attention weights.
            sum_x_mu_square = tf.reduce_sum(x_mu_square, axis=[2, 3],
                                            keepdims=True)
            y = x_mu_square / (4.0 * (sum_x_mu_square / n + self.e_lambda)) + 0.5
            attention = self.sigmoid(y)
            if scale != 1.0:
                # Fold, resize back to the original resolution, unfold.
                reshaped_attention = tf.reshape(
                    attention,
                    (-1, tf.shape(attention)[2], tf.shape(attention)[3],
                     tf.shape(attention)[4]))
                restored_attention = tf.image.resize(
                    reshaped_attention,
                    size=[tf.shape(inputs)[2], tf.shape(inputs)[3]],
                    method='bilinear')
                attention = tf.reshape(
                    restored_attention,
                    (batch_size, time_steps, tf.shape(inputs)[2],
                     tf.shape(inputs)[3], channels))
            attentions.append(attention)
        # Fuse the attention maps from all scales by averaging.
        combined_attention = tf.reduce_mean(tf.stack(attentions, axis=-1),
                                            axis=-1)
        return inputs * combined_attention

    def get_config(self):
        config = super(MultiScaleCausalSimAM, self).get_config()
        config.update({'e_lambda': self.e_lambda, 'scales': self.scales})
        return config
```
Reshaping the tensors: tf.reshape folds the 5D tensor (batch, time, height, width, channels) into a 4D tensor (batch * time, height, width, channels) so it meets the input requirements of tf.image.resize.
Applying tf.image.resize: the resize then runs on the folded 4D tensor.
Restoring the original shape: after resizing, the tensor is reshaped back to (batch, time, new_height, new_width, channels).
Post-restoration handling: each attention map is resized back to the input's (height, width) so that it matches the input tensor.
Updating the MobileBottleneck class: make sure MultiScaleCausalSimAM is integrated correctly inside MobileBottleneck. A modified example:
```python
@tf.keras.utils.register_keras_serializable(package='Vision')
class MobileBottleneck(tf.keras.layers.Layer):

    def __init__(self, filters, kernel_size, strides=(1, 1, 1),
                 use_simam=True, simam_params=None, **kwargs):
        super(MobileBottleneck, self).__init__(**kwargs)
        self.use_simam = use_simam
        if self.use_simam:
            self.simam = (MultiScaleCausalSimAM(**simam_params)
                          if simam_params else MultiScaleCausalSimAM())
        # Other layers: expansion, feature extraction, projection, etc.
        self._projection_layer = layers.Conv3D(
            filters=filters, kernel_size=kernel_size, strides=strides,
            padding='same', use_bias=False, activation=None)
        self._identity = layers.Conv3D(
            filters=filters, kernel_size=(1, 1, 1), strides=(1, 1, 1),
            padding='same', use_bias=False, activation=None)
        # Identity placeholder; a true ReZero would multiply by a trainable
        # scalar initialized to zero.
        self._rezero = layers.Lambda(lambda x: x * 0.0 + x)
        self._stochastic_depth = None  # add stochastic depth if needed

    def call(self, inputs, states=None, training=False):
        # Projection.
        x = self._projection_layer(inputs)
        # Apply MultiScaleCausalSimAM.
        if self.use_simam:
            x = self.simam(x)
        # Residual handling.
        x = self._identity(x)
        x = self._rezero(x)
        if self._stochastic_depth is not None:
            x = self._stochastic_depth(x, training=training)
        if hasattr(self, '_skip_layer') and self._skip_layer is not None:
            skip = self._skip_layer(inputs)
        else:
            skip = inputs
        return x + skip, states

    def get_config(self):
        config = super(MobileBottleneck, self).get_config()
        config.update({
            'use_simam': self.use_simam,
            'simam_params': self.simam.get_config() if self.use_simam else None,
            'filters': self._projection_layer.filters,
            'kernel_size': self._projection_layer.kernel_size,
            'strides': self._projection_layer.strides,
        })
        return config
```
The complete example below shows how to define and integrate MultiScaleCausalSimAM into MobileBottleneck and build the MoViNet model:
```python
import tensorflow as tf
from tensorflow.keras import layers, models

# Complete script: the reshape-aware MultiScaleCausalSimAM above, the
# MobileBottleneck above, and the MoViNetModel from the earlier examples
# (stem Conv3D + BatchNorm + ReLU, two MobileBottleneck blocks with
# simam_params={'e_lambda': 1e-4, 'scales': [1.0, 0.5, 0.25]},
# GlobalAveragePooling3D and a softmax Dense head), followed by:

num_classes = 100  # adjust to the task
model = MoViNetModel(num_classes=num_classes)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy'])

model.build(input_shape=(None, 16, 224, 224, 3))  # 16-frame 224x224 RGB clips
model.summary()
```
With the training and validation datasets (train_dataset and val_dataset) prepared, train the model with:
```python
# Train the model.
model.fit(
    train_dataset,
    epochs=50,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('movinet_model.h5',
                                           save_best_only=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                             factor=0.1, patience=5),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,
                                         restore_best_weights=True),
    ])
```
MultiScaleCausalSimAM class: folds the 5D input into 4D for tf.image.resize and restores it to a 5D tensor afterwards.
MobileBottleneck class: inserts the MultiScaleCausalSimAM layer after the projection layer to strengthen the feature representation; self._identity and self._rezero implement a simple residual path that keeps the features stable.
MoViNetModel class: stem, bottleneck blocks, and head, as above.
e_lambda parameter: smooths the energy function and prevents division by zero; the initial value is typically 1e-4 and can be adjusted to the dataset.
scales parameter: defines the scales used; a common setting is [1.0, 0.5, 0.25], with more scales added or existing ones adjusted as needed.
Learning rate: start around 1e-4 and adjust based on convergence.
Regularization: add terms such as kernel_regularizer=tf.keras.regularizers.l2(1e-5) to the Conv3D layers when needed.
Comparative experiments: compare multi-scale CausalSimAM against single-scale CausalSimAM and against no attention; evaluation metrics include accuracy, loss, and training time.
Also recommended: attention-weight visualization, causality verification, and per-scale effect analysis.
Possible extensions: adaptive scale selection, depthwise-separable convolutions, combination with other attention mechanisms, and dynamic fusion strategies; a sketch of an adaptive scale gate follows.
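As one possible shape for the adaptive scale selection mentioned above, a sketch (the ScaleGate name and the pooling-based gate are assumptions, not part of the original module):

```python
import tensorflow as tf

class ScaleGate(tf.keras.layers.Layer):
    """Predicts per-scale fusion weights from pooled features (a sketch)."""

    def __init__(self, num_scales, **kwargs):
        super().__init__(**kwargs)
        self.dense = tf.keras.layers.Dense(num_scales, activation='softmax')

    def call(self, inputs, attentions):
        # inputs: (batch, time, H, W, C); pool over space and channels.
        pooled = tf.reduce_mean(inputs, axis=[2, 3, 4])       # (batch, time)
        gates = self.dense(pooled[..., tf.newaxis])           # (batch, time, S)
        gates = gates[:, :, tf.newaxis, tf.newaxis, tf.newaxis, :]
        stacked = tf.stack(attentions, axis=-1)               # (..., S)
        return tf.reduce_sum(stacked * gates, axis=-1)
```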
With the changes above you resolve the dimension-mismatch problem and integrate multi-scale feature fusion into MultiScaleCausalSimAM correctly. Handle tensor shapes carefully throughout the architecture and convert the 5D tensor to 4D before calling tf.image.resize; the multi-scale fusion can then improve MoViNet's performance on video-analysis tasks.
Fix the scale computation: tf.round and the double tf.cast guarantee integer target sizes, and the reshape steps give tf.image.resize the dimensions it expects.
Integrate the multi-scale attention module: insert the MultiScaleCausalSimAM layer into the MobileBottleneck class, and update the MobileBottleneck and MoViNetModel classes to accommodate the new attention mechanism.
Train and validate the model.
Optimize for performance.
With these steps you can use multi-scale feature fusion effectively and improve MoViNet's performance on video-analysis tasks. If further issues come up during implementation, share the details for more targeted help.