神经网络损失波动原因
생성일: 2024년 12월 14일
생성일: 2024년 12월 14일
为什么神经网络在训练过程中损失会出现波动?神经网络代码如下:
class Tee(object):
    """Duplicate every write across several file-like objects.

    Used below to mirror ``sys.stdout`` into a log file.

    Args:
        *files: File-like targets exposing ``write`` and ``flush``.
    """

    def __init__(self, *files):
        self.files = files
        # Expose an ``encoding`` attribute like a real text stream, taken from
        # the first target; fall back to 'utf-8' when there is none.
        first = files[0] if files else None
        self.encoding = getattr(first, 'encoding', 'utf-8')

    def write(self, obj):
        """Write *obj* to every target, flushing each one immediately."""
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        """Flush every target."""
        for f in self.files:
            f.flush()
# Mirror everything written to stdout into a log file as well.
output_file = open('7型修改控旋门', 'w', encoding='utf-8')
original_stdout = sys.stdout  # saved reference to the real stdout
tee = Tee(sys.stdout, output_file)
sys.stdout = tee

# Send the 'my_sampler' logger through the same tee so its records reach
# both the console and the log file.
logger = logging.getLogger('my_sampler')
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(tee)
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
# Drop any handlers already attached (e.g. when this setup runs twice) so
# each record is emitted once; both original branches ended in this state.
logger.handlers = []
logger.addHandler(stream_handler)
logging.getLogger('qiskit').setLevel(logging.WARNING)
def save_plot(fig, filename):
    """Save *fig* to *filename*, then close the figure.

    Args:
        fig: Matplotlib figure to write out.
        filename: Destination path passed to ``fig.savefig``.
    """
    fig.savefig(filename)
    plt.close(fig)
def _plot_history_metric(history, metric, label, legend_loc, filename):
    """Plot one training metric, plus its ``val_`` twin if recorded, and save it."""
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(history.history[metric], label=f'Training {label}')
    val_key = f'val_{metric}'
    if val_key in history.history:
        ax.plot(history.history[val_key], label=f'Validation {label}')
    else:
        print(f"Validation {metric} not found in history.")
    ax.set_title(f'Model {label}')
    ax.set_ylabel(label)
    ax.set_xlabel('Epoch')
    ax.legend(loc=legend_loc)
    save_plot(fig, filename)


def plot_history(history):
    """Save accuracy and loss curves from a Keras-style training history.

    Writes 'accuracy_plot.png' and 'loss_plot.png'. The validation curve is
    drawn only when the matching ``val_*`` key exists; otherwise a notice is
    printed.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value sequences; 'accuracy' and 'loss' are required.
    """
    _plot_history_metric(history, 'accuracy', 'Accuracy', 'upper left', 'accuracy_plot.png')
    _plot_history_metric(history, 'loss', 'Loss', 'upper right', 'loss_plot.png')
def calculate_accuracy_fluctuation(history):
    """Return the mean accuracy and its total absolute deviation.

    Validation accuracy is used when present, otherwise training accuracy.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value sequences.

    Returns:
        ``(mean_accuracy, fluctuation)`` where ``fluctuation`` is the sum of
        ``|accuracy - mean_accuracy|`` over all epochs, or ``(None, None)``
        when neither accuracy metric was recorded.
    """
    metrics = history.history
    if 'val_accuracy' in metrics:
        accuracies = metrics['val_accuracy']
    elif 'accuracy' in metrics:
        accuracies = metrics['accuracy']
    else:
        print("Accuracy metric not found in history.")
        return None, None
    # History values are plain Python lists; convert explicitly so the
    # subtraction below is a well-defined element-wise array operation.
    accuracies = np.asarray(accuracies, dtype=float)
    mean_accuracy = np.mean(accuracies)
    fluctuation = np.sum(np.abs(accuracies - mean_accuracy))
    return mean_accuracy, fluctuation
# Device the QNode below is bound to: PennyLane "default.qubit" with 15 wires.
dev = qml.device("default.qubit", wires=15)
def encode_group(control_qubits, data_qubits, angle_q2, angle_q3_values):
    """Apply eight pairs of triply-controlled RY rotations to one neuron group.

    For each control bit pattern, an RY by ``angle_q2`` is applied to the
    'q2' wire and an RY by the pattern's own entry of ``angle_q3_values`` to
    the 'q3' wire, both conditioned on the three control wires.

    Args:
        control_qubits: Sequence whose first three entries are the control wires.
        data_qubits: Mapping with keys 'q2' and 'q3' naming the target wires.
        angle_q2: Rotation angle for the 'q2' wire (same for every pattern).
        angle_q3_values: Indexable with at least eight angles for the 'q3'
            wire, one per pattern.
    """
    controls = [control_qubits[0], control_qubits[1], control_qubits[2]]
    # Pattern order matches the original hand-written groups 1-8; it is NOT
    # plain binary counting, and index i pairs with angle_q3_values[i].
    patterns = (
        [0, 0, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1],
        [1, 0, 0], [1, 1, 0], [1, 1, 1], [1, 0, 1],
    )
    for group, values in enumerate(patterns):
        qml.ctrl(qml.RY(angle_q2, wires=data_qubits['q2']),
                 control=controls, control_values=values)
        qml.ctrl(qml.RY(angle_q3_values[group], wires=data_qubits['q3']),
                 control=controls, control_values=values)
def apply_controlled_swap(swap_qubit, q_swap1, q_swap2):
    """
    Modular method to apply a controlled SWAP gate.

    The CSWAP is wrapped in a Hadamard on the control wire before and after.
    NOTE(review): this H-CSWAP-H shape looks like a swap test; confirm intent.

    Parameters:
    - swap_qubit (int): Control qubit for the SWAP gate.
    - q_swap1 (int): First qubit to swap.
    - q_swap2 (int): Second qubit to swap.
    """
    qml.Hadamard(wires=swap_qubit)
    qml.CSWAP(wires=[swap_qubit, q_swap1, q_swap2])
    qml.Hadamard(wires=swap_qubit)
@qml.qnode(dev, interface='tf', diff_method='backprop')
def quantum_neural_network(inputs, encoding_weights, rot_weights, entangle_weights):
    """Variational circuit with four neuron groups sharing three control wires.

    Every layer re-encodes the inputs and encoding weights into each group,
    applies a ``Rot`` on each wire and a chain of ``CRZ`` gates, then runs a
    controlled swap per group.

    Args:
        inputs: Indexable with four rotation angles, one per neuron group.
            Callers are expected to scale features to [0, pi/2] beforehand.
        encoding_weights: Indexed as ``[group, layer, pattern]``; eight
            angles per group and layer.
        rot_weights: Indexed as ``[layer, wire, 3]``. ``shape[0]`` sets the
            number of layers and ``shape[1]`` the number of rotated wires.
        entangle_weights: Indexed as ``[layer, wire]``; angle of the CRZ
            between ``wire`` and ``wire + 1``.

    Returns:
        List of PauliZ expectation values on wires 5, 8, 11 and 14.
    """
    control_qubits = [0, 1, 2]

    # Neuron groups: two data wires plus the wire that controls the swap.
    encoding_data = [
        {'data_qubits': {'q2': 3, 'q3': 4}, 'swap_qubit': 5},
        {'data_qubits': {'q2': 6, 'q3': 7}, 'swap_qubit': 8},
        {'data_qubits': {'q2': 9, 'q3': 10}, 'swap_qubit': 11},
        {'data_qubits': {'q2': 12, 'q3': 13}, 'swap_qubit': 14},
    ]

    angles_q2_list = inputs            # one angle per neuron group
    angles_q3_list = encoding_weights  # [group, layer, pattern]

    for layer in range(rot_weights.shape[0]):
        # Encode feature value and weights into every neuron group.
        for idx, data_set in enumerate(encoding_data):
            data_qubits = data_set['data_qubits']
            angle_q2 = angles_q2_list[idx]
            angle_q3_values = angles_q3_list[idx, layer, :]
            encode_group(control_qubits, data_qubits, angle_q2, angle_q3_values)

        # Trainable single-wire rotations.
        for qubit in range(rot_weights.shape[1]):
            rot_angles = rot_weights[layer, qubit, :]
            qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)

        # Trainable entanglement between neighbouring wires.
        for qubit in range(rot_weights.shape[1] - 1):
            qml.CRZ(entangle_weights[layer, qubit], wires=[qubit, qubit + 1])

        # One controlled swap per neuron group in every layer.
        for data_set in encoding_data:
            swap_qubit = data_set['swap_qubit']
            data_qubits = data_set['data_qubits']
            apply_controlled_swap(swap_qubit, data_qubits['q3'], data_qubits['q2'])

    measured_qubits = [5, 8, 11, 14]
    results = [qml.expval(qml.PauliZ(qubit)) for qubit in measured_qubits]
    return results
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that runs ``quantum_neural_network`` once per input sample.

    Owns three trainable, L2-regularised weight tensors:
    ``encoding_weights`` (4, n_layers, 8), ``rot_weights``
    (n_layers, n_qubits, 3) and ``entangle_weights`` (n_layers, n_qubits - 1).

    Args:
        n_qubits: Number of wires that receive rotation/entangling gates.
        n_layers: Number of circuit layers.
        num_classes: Stored on the instance; not used by this layer itself.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super(QuantumNeuralNetworkLayer, self).__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        initializer = tf.keras.initializers.GlorotUniform()
        regularizer = tf.keras.regularizers.l2(0.01)

        # Encoding weights: 4 neuron groups x layers x 8 control patterns.
        self.encoding_weights = self.add_weight(
            shape=(4, self.n_layers, 8),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='encoding_weights',
            dtype=tf.float32
        )

        # Rotation weights: three Rot angles per wire and layer.
        self.rot_weights = self.add_weight(
            shape=(n_layers, n_qubits, 3),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='rot_weights',
            dtype=tf.float32
        )

        # Entangling weights: one CRZ angle per neighbouring wire pair and layer.
        self.entangle_weights = self.add_weight(
            shape=(n_layers, n_qubits - 1),
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=regularizer,
            trainable=True,
            name='entangle_weights',
            dtype=tf.float32
        )

    def call(self, inputs):
        """Evaluate the circuit for each row of *inputs* and stack the results."""
        outputs_list = []
        # NOTE(review): iterating over range(tf.shape(...)[0]) presumably
        # requires eager execution; confirm how the model is compiled.
        for i in range(tf.shape(inputs)[0]):
            x = inputs[i]
            x = tf.cast(x, tf.float32)
            result = quantum_neural_network(x, self.encoding_weights, self.rot_weights, self.entangle_weights)
            outputs_list.append(tf.cast(result, tf.float32))
        outputs = tf.stack(outputs_list, axis=0)
        return outputs
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid model: quantum layer, then two ReLU dense layers and a softmax head.

    Args:
        n_qubits: Passed to ``QuantumNeuralNetworkLayer``.
        n_layers: Passed to ``QuantumNeuralNetworkLayer``.
        num_classes: Number of units in the softmax output layer.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super(QuantumNeuralNetwork, self).__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layers
        self.dense1 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
            dtype=tf.float32
        )
        self.dense2 = tf.keras.layers.Dense(
            8,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
            dtype=tf.float32
        )

        # Output layer; it consumes dense2's output (see ``call``), not the
        # raw quantum outputs. Spelled tf.keras.layers.Dense like its siblings.
        self.output_layer = tf.keras.layers.Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
            dtype=tf.float32  # Ensure the output layer is float32
        )

    def call(self, inputs):
        """Run quantum layer -> dense1 -> dense2 -> softmax output."""
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        x = self.dense2(x)
        return self.output_layer(x)
def is_quantum_model(model):
    """Return True if *model* contains a ``QuantumNeuralNetworkLayer``.

    The search walks ``model.layers`` and recurses into any layer that itself
    exposes a ``layers`` attribute (nested models).
    """
    for layer in model.layers:
        if isinstance(layer, QuantumNeuralNetworkLayer):
            return True
        # Nested containers expose their own ``layers``; search them too.
        if hasattr(layer, 'layers') and is_quantum_model(layer):
            return True
    return False
在神经网络的训练过程中,损失(loss)出现波动是一个常见现象,尤其是在复杂模型或特定类型的网络(如量子神经网络)中。针对您提供的量子神经网络代码,以下是可能导致损失波动的主要原因及相应的解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
原因:
解决方案:
建议:
损失函数在训练过程中的波动可能由多种因素引起,尤其是在复杂的量子神经网络中更为显著。建议从以下几个方面入手进行排查和优化:
通过系统地分析和调整上述因素,您应能够减小训练过程中损失的波动,提高模型的稳定性和性能。
训练集每个epoch开始前是否需要进行指标重置?
在训练神经网络,特别是复杂的量子神经网络时,正确管理和跟踪训练指标(如损失、准确率等)对于监控模型性能和诊断训练问题至关重要。关于您提到的“训练集每个epoch开始前是否需要进行指标重置”,以下是详细的解释和建议:
使用 `Model.fit` 方法时的指标管理
如果您使用的是 TensorFlow Keras 的标准训练流程,即通过 `model.compile` 和 `model.fit` 方法来训练模型,那么 Keras 会自动管理和重置训练和验证指标。在这种情况下,您无需手动重置指标,Keras 会在每个 epoch 开始前自动重置相关指标,以确保每个 epoch 的指标是独立计算的。
示例:
pythonmodel.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) history = model.fit(train_dataset, epochs=10, validation_data=val_dataset)
在上述代码中,Keras 会自动处理每个 epoch 开始前的指标重置,因此无需额外操作。
如果您采用的是自定义训练循环(例如,使用 `tf.GradientTape` 进行手动梯度计算和权重更新),那么您需要手动管理和重置指标。这是因为自定义训练循环不会像 `model.fit` 那样自动处理指标的重置。
示例:
import tensorflow as tf

# Define the metrics
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = tf.keras.metrics.Mean(name='val_loss')
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')

for epoch in range(num_epochs):
    # Reset every metric before the epoch starts so results do not
    # accumulate across epochs.
    train_loss.reset_states()
    train_accuracy.reset_states()
    val_loss.reset_states()
    val_accuracy.reset_states()

    # Training step
    for x, y in train_dataset:
        with tf.GradientTape() as tape:
            predictions = model(x, training=True)
            loss = loss_fn(y, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        train_loss(loss)
        train_accuracy(y, predictions)

    # Validation step
    for x_val, y_val in val_dataset:
        predictions = model(x_val, training=False)
        v_loss = loss_fn(y_val, predictions)

        val_loss(v_loss)
        val_accuracy(y_val, predictions)

    print(
        f'Epoch {epoch + 1}, '
        f'Loss: {train_loss.result()}, '
        f'Accuracy: {train_accuracy.result() * 100}, '
        f'Val Loss: {val_loss.result()}, '
        f'Val Accuracy: {val_accuracy.result() * 100}'
    )
在上述自定义训练循环中,每个 epoch 开始前通过调用 reset_states()
方法手动重置指标,以确保每个 epoch 的指标独立计算。
无论您使用哪种训练方法,正确重置指标都是至关重要的,因为:
基于您提供的量子神经网络代码,如果您使用的是标准的 Keras Model.fit
方法,那么无需手动重置指标;Keras 会自动处理这些细节。然而,如果您使用了自定义训练循环(例如,结合量子计算框架如 Qiskit 的特性),那么您需要手动管理指标的重置。
建议步骤:
确认训练方法:
model.fit
,无需额外操作。实现指标重置:
metric.reset_states()
方法在每个 epoch 开始前重置指标。监控和记录:
假设您使用标准的 Keras Model.fit
方法来训练您的量子神经网络模型,以下是一个简化的训练示例:
python# 假设您已经定义并编译了模型 model = QuantumNeuralNetwork(n_qubits=15, n_layers=..., num_classes=...) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) # 训练模型 history = model.fit(train_dataset, epochs=50, validation_data=val_dataset)
在这种情况下,Keras 会自动处理每个 epoch 的指标重置和记录,您只需专注于调整模型架构和超参数。
如果您选择使用自定义训练循环,参考前述的自定义训练循环示例,确保在每个 epoch 开始前重置所有指标。
Model.fit
:无需手动重置指标,Keras 会自动管理。通过正确管理训练指标,您可以更有效地监控和优化您的量子神经网络模型,减少训练过程中的波动,并提高模型的整体性能和稳定性。
为什么损失很高,准确率却也很高?比如如下训练结果
5/5 [==============================] - 110s 23s/step - batch: 2.0000 - size: 16.0000 - loss: 2.1130 - accuracy: 1.0000 - val_loss: 2.0821 - val_accuracy: 0.9333
在神经网络的训练过程中,损失(loss)很高但准确率(accuracy)也很高是一种看似矛盾但实际上可以解释的现象。这种情况可能由多种因素引起,尤其是在使用复杂架构如量子神经网络时。以下是可能的原因及相应的解决方案:
原因:
损失函数关注的是预测概率的置信度,而准确率仅关注预测类别是否正确。如果模型对正确类别的预测概率不够高,即使所有预测都是正确的,损失仍然可能较高。
例如,假设有三个类别,某个样本的真实类别为 A,而模型给出的预测概率为 $[0.4,\ 0.3,\ 0.3]$(类别 A 的概率最高,因此预测正确)。
在这种情况下,虽然预测是正确的(准确率为100%),但交叉熵损失为:
$$L = -\ln(0.4) \approx 0.916$$
如果模型对正确类别的概率更低,损失将进一步增加。
解决方案:
pythonpredictions = model.predict(sample_inputs) print(predictions) # 查看每个样本的预测概率分布
sparse_categorical_crossentropy
或 categorical_crossentropy
,确保与输出层的激活函数(如 softmax
)匹配。原因:
categorical_crossentropy
时,如果标签是整数形式而非独热编码(one-hot encoded),会导致损失计算不正确。解决方案:
确认标签编码:
sparse_categorical_crossentropy
,标签应为整数形式。categorical_crossentropy
,标签应为独热编码形式。示例:
python# 对于 sparse_categorical_crossentropy model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) # 对于 categorical_crossentropy model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
转换标签格式(如需要):
pythonfrom tensorflow.keras.utils import to_categorical # 将整数标签转换为独热编码 one_hot_labels = to_categorical(integer_labels, num_classes=num_classes)
原因:
softmax
激活函数进行多分类任务。解决方案:
确保输出层使用合适的激活函数:
python# 对于多分类问题 self.output_layer = Dense( num_classes, activation='softmax', # 确保使用 softmax kernel_regularizer=tf.keras.regularizers.l2(0.01), dtype=tf.float32 )
检查模型的预测概率:
pythonpredictions = model.predict(sample_inputs) print(predictions) # 查看预测概率是否集中在正确类别
原因:
解决方案:
增加测量次数(Shots):通过增加测量次数,可以减少测量结果的统计波动,提高输出概率的准确性。
示例:
# NOTE: finite ``shots`` cannot be combined with diff_method='backprop';
# a QNode bound to this device needs e.g. diff_method='parameter-shift'.
dev = qml.device("default.qubit", wires=15, shots=1024)  # use more shots
优化量子电路:简化量子电路,减少参数数量或层数,以降低系统的复杂性和噪声水平。
原因:
解决方案:
调整学习率:尝试减小或增大学习率,观察损失和准确率的变化。
使用自适应学习率优化器:如 Adam、RMSprop 等,这些优化器在许多情况下表现更稳定。
示例:
pythonoptimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
使用学习率调度器:动态调整学习率,以更好地适应训练过程。
示例:
pythonlr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5) model.fit(train_dataset, epochs=50, validation_data=val_dataset, callbacks=[lr_scheduler])
原因:
解决方案:
调整正则化强度:尝试减小 L2 正则化系数,观察对损失和准确率的影响。
示例:
pythonregularizer = tf.keras.regularizers.l2(0.001) # 减小正则化强度
使用其他正则化方法:如 Dropout,Batch Normalization 等,以帮助模型更好地泛化,同时保持预测概率的稳定性。
原因:
解决方案:
标准化输入数据:对输入数据进行归一化或标准化处理,确保其分布适合模型的训练。
示例:
pythonfrom sklearn.preprocessing import StandardScaler scaler = StandardScaler() train_data = scaler.fit_transform(train_data) val_data = scaler.transform(val_data)
原因:
解决方案:
监控梯度:检查梯度是否正常,避免梯度爆炸或消失。
使用梯度裁剪(Gradient Clipping):限制梯度的最大值,防止梯度爆炸。
示例:
pythonoptimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, clipnorm=1.0)
确保量子电路的数值稳定性:检查量子门参数是否合理,避免数值不稳定。
原因:
解决方案:
softmax
激活和 categorical_crossentropy
或 sparse_categorical_crossentropy
损失函数。以下是一些具体的操作步骤,帮助您诊断和解决损失高但准确率高的问题:
确保损失函数与标签编码方式匹配。例如,如果使用 sparse_categorical_crossentropy
,标签应为整数形式:
pythonmodel.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
如果使用 categorical_crossentropy
,标签应为独热编码形式:
pythonfrom tensorflow.keras.utils import to_categorical # 假设 labels 是整数形式 one_hot_labels = to_categorical(labels, num_classes=num_classes) model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
在训练过程中或训练后,检查模型对某些样本的预测概率:
pythonimport numpy as np # 获取部分样本的预测概率 sample_inputs, sample_labels = next(iter(train_dataset)) predictions = model.predict(sample_inputs) for i in range(len(sample_inputs)): print(f"True Label: {sample_labels[i]}, Predicted Probabilities: {predictions[i]}")
这有助于判断模型是否对正确类别赋予了足够高的概率。
尝试使用不同的优化器和学习率设置:
python# 使用 Adam 优化器并调整学习率 optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4) model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
或者使用学习率调度器:
pythonlr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5) model.fit(train_dataset, epochs=50, validation_data=val_dataset, callbacks=[lr_scheduler])
如果量子设备通过有限次测量(shots)来估计期望值,增加测量次数可以减少测量噪声(本代码使用的是 PennyLane 的 default.qubit 模拟器,而非 Qiskit;未设置 shots 时为解析计算,不存在测量噪声):
# NOTE: finite ``shots`` cannot be combined with diff_method='backprop';
# a QNode bound to this device needs e.g. diff_method='parameter-shift'.
dev = qml.device("default.qubit", wires=15, shots=1024)  # use more shots
减小 L2 正则化系数,观察对损失和准确率的影响:
pythonself.encoding_weights = self.add_weight( shape=(4, self.n_layers, 8), initializer=initializer, regularizer=tf.keras.regularizers.l2(0.001), # 减小正则化系数 trainable=True, name='encoding_weights', dtype=tf.float32 )
通过绘制训练和验证的损失与准确率曲线,观察模型的训练动态:
pythonimport matplotlib.pyplot as plt history = model.fit(train_dataset, epochs=50, validation_data=val_dataset) # 绘制损失曲线 plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) plt.plot(history.history['loss'], label='Training Loss') plt.plot(history.history['val_loss'], label='Validation Loss') plt.title('Loss Over Epochs') plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend() # 绘制准确率曲线 plt.subplot(1, 2, 2) plt.plot(history.history['accuracy'], label='Training Accuracy') plt.plot(history.history['val_accuracy'], label='Validation Accuracy') plt.title('Accuracy Over Epochs') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.legend() plt.show()
如果问题依然存在,尝试简化模型架构,减少量子层的数量或参数,观察对损失和准确率的影响。
损失高但准确率高的现象通常表明模型在正确分类的同时,对正确类别的置信度不足。为了优化这一问题,建议从以下几个方面入手:
通过系统地分析和调整上述因素,您应能够降低损失值,同时保持甚至提升模型的准确率,从而提高模型的整体性能和稳定性。
import sys
import os
import pennylane as qml
from pennylane import numpy as np
import tensorflow as tf
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler, MinMaxScaler
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense
from importance_sampling.training import ConstantTimeImportanceTraining
from importance_sampling.samplers import ConstantTimeSampler
from importance_sampling.datasets import InMemoryDataset
from importance_sampling.reweighting import BiasedReweightingPolicy
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import logging
tf.keras.backend.set_floatx('float32')
class Tee(object):
    """Duplicate every write across several file-like objects.

    Used below to mirror ``sys.stdout`` into a log file.

    Args:
        *files: File-like targets exposing ``write`` and ``flush``.
    """

    def __init__(self, *files):
        self.files = files
        # Expose an ``encoding`` attribute like a real text stream, taken from
        # the first target; fall back to 'utf-8' when there is none.
        first = files[0] if files else None
        self.encoding = getattr(first, 'encoding', 'utf-8')

    def write(self, obj):
        """Write *obj* to every target, flushing each one immediately."""
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        """Flush every target."""
        for f in self.files:
            f.flush()
# Mirror everything written to stdout into a log file as well.
output_file = open('7型swap移位变换_不重置RY.log', 'w', encoding='utf-8')
original_stdout = sys.stdout  # saved reference to the real stdout
tee = Tee(sys.stdout, output_file)
sys.stdout = tee

# Send the 'my_sampler' logger through the same tee so its records reach
# both the console and the log file.
logger = logging.getLogger('my_sampler')
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(tee)
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
# Drop any handlers already attached (e.g. when this setup runs twice) so
# each record is emitted once; both original branches ended in this state.
logger.handlers = []
logger.addHandler(stream_handler)
logging.getLogger('qiskit').setLevel(logging.WARNING)
def save_plot(fig, filename):
    """Save *fig* to *filename*, then close the figure.

    Args:
        fig: Matplotlib figure to write out.
        filename: Destination path passed to ``fig.savefig``.
    """
    fig.savefig(filename)
    plt.close(fig)
def _plot_history_metric(history, metric, label, legend_loc, filename):
    """Plot one training metric, plus its ``val_`` twin if recorded, and save it."""
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(history.history[metric], label=f'Training {label}')
    val_key = f'val_{metric}'
    if val_key in history.history:
        ax.plot(history.history[val_key], label=f'Validation {label}')
    else:
        print(f"Validation {metric} not found in history.")
    ax.set_title(f'Model {label}')
    ax.set_ylabel(label)
    ax.set_xlabel('Epoch')
    ax.legend(loc=legend_loc)
    save_plot(fig, filename)


def plot_history(history):
    """Save accuracy and loss curves from a Keras-style training history.

    Writes 'accuracy_plot.png' and 'loss_plot.png'. The validation curve is
    drawn only when the matching ``val_*`` key exists; otherwise a notice is
    printed.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value sequences; 'accuracy' and 'loss' are required.
    """
    _plot_history_metric(history, 'accuracy', 'Accuracy', 'upper left', 'accuracy_plot.png')
    _plot_history_metric(history, 'loss', 'Loss', 'upper right', 'loss_plot.png')
def calculate_accuracy_fluctuation(history):
    """Return the mean accuracy and its total absolute deviation.

    Validation accuracy is used when present, otherwise training accuracy.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value sequences.

    Returns:
        ``(mean_accuracy, fluctuation)`` where ``fluctuation`` is the sum of
        ``|accuracy - mean_accuracy|`` over all epochs, or ``(None, None)``
        when neither accuracy metric was recorded.
    """
    metrics = history.history
    if 'val_accuracy' in metrics:
        accuracies = metrics['val_accuracy']
    elif 'accuracy' in metrics:
        accuracies = metrics['accuracy']
    else:
        print("Accuracy metric not found in history.")
        return None, None
    # History values are plain Python lists; convert explicitly so the
    # subtraction below is a well-defined element-wise array operation.
    accuracies = np.asarray(accuracies, dtype=float)
    mean_accuracy = np.mean(accuracies)
    fluctuation = np.sum(np.abs(accuracies - mean_accuracy))
    return mean_accuracy, fluctuation
# Device the QNode below is bound to: PennyLane "default.qubit" with 15 wires.
dev = qml.device("default.qubit", wires=15)
def encode_group(control_qubits, data_qubits, angle_q2, angle_q3_values):
    """Apply eight pairs of triply-controlled RY rotations to one neuron group.

    For each control bit pattern, an RY by ``angle_q2`` is applied to the
    'q2' wire and an RY by the pattern's own entry of ``angle_q3_values`` to
    the 'q3' wire, both conditioned on the three control wires.

    Args:
        control_qubits: Sequence whose first three entries are the control wires.
        data_qubits: Mapping with keys 'q2' and 'q3' naming the target wires.
        angle_q2: Rotation angle for the 'q2' wire (same for every pattern).
        angle_q3_values: Indexable with at least eight angles for the 'q3'
            wire, one per pattern.
    """
    controls = [control_qubits[0], control_qubits[1], control_qubits[2]]
    # Pattern order matches the original hand-written groups 1-8; it is NOT
    # plain binary counting, and index i pairs with angle_q3_values[i].
    patterns = (
        [0, 0, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1],
        [1, 0, 0], [1, 1, 0], [1, 1, 1], [1, 0, 1],
    )
    for group, values in enumerate(patterns):
        qml.ctrl(qml.RY(angle_q2, wires=data_qubits['q2']),
                 control=controls, control_values=values)
        qml.ctrl(qml.RY(angle_q3_values[group], wires=data_qubits['q3']),
                 control=controls, control_values=values)
def encode_group_second(control_qubits, data_qubits, angle_q3_values):
    """Apply eight triply-controlled RY rotations to a group's 'q3' wire only.

    Weight-only counterpart of ``encode_group``: the 'q2' wire is left
    untouched.

    Args:
        control_qubits: Sequence whose first three entries are the control wires.
        data_qubits: Mapping with key 'q3' naming the target wire.
        angle_q3_values: Indexable with at least eight angles, one per
            control pattern.
    """
    controls = [control_qubits[0], control_qubits[1], control_qubits[2]]
    # Pattern order matches the original hand-written groups 1-8; it is NOT
    # plain binary counting, and index i pairs with angle_q3_values[i].
    patterns = (
        [0, 0, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1],
        [1, 0, 0], [1, 1, 0], [1, 1, 1], [1, 0, 1],
    )
    for group, values in enumerate(patterns):
        qml.ctrl(qml.RY(angle_q3_values[group], wires=data_qubits['q3']),
                 control=controls, control_values=values)
def apply_controlled_swap(swap_qubit, q_swap1, q_swap2):
    """
    Modular method to apply a controlled SWAP gate.

    The CSWAP is wrapped in a Hadamard on the control wire before and after.
    NOTE(review): this H-CSWAP-H shape looks like a swap test; confirm intent.

    Parameters:
    - swap_qubit (int): Control qubit for the SWAP gate.
    - q_swap1 (int): First qubit to swap.
    - q_swap2 (int): Second qubit to swap.
    """
    qml.Hadamard(wires=swap_qubit)
    qml.CSWAP(wires=[swap_qubit, q_swap1, q_swap2])
    qml.Hadamard(wires=swap_qubit)
@qml.qnode(dev, interface='tf', diff_method='backprop')
def quantum_neural_network(inputs, encoding_weights, rot_weights, entangle_weights):
    """Variational circuit: inputs are encoded once, weights in every layer.

    Layer 0 encodes both the inputs and the encoding weights. Each later
    layer re-encodes only the weights. The controlled swap in the later
    layers alternates which wire acts as control, so the wires measured at
    the end depend on the parity of the layer count.

    Args:
        inputs: Indexable with four rotation angles, one per neuron group.
            Callers are expected to scale features to [0, pi/2] beforehand.
        encoding_weights: Indexed as ``[group, layer, pattern]``; eight
            angles per group and layer.
        rot_weights: Indexed as ``[layer, wire, 3]``. ``shape[0]`` sets the
            number of layers and ``shape[1]`` the number of rotated wires.
        entangle_weights: Indexed as ``[layer, wire]``; angle of the CRZ
            between ``wire`` and ``wire + 1``.

    Returns:
        List of PauliZ expectation values on wires 5, 8, 11, 14 when the
        layer count is odd, otherwise on wires 3, 6, 9, 12.
    """
    control_qubits = [0, 1, 2]
    # Take the layer count from the weights instead of reading a module-level
    # ``n_layers`` that this function never defined.
    n_layers = rot_weights.shape[0]

    # Neuron groups: two data wires plus the wire that controls the swap.
    encoding_data = [
        {'data_qubits': {'q2': 3, 'q3': 4}, 'swap_qubit': 5},
        {'data_qubits': {'q2': 6, 'q3': 7}, 'swap_qubit': 8},
        {'data_qubits': {'q2': 9, 'q3': 10}, 'swap_qubit': 11},
        {'data_qubits': {'q2': 12, 'q3': 13}, 'swap_qubit': 14},
    ]

    angles_q2_list = inputs            # one angle per neuron group
    angles_q3_list = encoding_weights  # [group, layer, pattern]

    # First layer: encode both feature values and weights.
    for idx, data_set in enumerate(encoding_data):
        data_qubits = data_set['data_qubits']
        angle_q2 = angles_q2_list[idx]
        angle_q3_values = angles_q3_list[idx, 0, :]
        encode_group(control_qubits, data_qubits, angle_q2, angle_q3_values)

    # Trainable single-wire rotations.
    for qubit in range(rot_weights.shape[1]):
        rot_angles = rot_weights[0, qubit, :]
        qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)

    # Trainable entanglement between neighbouring wires.
    for qubit in range(rot_weights.shape[1] - 1):
        qml.CRZ(entangle_weights[0, qubit], wires=[qubit, qubit + 1])

    # One controlled swap per neuron group.
    for data_set in encoding_data:
        swap_qubit = data_set['swap_qubit']
        data_qubits = data_set['data_qubits']
        apply_controlled_swap(swap_qubit, data_qubits['q3'], data_qubits['q2'])

    # Remaining layers: encode weights only.
    for layer in range(n_layers - 1):
        layer_second = layer + 1

        for idx, data_set in enumerate(encoding_data):
            data_qubits = data_set['data_qubits']
            angle_q3_values = angles_q3_list[idx, layer_second, :]
            encode_group_second(control_qubits, data_qubits, angle_q3_values)

        for qubit in range(rot_weights.shape[1]):
            rot_angles = rot_weights[layer_second, qubit, :]
            qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)

        for qubit in range(rot_weights.shape[1] - 1):
            qml.CRZ(entangle_weights[layer_second, qubit], wires=[qubit, qubit + 1])

        # One controlled swap per neuron group; the control alternates
        # between the 'q2' wire and the swap wire from layer to layer.
        for data_set in encoding_data:
            swap_qubit = data_set['swap_qubit']
            data_qubits = data_set['data_qubits']
            if layer % 2 == 0:
                apply_controlled_swap(data_qubits['q2'], swap_qubit, data_qubits['q3'])
            else:
                apply_controlled_swap(swap_qubit, data_qubits['q2'], data_qubits['q3'])

    # Measure whichever wires served as control in the last swap.
    if n_layers % 2 == 1:
        measured_qubits = [5, 8, 11, 14]
    else:
        measured_qubits = [3, 6, 9, 12]
    results = [qml.expval(qml.PauliZ(qubit)) for qubit in measured_qubits]
    return results
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that runs ``quantum_neural_network`` once per input sample.

    Owns three trainable, L2-regularised weight tensors:
    ``encoding_weights`` (4, n_layers, 8), ``rot_weights``
    (n_layers, n_qubits, 3) and ``entangle_weights`` (n_layers, n_qubits - 1).

    Args:
        n_qubits: Number of wires that receive rotation/entangling gates.
        n_layers: Number of circuit layers.
        num_classes: Stored on the instance; not used by this layer itself.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super(QuantumNeuralNetworkLayer, self).__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        initializer = tf.keras.initializers.GlorotUniform()
        regularizer = tf.keras.regularizers.l2(0.01)

        # Encoding weights: 4 neuron groups x layers x 8 control patterns.
        self.encoding_weights = self.add_weight(
            shape=(4, self.n_layers, 8),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='encoding_weights',
            dtype=tf.float32
        )

        # Rotation weights: three Rot angles per wire and layer.
        self.rot_weights = self.add_weight(
            shape=(n_layers, n_qubits, 3),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='rot_weights',
            dtype=tf.float32
        )

        # Entangling weights: one CRZ angle per neighbouring wire pair and layer.
        self.entangle_weights = self.add_weight(
            shape=(n_layers, n_qubits - 1),
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=regularizer,
            trainable=True,
            name='entangle_weights',
            dtype=tf.float32
        )

    def call(self, inputs):
        """Evaluate the circuit for each row of *inputs* and stack the results."""
        outputs_list = []
        # NOTE(review): iterating over range(tf.shape(...)[0]) presumably
        # requires eager execution; confirm how the model is compiled.
        for i in range(tf.shape(inputs)[0]):
            x = inputs[i]
            x = tf.cast(x, tf.float32)
            result = quantum_neural_network(x, self.encoding_weights, self.rot_weights, self.entangle_weights)
            outputs_list.append(tf.cast(result, tf.float32))
        outputs = tf.stack(outputs_list, axis=0)
        return outputs
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid model: quantum layer, then two ReLU dense layers and a softmax head.

    Args:
        n_qubits: Passed to ``QuantumNeuralNetworkLayer``.
        n_layers: Passed to ``QuantumNeuralNetworkLayer``.
        num_classes: Number of units in the softmax output layer.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super(QuantumNeuralNetwork, self).__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layers
        self.dense1 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )
        self.dense2 = tf.keras.layers.Dense(
            8,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )

        # Output layer; it consumes dense2's output (see ``call``), not the
        # raw quantum outputs. Spelled tf.keras.layers.Dense like its siblings.
        self.output_layer = tf.keras.layers.Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32  # Ensure the output layer is float32
        )

    def call(self, inputs):
        """Run quantum layer -> dense1 -> dense2 -> softmax output."""
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        x = self.dense2(x)
        return self.output_layer(x)
def is_quantum_model(model):
    """Return True if `model` contains a QuantumNeuralNetworkLayer at any depth.

    Every entry of ``model.layers`` is inspected; an entry that itself exposes
    a ``layers`` attribute is searched recursively.
    """
    return any(
        isinstance(sub_layer, QuantumNeuralNetworkLayer)
        or (hasattr(sub_layer, 'layers') and is_quantum_model(sub_layer))
        for sub_layer in model.layers
    )
# Script entry point: trains the hybrid QNN on Iris with importance sampling.
# The guard must compare __name__ with "__main__"; ``name == "main"`` raises
# NameError as soon as the module is executed.
if __name__ == "__main__":
    try:
        # Load data
        iris = load_iris()
        X = iris['data'].astype(np.float32)  # Ensure data type is float32
        y = iris['target'].reshape(-1, 1)

        # One-hot encode labels
        encoder = OneHotEncoder(sparse_output=False)
        y_encoded = encoder.fit_transform(y).astype(np.float32)
        num_classes = y_encoded.shape[1]

        # Index lists for each split
        train_indices = []
        val_indices = []
        test_indices = []

        # Set random seed for reproducibility
        np.random.seed(42)

        # Per class: first 30 shuffled samples for training, next 10 for
        # validation, next 10 for testing.
        for class_label in range(num_classes):
            class_indices = np.where(y.flatten() == class_label)[0]
            np.random.shuffle(class_indices)
            train_indices.extend(class_indices[:30])
            val_indices.extend(class_indices[30:40])
            test_indices.extend(class_indices[40:50])

        train_indices = np.array(train_indices)
        val_indices = np.array(val_indices)
        test_indices = np.array(test_indices)

        # Split the data using the indices
        x_train, y_train = X[train_indices], y_encoded[train_indices]
        x_val, y_val = X[val_indices], y_encoded[val_indices]
        x_test, y_test = X[test_indices], y_encoded[test_indices]

        # Standardize (fit only on the training set)
        scaler = StandardScaler()
        X_train_standardized = scaler.fit_transform(x_train).astype(np.float32)
        X_val_standardized = scaler.transform(x_val).astype(np.float32)
        X_test_standardized = scaler.transform(x_test).astype(np.float32)

        # Map features to the [0, pi/2] range (fit only on the training set)
        min_max_scaler = MinMaxScaler(feature_range=(0, np.pi / 2))
        X_train_mapped = min_max_scaler.fit_transform(X_train_standardized).astype(np.float32)
        X_val_mapped = min_max_scaler.transform(X_val_standardized).astype(np.float32)
        X_test_mapped = min_max_scaler.transform(X_test_standardized).astype(np.float32)

        # Hyperparameters
        learning_rate = 0.025  # Learning rate
        epochs = 200  # Number of training epochs
        batch_size = 16  # Batch size
        n_qubits = 15  # 15 qubits as per the original QNN
        n_layers = 2  # Number of layers

        model = QuantumNeuralNetwork(
            n_qubits=n_qubits,
            n_layers=n_layers,
            num_classes=num_classes
        )

        if is_quantum_model(model):
            print("The model contains quantum neural network (QNN) layers.")
        else:
            print("The model does not contain quantum neural network (QNN) layers.")

        # Draw the quantum circuit with placeholder parameters
        sample_inputs = np.ones(4, dtype=np.float32) * (np.pi / 4)  # Inputs mapped to [0, pi/2]
        sample_encoding_weights = np.zeros((4, n_layers, 8), dtype=np.float32)
        sample_rot_weights = np.zeros((n_layers, n_qubits, 3), dtype=np.float32)
        sample_entangle_weights = np.zeros((n_layers, n_qubits - 1), dtype=np.float32)

        qml.drawer.use_style('sketch')
        print("\nQuantum circuit diagram:")
        print(qml.draw(quantum_neural_network)(sample_inputs, sample_encoding_weights, sample_rot_weights,
                                               sample_entangle_weights))
        qml.draw_mpl(quantum_neural_network)(sample_inputs, sample_encoding_weights, sample_rot_weights,
                                             sample_entangle_weights)[0].savefig('7型swap移位变换_不重置RY.png')

        steps_per_epoch = len(X_train_mapped) // batch_size

        # Create dataset (including validation set)
        dataset = InMemoryDataset(
            X_train_mapped, y_train,
            X_val_mapped, y_val,
            categorical=True
        )

        # Initialize reweighting policy
        reweighting_policy = BiasedReweightingPolicy(k=1.0)

        # Initialize importance sampling training
        wrapped_model = ConstantTimeImportanceTraining(model, score="loss")  # Use loss as score

        # Sampler factory; it ignores batch_size/steps_per_epoch/epochs and
        # always builds a ConstantTimeSampler with the policy defined above.
        def custom_sampler(dataset, batch_size, steps_per_epoch, epochs):
            return ConstantTimeSampler(
                dataset,
                reweighting_policy,
                model=wrapped_model.model,
            )

        # Assign custom sampler to wrapped_model
        wrapped_model.sampler = custom_sampler

        # Use per-sample loss function (no reduction)
        loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=False,
                                                          reduction=tf.keras.losses.Reduction.NONE)

        # Compile model
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=loss_fn,
            metrics=['accuracy'],
            run_eagerly=True  # Set to False for better performance
        )

        # Callback that records per-epoch training metrics
        class MetricsHistory(tf.keras.callbacks.Callback):
            def on_train_begin(self, logs=None):
                self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

            def on_epoch_end(self, epoch, logs=None):
                self.history['loss'].append(logs.get('loss'))
                self.history['accuracy'].append(logs.get('accuracy'))
                self.history['val_loss'].append(logs.get('val_loss'))
                self.history['val_accuracy'].append(logs.get('val_accuracy'))

        # Instantiate callbacks
        metrics_history = MetricsHistory()

        # Add early stopping callback
        callbacks = [
            metrics_history,
            EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
        ]

        # Train model using fit_dataset
        history = wrapped_model.fit_dataset(
            dataset=dataset,
            batch_size=batch_size,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            verbose=1,
            callbacks=callbacks
        )

        # Evaluate on the test set once training has finished
        print("\nEvaluating on test set:")
        test_loss, test_accuracy = wrapped_model.evaluate((X_test_mapped, y_test), mode='test')
        print('Test loss:', test_loss)
        print('Test accuracy:', test_accuracy)

        # Plot graphs using history from custom callback
        plot_history(metrics_history)

        mean_accuracy, fluctuation = calculate_accuracy_fluctuation(metrics_history)
        if mean_accuracy is not None and fluctuation is not None:
            print('Validation set mean accuracy:', mean_accuracy)
            print('Accuracy fluctuation:', fluctuation)
    finally:
        # Always restore stdout and close the log file, even if training fails.
        sys.stdout = original_stdout
        output_file.close()

    # Report the real log file name instead of a hard-coded 'output.log'.
    print(
        f"Script execution completed. Please check '{output_file.name}' for print output "
        "and check the saved plots in the current directory.")
上述神经网络经典层包含哪些内容,哪些是必要的,哪些是非必要的?详细说明
在您提供的量子神经网络(Quantum Neural Network, QNN)代码中,**经典层(Classical Layers)**主要包括以下部分:
Dense1 层
```python
self.dense1 = tf.keras.layers.Dense(
    16,
    activation='relu',
    kernel_regularizer=tf.keras.regularizers.l2(0.0001),
    dtype=tf.float32
)
```
Dense2 层
```python
self.dense2 = tf.keras.layers.Dense(
    8,
    activation='relu',
    kernel_regularizer=tf.keras.regularizers.l2(0.0001),
    dtype=tf.float32
)
```
输出层(Output Layer)
```python
self.output_layer = Dense(
    num_classes,
    activation='softmax',
    kernel_regularizer=tf.keras.regularizers.l2(0.0001),
    dtype=tf.float32  # Ensure the output layer is float32
)
```
| 经典层 | 必要性 | 说明 |
|---|---|---|
| Dense1 | 必要 | 负责将量子层的输出转换为适合经典网络进一步处理的形式,是经典部分与量子部分之间的桥梁。 |
| Dense2 | 非必要 | 进一步提取特征,可以根据具体任务和数据集的复杂度决定是否需要。增加此层可能提升模型表现,但也增加了计算开销和过拟合风险。 |
| 输出层 | 必要 | 生成最终的分类概率,是模型进行预测的关键部分。 |
激活函数(ReLU 和 Softmax)
正则化(L2 Regularization)
kernel_regularizer=tf.keras.regularizers.l2(0.0001)
数据类型(dtype=tf.float32)
根据您的具体需求和数据集的复杂度,您可以对经典层进行以下优化和调整:
减少 Dense 层数量
```python
class QuantumNeuralNetwork(tf.keras.Model):
    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super(QuantumNeuralNetwork, self).__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layers
        self.dense1 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )

        # Output layer directly connected to quantum outputs
        self.output_layer = Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32  # Ensure the output layer is float32
        )

    def call(self, inputs):
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        return self.output_layer(x)
```
调整 Dense 层的单元数
调整正则化强度:可将 L2 正则化系数降低(例如降至 `0.00001`)或移除。
引入 Dropout 层
```python
self.dropout = tf.keras.layers.Dropout(0.5)
```
优化激活函数
在您的模型中,量子层输出经过经典 Dense 层的处理,最终通过输出层生成分类结果。经典层的设计直接影响量子层输出特征的利用效率:
在您的代码中,经典层配置如下:
python# Classical layers self.dense1 = tf.keras.layers.Dense( 16, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0001), dtype=tf.float32 ) self.dense2 = tf.keras.layers.Dense( 8, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0001), dtype=tf.float32 ) # Output layer directly connected to quantum outputs self.output_layer = Dense( num_classes, activation='softmax', kernel_regularizer=tf.keras.regularizers.l2(0.0001), dtype=tf.float32 # Ensure the output layer is float32 )
输出层:`num_classes` 个单元,Softmax 激活,L2 正则化。
建议:
评估必要性:
实验与调整:
监控过拟合:
在您的量子神经网络模型中,经典层主要由 Dense1、Dense2 和输出层组成。必要的经典层包括:
非必要的经典层包括:
此外,正则化、激活函数和数据类型的设置在经典层中也是重要的配置项,需根据具体情况进行调整。通过实验和模型评估,您可以确定哪些经典层和参数配置最适合您的量子神经网络,优化模型性能和训练效率。
import sys
import os
import pennylane as qml
from pennylane import numpy as np
import tensorflow as tf
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler, MinMaxScaler
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense
from importance_sampling.training import ConstantTimeImportanceTraining
from importance_sampling.samplers import ConstantTimeSampler
from importance_sampling.datasets import InMemoryDataset
from importance_sampling.reweighting import BiasedReweightingPolicy
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import logging
tf.keras.backend.set_floatx('float32')
class Tee(object):
    """File-like object that duplicates every write to several streams.

    Args:
        *files: Writable file-like objects that receive each write.
    """

    def __init__(self, *files):
        # Must be __init__ (not ``init``), otherwise Tee(a, b) raises TypeError.
        self.files = files
        # Expose the first stream's ``encoding`` attribute; default to UTF-8
        # when it is missing or no stream was given.
        self.encoding = getattr(files[0], 'encoding', 'utf-8') if files else 'utf-8'

    def write(self, obj):
        """Write `obj` to every stream, flushing each one right away."""
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        """Flush every underlying stream."""
        for f in self.files:
            f.flush()
# Mirror everything written to stdout into a log file as well.
original_stdout = sys.stdout
output_file = open('lr=0.001_layers=2_删除dense2,dense1正则化降低.log', 'w', encoding='utf-8')
tee = Tee(original_stdout, output_file)
sys.stdout = tee

# Handler that sends log records through the same tee.
stream_handler = logging.StreamHandler(tee)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)

# The 'my_sampler' logger ends up with exactly this one handler; any handlers
# attached earlier are dropped first.
logger = logging.getLogger('my_sampler')
logger.setLevel(logging.INFO)
if logger.handlers:
    logger.handlers = []
logger.addHandler(stream_handler)

# Only warnings and above from the 'qiskit' logger.
logging.getLogger('qiskit').setLevel(logging.WARNING)
def save_plot(fig, filename):
    """Save `fig` to `filename` and close it.

    Args:
        fig: Matplotlib figure to save.
        filename: Destination path passed to ``fig.savefig``.
    """
    try:
        fig.savefig(filename)
    finally:
        # Close the figure even when saving fails so it is not left open.
        plt.close(fig)
def plot_history(history):
    """Save the accuracy and loss curves stored in ``history.history``.

    Writes 'accuracy_plot.png' and 'loss_plot.png'. A validation curve is
    drawn only when its key is present in the history dict; otherwise a
    notice is printed.
    """
    # (train key, val key, train label, val label, missing-val notice,
    #  title, y-axis label, legend location, output file)
    chart_specs = (
        ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
         "Validation accuracy not found in history.",
         'Model Accuracy', 'Accuracy', 'upper left', 'accuracy_plot.png'),
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
         "Validation loss not found in history.",
         'Model Loss', 'Loss', 'upper right', 'loss_plot.png'),
    )
    records = history.history
    for (train_key, val_key, train_label, val_label, missing_note,
         title, y_label, legend_loc, out_file) in chart_specs:
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(records[train_key], label=train_label)
        if val_key not in records:
            print(missing_note)
        else:
            ax.plot(records[val_key], label=val_label)
        ax.set_title(title)
        ax.set_ylabel(y_label)
        ax.set_xlabel('Epoch')
        ax.legend(loc=legend_loc)
        save_plot(fig, out_file)
def calculate_accuracy_fluctuation(history):
    """Return the mean accuracy and its total absolute deviation.

    Validation accuracy is preferred; training accuracy is the fallback when
    'val_accuracy' is missing, empty, or holds only ``None`` entries (a
    callback may record ``None`` for epochs without validation).

    Args:
        history: Object whose ``history`` attribute is a dict mapping metric
            names to per-epoch value lists.

    Returns:
        ``(mean_accuracy, fluctuation)`` as floats, where ``fluctuation`` is
        the sum of absolute deviations from the mean, or ``(None, None)``
        when no usable accuracy values exist.
    """
    for key in ('val_accuracy', 'accuracy'):
        values = [v for v in history.history.get(key, ()) if v is not None]
        if values:
            break
    else:
        print("Accuracy metric not found in history.")
        return None, None

    # Convert explicitly so the subtraction below is element-wise.
    accuracies = np.asarray(values, dtype=float)
    mean_accuracy = float(np.mean(accuracies))
    fluctuation = float(np.sum(np.abs(accuracies - mean_accuracy)))
    return mean_accuracy, fluctuation
# PennyLane "default.qubit" device with 15 wires; the QNode defined below is bound to it.
dev = qml.device("default.qubit", wires=15)
def encode_group(control_qubits, data_qubits, angle_q2, angle_q3_values):
    """Queue eight pairs of triply-controlled RY gates for one neuron group.

    For each control-value pattern, an RY(angle_q2) on ``data_qubits['q2']``
    is queued first, followed by an RY on ``data_qubits['q3']`` using the
    entry of `angle_q3_values` at the same position as the pattern.

    Args:
        control_qubits: Sequence whose first three entries are the control wires.
        data_qubits: Mapping with the target wires under 'q2' and 'q3'.
        angle_q2: Rotation angle applied to the 'q2' wire for every pattern.
        angle_q3_values: Indexable with at least eight angles for the 'q3' wire.
    """
    controls = [control_qubits[0], control_qubits[1], control_qubits[2]]
    # Control-value patterns, in the order the gates are queued.
    patterns = (
        [0, 0, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1],
        [1, 0, 0], [1, 1, 0], [1, 1, 1], [1, 0, 1],
    )
    for slot, pattern in enumerate(patterns):
        qml.ctrl(qml.RY(angle_q2, wires=data_qubits['q2']),
                 control=controls, control_values=pattern)
        qml.ctrl(qml.RY(angle_q3_values[slot], wires=data_qubits['q3']),
                 control=controls, control_values=pattern)
def encode_group_second(control_qubits, data_qubits, angle_q3_values):
    """Queue eight triply-controlled RY gates on the 'q3' wire of one group.

    Unlike `encode_group`, the 'q2' wire is not touched here.

    Args:
        control_qubits: Sequence whose first three entries are the control wires.
        data_qubits: Mapping with the target wire under 'q3'.
        angle_q3_values: Indexable with at least eight angles, one per pattern.
    """
    controls = [control_qubits[0], control_qubits[1], control_qubits[2]]
    # Control-value patterns, in the order the gates are queued.
    patterns = (
        [0, 0, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1],
        [1, 0, 0], [1, 1, 0], [1, 1, 1], [1, 0, 1],
    )
    for slot, pattern in enumerate(patterns):
        qml.ctrl(qml.RY(angle_q3_values[slot], wires=data_qubits['q3']),
                 control=controls, control_values=pattern)
def apply_controlled_swap(swap_qubit, q_swap1, q_swap2):
    """Queue a CSWAP wrapped in Hadamards on its control wire.

    Args:
        swap_qubit (int): Control wire; a Hadamard is applied to it before
            and after the CSWAP.
        q_swap1 (int): First wire to swap.
        q_swap2 (int): Second wire to swap.
    """
    cswap_wires = [swap_qubit, q_swap1, q_swap2]
    control_wire = cswap_wires[0]

    qml.Hadamard(wires=control_wire)
    qml.CSWAP(wires=cswap_wires)
    qml.Hadamard(wires=control_wire)
@qml.qnode(dev, interface='tf', diff_method='backprop')
def quantum_neural_network(inputs, encoding_weights, rot_weights, entangle_weights):
    """Variational circuit evaluated once per input sample.

    The first layer encodes both the input features and the first slice of
    encoding weights; every later layer encodes weights only. Each layer is
    followed by a Rot gate on every wire, a CRZ chain between neighbouring
    wires, and one controlled swap per neuron group.

    Args:
        inputs: Four feature angles, one per neuron group.
        encoding_weights: Indexed as ``[group, layer, pattern]``; the callers
            in this file use shape ``(4, n_layers, 8)``.
        rot_weights: Indexed as ``[layer, wire, 0..2]``; its first dimension
            defines the number of layers, its second the number of wires.
        entangle_weights: Indexed as ``[layer, wire]`` for the CRZ chain.

    Returns:
        List of four PauliZ expectation values.
    """
    control_qubits = [0, 1, 2]

    # Neuron groups
    encoding_data = [
        {'data_qubits': {'q2': 3, 'q3': 4}, 'swap_qubit': 5},
        {'data_qubits': {'q2': 6, 'q3': 7}, 'swap_qubit': 8},
        {'data_qubits': {'q2': 9, 'q3': 10}, 'swap_qubit': 11},
        {'data_qubits': {'q2': 12, 'q3': 13}, 'swap_qubit': 14}
    ]

    # Take the layer count from the weights, not from a module-level global
    # that only exists when the file is run as a script.
    n_layers = rot_weights.shape[0]
    n_wires = rot_weights.shape[1]

    def rotate_and_entangle(layer_idx):
        # Parameterized rotation on every wire, then CRZ between neighbours.
        for qubit in range(n_wires):
            rot_angles = rot_weights[layer_idx, qubit, :]  # shape: (3,)
            qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)
        for qubit in range(n_wires - 1):
            qml.CRZ(entangle_weights[layer_idx, qubit], wires=[qubit, qubit + 1])

    # First layer: encode feature values and weights together.
    for idx, data_set in enumerate(encoding_data):
        encode_group(control_qubits, data_set['data_qubits'],
                     inputs[idx], encoding_weights[idx, 0, :])

    rotate_and_entangle(0)

    # One controlled swap per neuron group.
    for data_set in encoding_data:
        data_qubits = data_set['data_qubits']
        apply_controlled_swap(data_set['swap_qubit'], data_qubits['q3'], data_qubits['q2'])

    # Remaining layers: encode weights only.
    for layer in range(n_layers - 1):
        layer_second = layer + 1

        for idx, data_set in enumerate(encoding_data):
            encode_group_second(control_qubits, data_set['data_qubits'],
                                encoding_weights[idx, layer_second, :])

        rotate_and_entangle(layer_second)

        # The control wire of the swap alternates from layer to layer.
        for data_set in encoding_data:
            swap_qubit = data_set['swap_qubit']
            data_qubits = data_set['data_qubits']
            if layer % 2 == 0:
                apply_controlled_swap(data_qubits['q2'], swap_qubit, data_qubits['q3'])
            else:
                apply_controlled_swap(swap_qubit, data_qubits['q2'], data_qubits['q3'])

    # The measured wires depend on the parity of the layer count.
    if n_layers % 2 == 1:
        measured_qubits = [5, 8, 11, 14]
    else:
        measured_qubits = [3, 6, 9, 12]

    return [qml.expval(qml.PauliZ(qubit)) for qubit in measured_qubits]
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that owns the circuit weights and runs the QNode per sample.

    Args:
        n_qubits: Number of wires covered by the rotation/entangling weights.
        n_layers: Number of variational layers.
        num_classes: Stored on the instance; not used by this layer itself.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        # Must be __init__; a method called ``init`` is never run, so the
        # layer would have no weights.
        super().__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        initializer = tf.keras.initializers.GlorotUniform()
        regularizer = tf.keras.regularizers.l2(0.01)

        # Encoding weights with L2 regularization
        self.encoding_weights = self.add_weight(
            shape=(4, self.n_layers, 8),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='encoding_weights',
            dtype=tf.float32
        )

        # Rotation weights with L2 regularization
        self.rot_weights = self.add_weight(
            shape=(n_layers, n_qubits, 3),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='rot_weights',
            dtype=tf.float32
        )

        # Entangling weights with L2 regularization, initialised in [0, 2*pi)
        self.entangle_weights = self.add_weight(
            shape=(n_layers, n_qubits - 1),
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=regularizer,
            trainable=True,
            name='entangle_weights',
            dtype=tf.float32
        )

    def call(self, inputs):
        """Evaluate the circuit for each sample along axis 0 and stack the results."""
        outputs_list = []
        # NOTE(review): iterating over tf.shape(inputs)[0] presumably relies on
        # eager execution (the training script compiles with run_eagerly=True).
        for i in range(tf.shape(inputs)[0]):
            x = tf.cast(inputs[i], tf.float32)
            result = quantum_neural_network(x, self.encoding_weights, self.rot_weights,
                                            self.entangle_weights)
            outputs_list.append(tf.cast(result, tf.float32))
        return tf.stack(outputs_list, axis=0)
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid classifier: quantum layer -> Dense(16, relu) -> softmax output.

    Args:
        n_qubits: Number of qubits, forwarded to the quantum layer.
        n_layers: Number of variational layers, forwarded to the quantum layer.
        num_classes: Number of output classes (width of the softmax layer).
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        # The constructor must be named __init__; a plain ``init`` method is
        # never invoked by Keras, leaving the model without any sub-layers.
        super().__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layer
        self.dense1 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )

        # Output layer; it consumes the dense1 activations (see call()).
        self.output_layer = Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32  # Ensure the output layer is float32
        )

    def call(self, inputs):
        """Run the quantum layer, then dense1, then the softmax output."""
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        return self.output_layer(x)
def is_quantum_model(model):
    """Return True if `model` contains a QuantumNeuralNetworkLayer at any depth.

    Every entry of ``model.layers`` is inspected; an entry that itself exposes
    a ``layers`` attribute is searched recursively.
    """
    return any(
        isinstance(sub_layer, QuantumNeuralNetworkLayer)
        or (hasattr(sub_layer, 'layers') and is_quantum_model(sub_layer))
        for sub_layer in model.layers
    )
# Script entry point: trains the hybrid QNN on Iris with importance sampling.
# The guard must compare __name__ with "__main__"; ``name == "main"`` raises
# NameError as soon as the module is executed.
if __name__ == "__main__":
    try:
        # Load data
        iris = load_iris()
        X = iris['data'].astype(np.float32)  # Ensure data type is float32
        y = iris['target'].reshape(-1, 1)

        # One-hot encode labels
        encoder = OneHotEncoder(sparse_output=False)
        y_encoded = encoder.fit_transform(y).astype(np.float32)
        num_classes = y_encoded.shape[1]

        # Index lists for each split
        train_indices = []
        val_indices = []
        test_indices = []

        # Set random seed for reproducibility
        np.random.seed(42)

        # Per class: first 30 shuffled samples for training, next 10 for
        # validation, next 10 for testing.
        for class_label in range(num_classes):
            class_indices = np.where(y.flatten() == class_label)[0]
            np.random.shuffle(class_indices)
            train_indices.extend(class_indices[:30])
            val_indices.extend(class_indices[30:40])
            test_indices.extend(class_indices[40:50])

        train_indices = np.array(train_indices)
        val_indices = np.array(val_indices)
        test_indices = np.array(test_indices)

        # Split the data using the indices
        x_train, y_train = X[train_indices], y_encoded[train_indices]
        x_val, y_val = X[val_indices], y_encoded[val_indices]
        x_test, y_test = X[test_indices], y_encoded[test_indices]

        # Standardize (fit only on the training set)
        scaler = StandardScaler()
        X_train_standardized = scaler.fit_transform(x_train).astype(np.float32)
        X_val_standardized = scaler.transform(x_val).astype(np.float32)
        X_test_standardized = scaler.transform(x_test).astype(np.float32)

        # Map features to the [0, pi/2] range (fit only on the training set)
        min_max_scaler = MinMaxScaler(feature_range=(0, np.pi / 2))
        X_train_mapped = min_max_scaler.fit_transform(X_train_standardized).astype(np.float32)
        X_val_mapped = min_max_scaler.transform(X_val_standardized).astype(np.float32)
        X_test_mapped = min_max_scaler.transform(X_test_standardized).astype(np.float32)

        # Hyperparameters
        learning_rate = 0.001  # Learning rate
        epochs = 200  # Number of training epochs
        batch_size = 16  # Batch size
        n_qubits = 15  # 15 qubits as per the original QNN
        n_layers = 2  # Number of layers

        model = QuantumNeuralNetwork(
            n_qubits=n_qubits,
            n_layers=n_layers,
            num_classes=num_classes
        )

        if is_quantum_model(model):
            print("The model contains quantum neural network (QNN) layers.")
        else:
            print("The model does not contain quantum neural network (QNN) layers.")

        # NOTE: circuit drawing is disabled for this run, so the placeholder
        # sample parameters that only fed the drawer are no longer created.

        steps_per_epoch = len(X_train_mapped) // batch_size

        # Create dataset (including validation set)
        dataset = InMemoryDataset(
            X_train_mapped, y_train,
            X_val_mapped, y_val,
            categorical=True
        )

        # Initialize reweighting policy
        reweighting_policy = BiasedReweightingPolicy(k=1.0)

        # Initialize importance sampling training
        wrapped_model = ConstantTimeImportanceTraining(model, score="loss")  # Use loss as score

        # Sampler factory; it ignores batch_size/steps_per_epoch/epochs and
        # always builds a ConstantTimeSampler with the policy defined above.
        def custom_sampler(dataset, batch_size, steps_per_epoch, epochs):
            return ConstantTimeSampler(
                dataset,
                reweighting_policy,
                model=wrapped_model.model,
            )

        # Assign custom sampler to wrapped_model
        wrapped_model.sampler = custom_sampler

        # Use per-sample loss function (no reduction)
        loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=False,
                                                          reduction=tf.keras.losses.Reduction.NONE)

        # Compile model
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=loss_fn,
            metrics=['accuracy'],
            run_eagerly=True  # Set to False for better performance
        )

        # Callback that records per-epoch training metrics
        class MetricsHistory(tf.keras.callbacks.Callback):
            def on_train_begin(self, logs=None):
                self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

            def on_epoch_end(self, epoch, logs=None):
                self.history['loss'].append(logs.get('loss'))
                self.history['accuracy'].append(logs.get('accuracy'))
                self.history['val_loss'].append(logs.get('val_loss'))
                self.history['val_accuracy'].append(logs.get('val_accuracy'))

        # Instantiate callbacks
        metrics_history = MetricsHistory()

        # Add early stopping callback
        callbacks = [
            metrics_history,
            EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
        ]

        # Train model using fit_dataset
        history = wrapped_model.fit_dataset(
            dataset=dataset,
            batch_size=batch_size,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            verbose=1,
            callbacks=callbacks
        )

        # Evaluate on the test set once training has finished
        print("\nEvaluating on test set:")
        test_loss, test_accuracy = wrapped_model.evaluate((X_test_mapped, y_test), mode='test')
        print('Test loss:', test_loss)
        print('Test accuracy:', test_accuracy)

        # Plot graphs using history from custom callback
        plot_history(metrics_history)

        mean_accuracy, fluctuation = calculate_accuracy_fluctuation(metrics_history)
        if mean_accuracy is not None and fluctuation is not None:
            print('Validation set mean accuracy:', mean_accuracy)
            print('Accuracy fluctuation:', fluctuation)
    finally:
        # Always restore stdout and close the log file, even if training fails.
        sys.stdout = original_stdout
        output_file.close()

    # Report the real log file name instead of a hard-coded 'output.log'.
    print(
        f"Script execution completed. Please check '{output_file.name}' for print output "
        "and check the saved plots in the current directory.")
上述是实验运行脚本
class _BaseImportanceTraining(object):
def init(self, model, score="gnorm", layer=None):
"""重要性采样训练的基类。
text参数: model: 要训练的 Keras 模型。 """ # 检查模型是否兼容,并包装模型以支持重要性采样 self._check_model(model) self.original_model = model self.model = OracleWrapper( model, self.reweighting, score=score, layer=layer ) self.original_model.optimizer = self.model.optimizer signal("is.sample").connect(self._on_sample) signal("is.score").connect(self._on_scores) # 创建独立的验证损失指标 self.val_loss_metric = tf.keras.metrics.Mean(name='val_loss') # 初始化验证准确率指标 self.val_accuracy_metric = tf.keras.metrics.Accuracy(name='val_accuracy') # 初始化测试准确率指标 self.test_accuracy_metric = tf.keras.metrics.Accuracy(name='test_accuracy') # 初始化测试损失指标 self.test_loss_metric = tf.keras.metrics.Mean(name='test_loss') def _on_sample(self, event): self._latest_sample_event = event def _on_scores(self, scores): self._latest_scores = scores def _check_model(self, model): """检查模型是否使用了 Dropout 和 BatchNorm,并警告可能影响重要性计算。""" if any( isinstance(l, (tf.keras.layers.BatchNormalization, Dropout)) for l in model.layers ): sys.stderr.write(("[NOTICE]: 您正在使用 BatchNormalization 和/或 Dropout。\n" "这些层可能会影响重要性计算,建议您用 LayerNormalization 或 " "在测试模式下的 BatchNormalization 以及 L2 正则化替代它们。\n")) @property def reweighting(self): """控制重要性采样时偏置的重加权策略。""" raise NotImplementedError() def sampler(self, dataset, batch_size, steps_per_epoch, epochs): """为给定的数据集创建一个新的采样器,以使用重要性采样进行采样。""" raise NotImplementedError() def fit(self, x, y, batch_size=32, epochs=1, verbose=1, callbacks=None, validation_split=0.0, validation_data=None, steps_per_epoch=None, on_sample=None, on_scores=None): """使用给定的数据创建一个 InMemoryDataset 实例,并使用重要性采样训练模型。 参数: x: 训练数据的 Numpy 数组或 Numpy 数组列表。 y: 目标数据的 Numpy 数组。 batch_size: 每个梯度更新的样本数。 epochs: 遍历整个训练集的次数。 verbose: {0, >0},是否使用 Keras 的进度条回调。 callbacks: 在训练期间调用的 Keras 回调列表。 validation_split: [0, 1) 之间的浮点数,用于验证的数据百分比。 validation_data: tuple,(x_val, y_val),用于模型验证的数据。 steps_per_epoch: int 或 None,每个 epoch 的步数。 on_sample: 接受 sampler, idxs, w, scores 的回调函数。 on_scores: 接受 sampler 和 scores 的回调函数。 返回: 包含训练期间收集的信息的 Keras `History` 对象。 """ # 处理验证数据集 if 
validation_data is not None: x_train, y_train = x, y x_test, y_test = validation_data elif validation_split > 0: assert validation_split < 1, "验证集比例必须小于 1" n = int(round(validation_split * len(x))) idxs = np.arange(len(x)) np.random.shuffle(idxs) x_train, y_train = x[idxs[n:]], y[idxs[n:]] x_test, y_test = x[idxs[:n]], y[idxs[:n]] else: x_train, y_train = x, y x_test, y_test = np.empty(shape=(0, x.shape[1])), np.empty(shape=(0, y.shape[1])) # 创建数据集 dataset = InMemoryDataset( x_train, y_train, x_test, y_test, categorical=False # 不需要转换标签 ) return self.fit_dataset( dataset=dataset, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=verbose, callbacks=callbacks, on_sample=on_sample, on_scores=on_scores ) def fit_generator(self, train, steps_per_epoch=None, batch_size=32, epochs=1, verbose=1, callbacks=None, validation_data=None, validation_steps=None, on_sample=None, on_scores=None): """创建一个 GeneratorDataset 实例,并使用重要性采样训练模型。 参数: train: 返回 (inputs, targets) 的生成器或可迭代对象。 steps_per_epoch: 每个 epoch 的梯度更新次数。 batch_size: 每个梯度更新的样本数。 epochs: 训练轮数。 verbose: {0, >0},是否使用 Keras 的进度条回调。 validation_data: 生成器、可迭代对象或 (inputs, targets) 元组。 validation_steps: 如果 validation_data 是生成器,则需要指定此参数。 on_sample: 接受 sampler, idxs, w, scores 的回调函数。 on_scores: 接受 sampler 和 scores 的回调函数。 """ def infinite_iterator(iterable): while True: yield from iterable # 处理验证数据集 if validation_data is not None: if isinstance(validation_data, (tuple, list)): test = validation_data test_len = None elif isinstance(validation_data, Iterable): test = infinite_iterator(validation_data) try: test_len = len(validation_data) * batch_size except TypeError: test_len = validation_steps * batch_size if validation_steps else None else: test = validation_data test_len = validation_steps * batch_size if validation_steps else None else: test = (np.empty(shape=(0, 1)), np.empty(shape=(0, 1))) test_len = None logging.info("未提供验证数据") # 设置训练数据和每个 epoch 的步数 if isinstance(train, Iterable): if steps_per_epoch 
is None: try: steps_per_epoch = len(train) except TypeError: raise ValueError("使用没有 __len__ 方法的生成器时必须设置 steps_per_epoch") train = infinite_iterator(train) elif steps_per_epoch is None: raise ValueError("steps_per_epoch 未设置") dataset = GeneratorDataset(train, test, test_len) return self.fit_dataset( dataset=dataset, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=verbose, callbacks=callbacks, on_sample=on_sample, on_scores=on_scores ) def fit_dataset(self, dataset, steps_per_epoch=None, batch_size=32, epochs=1, verbose=1, callbacks=None, on_sample=None, on_scores=None): # 设置训练数据和每个 epoch 的步数 if isinstance(dataset.train_data, Iterable): if steps_per_epoch is None: try: steps_per_epoch = len(dataset.train_data) except TypeError: raise ValueError("使用没有 __len__ 方法的生成器时必须设置 steps_per_epoch") train_iterator = iter(dataset.train_data) else: if steps_per_epoch is None: steps_per_epoch = len(dataset.train_data) // batch_size train_iterator = None # 使用索引访问 # 创建回调列表 self.history = History() callbacks = [BaseLogger()] + (callbacks or []) + [self.history] if verbose > 0: callbacks += [ProgbarLogger(count_mode="steps")] callbacks = CallbackList(callbacks) callbacks.set_model(self.original_model) callbacks.set_params({ "epochs": epochs, "steps": steps_per_epoch, "verbose": verbose, "do_validation": len(dataset.test_data) > 0, "metrics": self.metrics_names + [ "val_" + n for n in self.metrics_names ] }) # 创建采样器 sampler = self.sampler(dataset, batch_size, steps_per_epoch, epochs) # 开始训练循环 epoch = 0 self.original_model.stop_training = False callbacks.on_train_begin() while epoch < epochs: callbacks.on_epoch_begin(epoch) self.model.model.reset_metrics() for step in range(steps_per_epoch): batch_logs = {"batch": step, "size": batch_size} callbacks.on_batch_begin(step, batch_logs) # 在这里进行重要性采样 idxs, (x_batch, y_batch), w = sampler.sample(batch_size) # 将 x_batch 和 y_batch 转换为 TensorFlow 张量 x_batch = tf.convert_to_tensor(x_batch) y_batch = 
tf.convert_to_tensor(y_batch) w = tf.convert_to_tensor(w, dtype=tf.float32) # 自定义训练步骤 loss_value, metrics_results, scores = self.train_batch(x_batch, y_batch, w) # 更新采样器 sampler.update(idxs, scores) # 记录指标 batch_logs['loss'] = loss_value for metric_name, metric_value in zip(self.metrics_names[1:], metrics_results): batch_logs[metric_name] = metric_value callbacks.on_batch_end(step, batch_logs) if on_scores is not None and hasattr(self, "_latest_scores"): on_scores( sampler, self._latest_scores ) if on_sample is not None: on_sample( sampler, self._latest_sample_event["idxs"], self._latest_sample_event["w"], self._latest_sample_event["predicted_scores"] ) if self.original_model.stop_training: break # 在每个 epoch 结束时进行验证集评估 epoch_logs = {} if len(dataset.test_data) > 0: val_loss, val_accuracy = self.evaluate(dataset.test_data[:], mode='validation') epoch_logs['val_loss'] = val_loss if val_accuracy is not None: epoch_logs['val_accuracy'] = val_accuracy if val_accuracy is not None: print(f" - val_accuracy: {val_accuracy:.4f}") callbacks.on_epoch_end(epoch, epoch_logs) if self.original_model.stop_training: break epoch += 1 callbacks.on_train_end() return self.history @property def metrics_names(self): def name(x): if isinstance(x, str): return x if hasattr(x, "name"): if x.name in ['categorical_accuracy', 'sparse_categorical_accuracy']: return 'accuracy' return x.name if hasattr(x, "__name__"): return x.__name__ return str(x) metrics = self.model.model.metrics or [] return ( ["loss"] + [name(m) for m in metrics] ) def train_batch(self, x, y, w): with tf.GradientTape() as tape: y_pred = self.model.model(x, training=True) per_sample_losses = self.model.model.loss(y, y_pred) if len(per_sample_losses.shape) > 1: per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1) weights = tf.cast(w, tf.keras.backend.floatx()) # 计算加权损失 weighted_losses = per_sample_losses * weights loss = tf.reduce_mean(weighted_losses) # 包含正则化损失 reg_losses = tf.reduce_sum(self.model.model.losses) 
reg_losses = tf.cast(reg_losses, loss.dtype) loss += reg_losses # 获取可训练变量 trainable_vars = self.model.model.trainable_variables # 计算梯度 gradients = tape.gradient(loss, trainable_vars) self.model.model.optimizer.apply_gradients(zip(gradients, trainable_vars)) # 仅更新损失指标 self.model.model.compiled_metrics.update_state(y, y_pred, sample_weight=weights) loss_metric = self.model.model.metrics[0].result().numpy() # 假设第一个指标是损失 # 计算不带权重的准确率 accuracy_metric = tf.keras.metrics.categorical_accuracy(y, y_pred) accuracy = tf.reduce_mean(accuracy_metric).numpy() return loss.numpy(), [accuracy], per_sample_losses.numpy() def evaluate(self, test_data, mode='validation'): # 获取测试数据 x_test, y_test = test_data # 确保 x_test 和 y_test 是 NumPy 数组 x_test = np.asarray(x_test) y_test = np.asarray(y_test) # 模型预测(保持为浮点类型的概率值) y_pred = self.model.model.predict(x_test, batch_size=128) # 如果 y_test 是 one-hot 编码,保持为 one-hot 编码用于损失计算 if y_test.ndim > 1 and y_test.shape[-1] > 1: y_test_for_loss = y_test else: # 如果 y_test 是类别索引,转换为 one-hot 编码 num_classes = self.model.model.output_shape[-1] y_test_for_loss = tf.one_hot(y_test, depth=num_classes) # 确保 y_test_for_loss 是浮点类型 y_test_for_loss = tf.convert_to_tensor(y_test_for_loss, dtype=y_pred.dtype) y_pred_for_loss = tf.convert_to_tensor(y_pred, dtype=y_pred.dtype) # 计算每个样本的损失 per_sample_losses = self.model.model.loss(y_test_for_loss, y_pred_for_loss) if len(per_sample_losses.shape) > 1: per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1) # 计算平均损失 loss_value = tf.reduce_mean(per_sample_losses) print("loss_value:", loss_value.numpy()) # 添加正则化损失 reg_losses = tf.reduce_sum(self.model.model.losses) loss_value += reg_losses # 根据模式更新对应的损失指标 if mode == 'validation': # 更新验证损失指标 self.val_loss_metric.update_state(loss_value) elif mode == 'test': # 更新测试损失指标 self.test_loss_metric.update_state(loss_value) else: raise ValueError(f"Unsupported mode: {mode}") # 转换 y_test 和 y_pred 为类别索引用于准确率计算 if y_test.ndim > 1 and y_test.shape[-1] > 1: y_test_classes = 
np.argmax(y_test, axis=-1) else: y_test_classes = y_test if y_pred.ndim > 1 and y_pred.shape[-1] > 1: y_pred_classes = np.argmax(y_pred, axis=-1) else: y_pred_classes = y_pred # 转换为 TensorFlow 张量,确保数据类型正确 y_test_classes = tf.convert_to_tensor(y_test_classes, dtype=tf.int32) y_pred_classes = tf.convert_to_tensor(y_pred_classes, dtype=tf.int32) # 根据模式更新对应的准确率指标 if mode == 'validation' and self.val_accuracy_metric: self.val_accuracy_metric.reset_states() self.val_accuracy_metric.update_state(y_test_classes, y_pred_classes) val_accuracy = self.val_accuracy_metric.result().numpy() print("val_accuracy:", val_accuracy) accuracy = val_accuracy elif mode == 'test' and self.test_accuracy_metric: self.test_accuracy_metric.reset_states() self.test_accuracy_metric.update_state(y_test_classes, y_pred_classes) test_accuracy = self.test_accuracy_metric.result().numpy() print("test_accuracy:", test_accuracy) accuracy = test_accuracy else: accuracy = None print(f"{mode}_accuracy_metric not found or not set.") # 获取损失值 loss_value_to_print = loss_value.numpy() # 打印验证或测试指标 print("Validation Metrics:" if mode == 'validation' else "Test Metrics:") print( f" - val_loss: {loss_value_to_print:.4f}" if mode == 'validation' else f" - test_loss: {loss_value_to_print:.4f}") if accuracy is not None: print(f" - val_accuracy: {accuracy:.4f}" if mode == 'validation' else f" - test_accuracy: {accuracy:.4f}") return loss_value_to_print, accuracy
class _UnbiasedImportanceTraining(_BaseImportanceTraining):
    """Importance-sampling trainer with a fixed reweighting policy.

    The policy is ``BiasedReweightingPolicy(1.0)``, which the original source
    labels as the unbiased setting.
    """

    def __init__(self, model, score="gnorm", layer=None):
        """Create the reweighting policy, then run the base initializer.

        Args:
            model: Forwarded unchanged to the base class.
            score: Forwarded unchanged to the base class.
            layer: Forwarded unchanged to the base class.
        """
        # The policy is assigned before the base initializer runs, as in the
        # original statement order.
        self._reweighting = BiasedReweightingPolicy(1.0)
        super().__init__(model, score, layer)

    @property
    def reweighting(self):
        """Return the reweighting policy created in ``__init__``."""
        return self._reweighting
class ConstantTimeImportanceTraining(_UnbiasedImportanceTraining):
    """Train a model using constant-time importance sampling.

    Args:
        model: The Keras model to train.
        backward_pass_multiplier: Passed to ``ConstantTimeSampler`` as
            ``backward_time``.
        extra_samples: Stored on the instance; this class does not forward it
            to the sampler.
        score: {"gnorm", "loss", "full_gnorm"}, the importance measure used
            for importance sampling.
        layer: None, int or Layer; the layer used when computing gnorm.
    """

    def __init__(self, model, backward_pass_multiplier=2.0,
                 extra_samples=0.1, score="gnorm", layer=None):
        self._backward_pass_multiplier = backward_pass_multiplier
        self._extra_samples = extra_samples

        super().__init__(model, score, layer)

    def sampler(self, dataset, batch_size, steps_per_epoch, epochs):
        """Build the ``ConstantTimeSampler`` used by the training loop.

        ``batch_size``, ``steps_per_epoch`` and ``epochs`` are accepted for
        interface compatibility and are not used here.
        """
        return ConstantTimeSampler(
            dataset,
            self.reweighting,
            self.model,
            backward_time=self._backward_pass_multiplier,
        )
上述是training.py文件
class ModelWrapper(object):
    """Wrap a neural network and add the extra layers needed to produce the
    scores, losses and sample weights used for importance sampling.

    Subclasses must implement ``evaluate_batch``, ``score_batch`` and
    ``train_batch``.
    """

    def _iterate_batches(self, x, y, batch_size):
        """Yield ``(x_batch, y_batch)`` slices of at most ``batch_size`` rows.

        When ``x`` is a list or tuple (multi-input model) every element is
        sliced with the same bounds.
        """
        for start in range(0, len(y), batch_size):
            stop = start + batch_size
            if isinstance(x, (list, tuple)):
                xs = [xi[start:stop] for xi in x]
            else:
                xs = x[start:stop]
            yield xs, y[start:stop]

    def evaluate(self, x, y, batch_size=128):
        """Average ``evaluate_batch`` results over all batches.

        The mean is taken over batches, so every batch carries equal weight.
        The result is also sent on the ``"is.evaluation"`` signal.
        """
        result = np.mean(
            np.vstack([
                self.evaluate_batch(xi, yi)
                for xi, yi in self._iterate_batches(x, y, batch_size)
            ]),
            axis=0
        )

        signal("is.evaluation").send(result)
        return result

    def score(self, x, y, batch_size=128):
        """Concatenate ``score_batch`` results over all batches.

        The result is also sent on the ``"is.score"`` signal.
        """
        result = np.hstack([
            self.score_batch(xi, yi).T
            for xi, yi in self._iterate_batches(x, y, batch_size)
        ]).T

        signal("is.score").send(result)
        return result

    def set_lr(self, lr):
        """Set the learning rate of the model.

        Tries ``self.optimizer`` first, then ``self.model.optimizer``.

        Raises:
            NotImplementedError: If neither optimizer attribute exists.
        """
        try:
            K.set_value(self.optimizer.lr, lr)
        except AttributeError:
            try:
                K.set_value(self.model.optimizer.lr, lr)
            except AttributeError:
                raise NotImplementedError()

        # Best effort: also update ``self.small`` when the wrapper has one.
        try:
            K.set_value(self.small.optimizer.lr, lr)
        except AttributeError:
            pass

    def evaluate_batch(self, x, y):
        raise NotImplementedError()

    def score_batch(self, x, y):
        raise NotImplementedError()

    def train_batch(self, x, y, w):
        raise NotImplementedError()
class OracleWrapper(ModelWrapper):
    """Model wrapper that scores and trains a compiled Keras model directly.

    Scores are the per-sample loss, or for ``score="gnorm"`` the norm of
    ``y_pred - y``.
    """

    def __init__(self, model, reweighting, score="loss", layer=None):
        """Store the model, its optimizer and the scoring configuration.

        Args:
            model: Compiled model; its ``optimizer`` is read here.
            reweighting: Reweighting policy; stored but not used by the
                methods of this class.
            score: ``"gnorm"`` selects the gradient-norm bound; any other
                value selects the per-sample loss.
            layer: Accepted for interface compatibility and ignored.
        """
        self.reweighting = reweighting
        self.layer = None  # Not applicable for QNN
        self.score_method = score
        self.model = model
        self.optimizer = model.optimizer

    def evaluate_batch(self, x, y):
        """Return ``[loss, *metrics]`` from ``model.evaluate`` on one batch."""
        outputs = self.model.evaluate(
            x=x, y=y, verbose=0, return_dict=True
        )
        loss = outputs['loss']
        metrics = [outputs[m] for m in self.model.metrics_names[1:]]
        return [loss] + metrics

    # Alternative kept from the original: use the gradient directly.
    # def score_batch(self, x, y):
    #     y_pred = self.model(x, training=False)
    #     if self.score_method == "gnorm":
    #         # Compute the gradient norm of each sample
    #         with tf.GradientTape() as tape:
    #             tape.watch(x)
    #             y_pred = self.model(x, training=False)
    #             loss = self.model.loss(y, y_pred)
    #         gradients = tape.gradient(loss, x)
    #         scores = tf.norm(gradients, axis=-1)
    #     else:
    #         # Use the loss as the score by default
    #         per_sample_losses = self.model.loss(y, y_pred)
    #         if len(per_sample_losses.shape) > 1:
    #             per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)
    #         scores = per_sample_losses
    #     return scores.numpy()

    def score_batch(self, x, y):
        """Return one importance score per sample as a NumPy array."""
        y_pred = self.model(x, training=False)
        if self.score_method == "gnorm":
            # Compute gradient norm upper bound
            gradients = y_pred - y  # Suitable for softmax activation and cross-entropy loss
            scores = tf.norm(gradients, axis=1)
        else:
            # Use loss as score by default
            per_sample_losses = self.model.loss(y, y_pred)
            if len(per_sample_losses.shape) > 1:
                per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)
            scores = per_sample_losses
        return scores.numpy()

    def train_batch(self, x, y, w):
        """Run one weighted gradient step.

        Args:
            x: Batch inputs.
            y: Batch targets.
            w: Per-sample weights multiplied into the per-sample losses.

        Returns:
            ``(loss, metrics, scores)``: the weighted loss including
            regularization, the current result of every model metric, and the
            unweighted per-sample losses.
        """
        with tf.GradientTape() as tape:
            y_pred = self.model(x, training=True)
            per_sample_losses = self.model.loss(y, y_pred)
            if len(per_sample_losses.shape) > 1:
                per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)
            scores = per_sample_losses  # Use losses as importance scores
            weights = tf.cast(w, tf.keras.backend.floatx())  # Use default float type

            # Compute weighted loss
            weighted_losses = per_sample_losses * weights
            loss = tf.reduce_mean(weighted_losses)

            # Include regularization losses
            reg_losses = tf.reduce_sum(self.model.losses)
            loss += reg_losses

        # Get trainable variables
        trainable_vars = self.model.trainable_variables

        # Compute gradients
        gradients = tape.gradient(loss, trainable_vars)

        # Update model parameters
        self.model.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update metrics
        self.model.compiled_metrics.update_state(y, y_pred, sample_weight=weights)
        metrics = [metric.result().numpy() for metric in self.model.metrics]

        return loss.numpy(), metrics, scores.numpy()

    def __call__(self, x, training=False):
        """Forward the call to the wrapped model."""
        return self.model(x, training=training)
def _tolist(x, acceptable_iterables=(list, tuple)):
    """Return ``x`` unchanged if it is an accepted container, else ``[x]``.

    Args:
        x: Any value.
        acceptable_iterables: Type or tuple of types passed through untouched.

    Returns:
        ``x`` itself (not a copy) when it is an instance of
        ``acceptable_iterables``; otherwise a one-element list holding ``x``.
    """
    if not isinstance(x, acceptable_iterables):
        x = [x]
    return x
def _get_scoring_layer(score, y_true, y_pred, loss_fn="categorical_crossentropy",
                       layer=None, model=None):
    """Get a layer that computes a score for each (y_true, y_pred) pair.

    Args:
        score: One of "loss", "gnorm", "full_gnorm" or "acc".
        y_true: Target tensor fed to the scoring layer.
        y_pred: Prediction tensor fed to the scoring layer.
        loss_fn: Loss passed to the loss and gradient-norm layers.
        layer: Layer whose ``output`` is used for "gnorm"; must not be None
            in that mode.
        model: Model whose ``trainable_weights`` are used for "full_gnorm";
            must not be None in that mode.

    Returns:
        The output of the selected scoring layer applied to
        ``[y_true, y_pred]``.

    Raises:
        AssertionError: If ``score`` is not one of the supported values.
    """
    assert score in ["loss", "gnorm", "full_gnorm", "acc"]

    if score == "loss":
        return LossLayer(loss_fn, name='score_layer')([
            y_true,
            y_pred
        ])
    elif score == "gnorm":
        return GradientNormLayer(
            layer.output,
            loss_fn,
            fast=False,
            name='score_layer'
        )([
            y_true,
            y_pred
        ])
    elif score == "full_gnorm":
        return GradientNormLayer(
            model.trainable_weights,
            loss_fn,
            fast=False,
            name='score_layer'
        )([
            y_true,
            y_pred
        ])
    elif score == "acc":
        return MetricLayer(categorical_accuracy, name='score_layer')([
            y_true,
            y_pred
        ])
class ModelWrapperDecorator(ModelWrapper):
    """Decorator base class that proxies attribute access to a wrapped
    ``ModelWrapper``.

    Only names listed in ``implemented_attributes`` (plus ``model_wrapper``)
    are resolved on the decorator itself; everything else is looked up on the
    wrapped object.
    """

    def __init__(self, model_wrapper, implemented_attributes=None):
        """Store the wrapped object and the locally implemented names.

        Args:
            model_wrapper: The object to delegate to.
            implemented_attributes: Iterable of attribute names that this
                decorator provides itself. ``None`` means no extra names.
        """
        self.model_wrapper = model_wrapper
        # A None default avoids sharing one mutable set between calls.
        self.implemented_attributes = (
            set(implemented_attributes or ()) | {"model_wrapper"}
        )

    def __getattribute__(self, name):
        # object.__getattribute__ is used for the internal lookups so they do
        # not re-enter this override.
        _getattr = object.__getattribute__
        implemented_attributes = _getattr(self, "implemented_attributes")
        if name in implemented_attributes:
            return _getattr(self, name)
        else:
            model_wrapper = _getattr(self, "model_wrapper")
            return getattr(model_wrapper, name)
上述是model_wrappers.py文件
阅读上述代码告诉我损失值的具体流程,用代码进行表示
在您提供的代码中,损失值(Loss)的计算和更新过程涉及多个步骤,尤其是由于使用了重要性采样(Importance Sampling)和自定义训练循环。以下是对损失值具体流程的详细解释,并附带相应的代码表示,以帮助您更好地理解整个过程。
首先,加载和预处理数据,包括标准化和映射特征值到 [0, π/2] 范围。
python# 加载数据 iris = load_iris() X = iris['data'].astype(np.float32) y = iris['target'].reshape(-1, 1) # One-Hot 编码标签 encoder = OneHotEncoder(sparse_output=False) y_encoded = encoder.fit_transform(y).astype(np.float32) num_classes = y_encoded.shape[1] # 数据集划分(训练、验证、测试) # ...(省略具体划分代码) # 数据标准化 scaler = StandardScaler() X_train_standardized = scaler.fit_transform(x_train).astype(np.float32) X_val_standardized = scaler.transform(x_val).astype(np.float32) X_test_standardized = scaler.transform(x_test).astype(np.float32) # 特征映射到 [0, π/2] 范围 min_max_scaler = MinMaxScaler(feature_range=(0, np.pi / 2)) X_train_mapped = min_max_scaler.fit_transform(X_train_standardized).astype(np.float32) X_val_mapped = min_max_scaler.transform(X_val_standardized).astype(np.float32) X_test_mapped = min_max_scaler.transform(X_test_standardized).astype(np.float32)
定义量子神经网络层,负责将输入特征编码到量子电路,并生成量子态输出。
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that evaluates ``quantum_neural_network`` once per sample.

    The layer owns three trainable weight tensors (encoding, rotation and
    entangling weights) and passes them to ``quantum_neural_network``.
    """

    def __init__(self, n_qubits, n_layers, num_classes):
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # One initializer and one L2 regularizer instance are shared by the weights.
        glorot = tf.keras.initializers.GlorotUniform()
        l2_penalty = tf.keras.regularizers.l2(0.01)

        # Encoding weights
        self.encoding_weights = self.add_weight(
            name='encoding_weights',
            shape=(4, self.n_layers, 8),
            dtype=tf.float32,
            initializer=glorot,
            regularizer=l2_penalty,
            trainable=True,
        )

        # Rotation-gate weights
        self.rot_weights = self.add_weight(
            name='rot_weights',
            shape=(n_layers, n_qubits, 3),
            dtype=tf.float32,
            initializer=glorot,
            regularizer=l2_penalty,
            trainable=True,
        )

        # Entangling-gate weights
        self.entangle_weights = self.add_weight(
            name='entangle_weights',
            shape=(n_layers, n_qubits - 1),
            dtype=tf.float32,
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=l2_penalty,
            trainable=True,
        )

    def call(self, inputs):
        """Run the circuit on every row of ``inputs`` and stack the results."""
        per_sample = [
            tf.cast(
                quantum_neural_network(
                    tf.cast(inputs[i], tf.float32),
                    self.encoding_weights,
                    self.rot_weights,
                    self.entangle_weights,
                ),
                tf.float32,
            )
            for i in range(tf.shape(inputs)[0])
        ]
        return tf.stack(per_sample, axis=0)
定义经典神经网络层,将量子层的输出进一步处理并进行分类。
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid model: a quantum layer followed by two dense layers.

    Args:
        n_qubits: Forwarded to ``QuantumNeuralNetworkLayer``.
        n_layers: Forwarded to ``QuantumNeuralNetworkLayer``.
        num_classes: Number of units in the softmax output layer.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        super().__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical hidden layer
        self.dense1 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )

        # Output layer. Uses the fully qualified name like ``dense1`` so the
        # class does not depend on a separate bare ``Dense`` import.
        self.output_layer = tf.keras.layers.Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32
        )

    def call(self, inputs):
        """Return class probabilities for ``inputs``."""
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        return self.output_layer(x)
使用自定义的 ConstantTimeImportanceTraining 类封装模型,以支持重要性采样训练。
python# 初始化重要性采样策略 reweighting_policy = BiasedReweightingPolicy(k=1.0) # 初始化重要性采样训练类 wrapped_model = ConstantTimeImportanceTraining(model, score="loss") # 自定义采样器 def custom_sampler(dataset, batch_size, steps_per_epoch, epochs): return ConstantTimeSampler( dataset, reweighting_policy, model=wrapped_model.model, ) # 赋值自定义采样器 wrapped_model.sampler = custom_sampler
使用类别交叉熵损失函数,并编译模型。
python# 使用每样本损失 loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=False, reduction=tf.keras.losses.Reduction.NONE) # 编译模型 model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss=loss_fn, metrics=['accuracy'], run_eagerly=True # 为了调试,实际训练时可以设置为 False )
自定义回调函数用于记录训练指标。
class MetricsHistory(tf.keras.callbacks.Callback):
    """Callback that records loss and accuracy once per epoch.

    After training, ``history`` maps each of 'loss', 'accuracy', 'val_loss'
    and 'val_accuracy' to a list with one entry per epoch. A metric missing
    from an epoch's logs is recorded as ``None``.
    """

    def on_train_begin(self, logs=None):
        """Reset the recorded history at the start of training."""
        self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics to the history."""
        # ``logs`` defaults to None, so guard before calling ``.get``.
        logs = logs or {}
        self.history['loss'].append(logs.get('loss'))
        self.history['accuracy'].append(logs.get('accuracy'))
        self.history['val_loss'].append(logs.get('val_loss'))
        self.history['val_accuracy'].append(logs.get('val_accuracy'))
使用 fit_dataset 方法进行训练,结合重要性采样。
python# 实例化回调 metrics_history = MetricsHistory() # 添加早停回调 callbacks = [ metrics_history, EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True) ] # 开始训练 history = wrapped_model.fit_dataset( dataset=dataset, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=1, callbacks=callbacks )
以下是损失值计算和更新的具体流程,基于您提供的 ConstantTimeImportanceTraining 和 OracleWrapper 类。
ConstantTimeImportanceTraining.fit_dataset 方法
该方法负责整个训练循环,包括采样、损失计算、梯度更新等。
def fit_dataset(self, dataset, steps_per_epoch=None, batch_size=32,
                epochs=1, verbose=1, callbacks=None, on_sample=None,
                on_scores=None):
    # Abridged excerpt of the training loop; omitted parts are marked below.
    # ... (some code omitted)

    # Create the sampler
    sampler = self.sampler(dataset, batch_size, steps_per_epoch, epochs)

    # Start the training loop
    epoch = 0
    self.original_model.stop_training = False
    callbacks.on_train_begin()
    while epoch < epochs:
        callbacks.on_epoch_begin(epoch)
        self.model.model.reset_metrics()
        for step in range(steps_per_epoch):
            batch_logs = {"batch": step, "size": batch_size}
            callbacks.on_batch_begin(step, batch_logs)

            # Importance sampling happens here
            idxs, (x_batch, y_batch), w = sampler.sample(batch_size)

            # Convert x_batch and y_batch to TensorFlow tensors
            x_batch = tf.convert_to_tensor(x_batch)
            y_batch = tf.convert_to_tensor(y_batch)
            w = tf.convert_to_tensor(w, dtype=tf.float32)

            # Custom training step
            loss_value, metrics_results, scores = self.train_batch(x_batch, y_batch, w)

            # Update the sampler with the new scores
            sampler.update(idxs, scores)

            # Record the metrics
            batch_logs['loss'] = loss_value
            for metric_name, metric_value in zip(self.metrics_names[1:], metrics_results):
                batch_logs[metric_name] = metric_value

            callbacks.on_batch_end(step, batch_logs)

            # ... (some code omitted)

        # Evaluate on the validation set at the end of every epoch
        epoch_logs = {}
        if len(dataset.test_data) > 0:
            val_loss, val_accuracy = self.evaluate(dataset.test_data[:], mode='validation')
            epoch_logs['val_loss'] = val_loss
            if val_accuracy is not None:
                epoch_logs['val_accuracy'] = val_accuracy

            if val_accuracy is not None:
                print(f" - val_accuracy: {val_accuracy:.4f}")

        callbacks.on_epoch_end(epoch, epoch_logs)
        if self.original_model.stop_training:
            break
        epoch += 1
    callbacks.on_train_end()

    return self.history
OracleWrapper.train_batch 方法
该方法负责计算加权损失、计算并应用梯度、更新指标。(下面的代码对应训练类中被 fit_dataset 调用的 train_batch;其加权损失的计算方式与 OracleWrapper.train_batch 相同。)
def train_batch(self, x, y, w):
    # One weighted gradient step; returns (loss, [accuracy], per-sample losses).
    with tf.GradientTape() as tape:
        y_pred = self.model(x, training=True)
        per_sample_losses = self.model.loss(y, y_pred)
        if len(per_sample_losses.shape) > 1:
            per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)
        scores = per_sample_losses  # Use the losses as importance scores
        weights = tf.cast(w, tf.keras.backend.floatx())  # Cast the weights

        # Compute the weighted loss
        weighted_losses = per_sample_losses * weights
        loss = tf.reduce_mean(weighted_losses)

        # Include the regularization losses
        reg_losses = tf.reduce_sum(self.model.losses)
        loss += reg_losses

    # Get the trainable variables
    trainable_vars = self.model.trainable_variables

    # Compute the gradients
    gradients = tape.gradient(loss, trainable_vars)
    self.model.optimizer.apply_gradients(zip(gradients, trainable_vars))

    # Update the metrics
    self.model.compiled_metrics.update_state(y, y_pred, sample_weight=weights)
    # NOTE(review): loss_metric is computed but never used; the returned loss is loss.numpy().
    loss_metric = self.model.metrics[0].result().numpy()  # Assumes the first metric is the loss

    # Compute the accuracy without sample weights
    accuracy_metric = tf.keras.metrics.categorical_accuracy(y, y_pred)
    accuracy = tf.reduce_mean(accuracy_metric).numpy()

    return loss.numpy(), [accuracy], per_sample_losses.numpy()
以下是损失值的具体计算流程,用伪代码表示:
def compute_loss(x_batch, y_batch, w, model):
    """Run one importance-weighted training step and report its loss.

    Args:
        x_batch: Batch inputs.
        y_batch: Batch targets.
        w: Per-sample importance weights.
        model: Compiled Keras model whose ``loss`` returns unreduced values.

    Returns:
        ``(loss, accuracy, per_sample_losses)``: the weighted mean loss plus
        regularization, the unweighted batch accuracy, and the unweighted
        per-sample losses (the importance scores).
    """
    with tf.GradientTape() as tape:
        # Forward pass
        y_pred = model(x_batch, training=True)

        # Per-sample (unreduced) loss
        per_sample_losses = model.loss(y_batch, y_pred)
        if len(per_sample_losses.shape) > 1:
            per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)

        # Apply the importance weights
        weights = tf.cast(w, tf.keras.backend.floatx())
        weighted_losses = per_sample_losses * weights

        # Mean of the weighted losses
        loss = tf.reduce_mean(weighted_losses)

        # Add the regularization losses
        reg_losses = tf.cast(tf.reduce_sum(model.losses), loss.dtype)
        loss += reg_losses

    # Compute the gradients and update the parameters
    gradients = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Update the compiled metrics
    model.compiled_metrics.update_state(y_batch, y_pred, sample_weight=weights)

    accuracy = tf.reduce_mean(tf.keras.metrics.categorical_accuracy(y_batch, y_pred)).numpy()

    # Report the loss computed in this step, as train_batch does.
    # model.metrics[0] is not updated by this step with the loss value.
    return loss.numpy(), accuracy, per_sample_losses.numpy()
在每个 train_batch 调用后,损失值和准确率被记录并用于后续的指标更新与评估。
python# 在 fit_dataset 方法中 loss_value, metrics_results, scores = self.train_batch(x_batch, y_batch, w) sampler.update(idxs, scores) batch_logs['loss'] = loss_value for metric_name, metric_value in zip(self.metrics_names[1:], metrics_results): batch_logs[metric_name] = metric_value callbacks.on_batch_end(step, batch_logs)
训练完成后,使用测试集评估模型性能,包括计算测试损失和准确率。
python# 在主训练脚本中 test_loss, test_accuracy = wrapped_model.evaluate((X_test_mapped, y_test), mode='test') print('Test loss:', test_loss) print('Test accuracy:', test_accuracy)
evaluate 方法的具体流程如下:
def evaluate(self, test_data, mode='test'):
    """Compute loss and accuracy on a validation or test set.

    Args:
        test_data: ``(x, y)`` pair. ``y`` may be one-hot encoded or class
            indices.
        mode: ``'validation'`` or ``'test'``; selects which loss and accuracy
            metric objects are updated.

    Returns:
        ``(loss, accuracy)``. ``accuracy`` is ``None`` when the accuracy
        metric for ``mode`` is not set.

    Raises:
        ValueError: If ``mode`` is neither ``'validation'`` nor ``'test'``.
    """
    x_test, y_test = test_data

    # Convert to NumPy arrays
    x_test = np.asarray(x_test)
    y_test = np.asarray(y_test)

    # Model predictions
    y_pred = self.model.model.predict(x_test, batch_size=128)

    # Prepare the labels for the loss computation
    if y_test.ndim > 1 and y_test.shape[-1] > 1:
        y_test_for_loss = y_test
    else:
        # Class indices: the one-hot depth comes from the model's output width
        num_classes = self.model.model.output_shape[-1]
        y_test_for_loss = tf.one_hot(y_test, depth=num_classes)

    y_test_for_loss = tf.convert_to_tensor(y_test_for_loss, dtype=y_pred.dtype)
    y_pred_for_loss = tf.convert_to_tensor(y_pred, dtype=y_pred.dtype)

    # Per-sample loss
    per_sample_losses = self.model.model.loss(y_test_for_loss, y_pred_for_loss)
    if len(per_sample_losses.shape) > 1:
        per_sample_losses = tf.reduce_mean(per_sample_losses, axis=-1)

    # Mean loss
    loss_value = tf.reduce_mean(per_sample_losses)

    # Add the regularization losses
    reg_losses = tf.reduce_sum(self.model.model.losses)
    loss_value += reg_losses

    # Update the loss metric for this mode
    if mode == 'validation':
        self.val_loss_metric.update_state(loss_value)
    elif mode == 'test':
        self.test_loss_metric.update_state(loss_value)
    else:
        raise ValueError(f"Unsupported mode: {mode}")

    # Convert labels and predictions to class indices
    if y_test.ndim > 1 and y_test.shape[-1] > 1:
        y_test_classes = np.argmax(y_test, axis=-1)
    else:
        y_test_classes = y_test

    if y_pred.ndim > 1 and y_pred.shape[-1] > 1:
        y_pred_classes = np.argmax(y_pred, axis=-1)
    else:
        y_pred_classes = y_pred

    y_test_classes = tf.convert_to_tensor(y_test_classes, dtype=tf.int32)
    y_pred_classes = tf.convert_to_tensor(y_pred_classes, dtype=tf.int32)

    # Update the accuracy metric. ``accuracy`` is bound on every path so the
    # return statement cannot hit an unbound local.
    accuracy = None
    if mode == 'validation' and self.val_accuracy_metric:
        self.val_accuracy_metric.reset_states()
        self.val_accuracy_metric.update_state(y_test_classes, y_pred_classes)
        accuracy = self.val_accuracy_metric.result().numpy()
    elif mode == 'test' and self.test_accuracy_metric:
        self.test_accuracy_metric.reset_states()
        self.test_accuracy_metric.update_state(y_test_classes, y_pred_classes)
        accuracy = self.test_accuracy_metric.result().numpy()

    return loss_value.numpy(), accuracy
损失值的具体流程如下:
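1. 前向传播:模型对输入批次进行预测,得到 y_pred。
2. 计算每样本损失:使用 CategoricalCrossentropy 损失函数,计算每个样本的损失值 per_sample_losses。
3. 应用权重:将每个样本的损失乘以重要性采样权重 w,得到加权损失 weighted_losses。
4. 求平均:对加权损失取平均,得到 loss。
5. 添加正则化:加上正则化损失 reg_losses,得到最终的损失值。
6. 梯度更新:计算梯度 gradients,并使用优化器更新模型参数。

通过上述流程,您的模型能够在训练过程中动态调整对不同样本的关注程度,从而提高训练效率和模型性能。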
以下是一个简化的代码示例,展示了损失值的计算和更新流程:
def train_step(x_batch, y_batch, w, model):
    """Simplified importance-weighted training step.

    Args:
        x_batch: Batch inputs.
        y_batch: One-hot batch targets.
        w: Per-sample importance weights.
        model: Compiled Keras model that outputs class probabilities.

    Returns:
        ``(loss, accuracy, per_sample_losses)``: the weighted mean loss plus
        regularization, the unweighted batch accuracy, and the unweighted
        per-sample losses (the importance scores).
    """
    with tf.GradientTape() as tape:
        # Forward pass
        y_pred = model(x_batch, training=True)

        # Per-sample loss
        per_sample_losses = tf.keras.losses.categorical_crossentropy(y_batch, y_pred, from_logits=False)

        # Apply the importance weights
        weights = tf.cast(w, tf.float32)
        weighted_losses = per_sample_losses * weights

        # Total loss
        loss = tf.reduce_mean(weighted_losses)

        # Add the regularization losses
        reg_losses = tf.cast(tf.reduce_sum(model.losses), loss.dtype)
        loss += reg_losses

    # Compute the gradients
    gradients = tape.gradient(loss, model.trainable_variables)

    # Update the parameters
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Update the compiled metrics
    model.compiled_metrics.update_state(y_batch, y_pred, sample_weight=weights)

    accuracy = tf.reduce_mean(tf.keras.metrics.categorical_accuracy(y_batch, y_pred)).numpy()

    # Report the loss computed in this step rather than model.metrics[0],
    # which this step does not update with the loss value.
    return loss.numpy(), accuracy, per_sample_losses.numpy()
在您的训练过程中,重要性采样通过以下方式影响损失的计算:
每个样本的损失在求平均之前,都会乘以采样器返回的权重 w 进行加权。这意味着更重要的样本对总损失的贡献更大,从而驱动模型更多地关注这些样本。
通过记录和绘制训练和验证的损失与准确率曲线,可以直观地监控模型的训练过程,帮助识别训练中的问题(如过拟合、欠拟合或训练不稳定)。
def plot_history(history):
    """Save accuracy and loss curves as 'accuracy_plot.png' and 'loss_plot.png'.

    ``history.history`` must contain 'accuracy' and 'loss'. The matching
    'val_*' series are drawn only when present.
    """

    def _draw(metric, train_label, val_label, title, ylabel, legend_loc, filename):
        # One figure per metric: training curve, optional validation curve.
        series = history.history
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(series[metric], label=train_label)
        val_key = 'val_' + metric
        if val_key in series:
            ax.plot(series[val_key], label=val_label)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('Epoch')
        ax.legend(loc=legend_loc)
        plt.savefig(filename)
        plt.close(fig)

    # Training and validation accuracy
    _draw('accuracy', 'Training Accuracy', 'Validation Accuracy',
          'Model Accuracy', 'Accuracy', 'upper left', 'accuracy_plot.png')

    # Training and validation loss
    _draw('loss', 'Training Loss', 'Validation Loss',
          'Model Loss', 'Loss', 'upper right', 'loss_plot.png')
通过以上详细的流程和代码示例,您可以清晰地理解损失值在您的量子神经网络训练中的具体计算与更新过程。重要性采样的引入进一步增强了模型对关键样本的关注能力,有助于提升整体性能。确保在训练过程中正确管理损失计算、梯度更新和指标记录,是实现稳定有效训练的关键。
如果您在实际运行中遇到任何问题或需要进一步的优化建议,请随时提出!