量子神经网络比较
Creado el: 1 de enero de 2025
Creado el: 1 de enero de 2025
import sys
import os
import pennylane as qml
from pennylane import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense
from importance_sampling.training import ConstantTimeImportanceTraining
from importance_sampling.samplers import ConstantTimeSampler
from importance_sampling.datasets import InMemoryDataset
from importance_sampling.reweighting import BiasedReweightingPolicy
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import logging
from tensorflow.keras.utils import to_categorical # 导入独热编码工具
tf.keras.backend.set_floatx('float32')
class Tee(object):
    """File-like object that duplicates every write to several streams.

    Used to mirror ``sys.stdout`` into a log file while still printing to the
    console.
    """

    def __init__(self, *files):
        """Remember the target streams.

        Args:
            *files: Writable, flushable stream objects. The ``encoding``
                attribute of the first one is exposed on the tee; it falls
                back to ``'utf-8'`` when the stream has no such attribute or
                when no stream is given.
        """
        self.files = files
        # Some consumers of a stdout replacement look up ``.encoding``.
        first_stream = files[0] if files else None
        self.encoding = getattr(first_stream, 'encoding', 'utf-8')

    def write(self, obj):
        """Write ``obj`` to every stream and flush each one immediately."""
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        """Flush every underlying stream."""
        for f in self.files:
            f.flush()
# Mirror everything printed to stdout into a UTF-8 log file. The file stays
# open for the whole run; the main script closes it and restores stdout.
output_file = open('依然使用独热编码和softmax.log', 'w', encoding='utf-8')
original_stdout = sys.stdout
tee = Tee(sys.stdout, output_file)
sys.stdout = tee

# Send the 'my_sampler' logger through the same tee so that log records end
# up both on the console and in the log file.
logger = logging.getLogger('my_sampler')
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(tee)
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)

# Drop any handlers that are already attached so the tee handler is the only
# one and records are not emitted more than once.
logger.handlers.clear()
logger.addHandler(stream_handler)

logging.getLogger('qiskit').setLevel(logging.WARNING)
def save_plot(fig, filename):
    """Write ``fig`` to ``filename`` and close it.

    Args:
        fig: Matplotlib figure to save.
        filename: Destination path passed to ``fig.savefig``.
    """
    fig.savefig(filename)
    # Close the figure so that repeated plotting does not keep figures open.
    plt.close(fig)
def plot_history(history):
    """Save the accuracy and loss curves as PNG files.

    Writes ``accuracy_plot.png`` and ``loss_plot.png`` through ``save_plot``.

    Args:
        history: Mapping with the per-epoch series ``'accuracy'`` and
            ``'loss'``; ``'val_accuracy'`` and ``'val_loss'`` are plotted as
            well when present, otherwise a notice is printed.

    Raises:
        KeyError: If ``'accuracy'`` or ``'loss'`` is missing from ``history``.
    """
    # Both figures share the same layout; only the metric name and the legend
    # position differ.
    for metric, legend_loc in (('accuracy', 'upper left'), ('loss', 'upper right')):
        label = metric.capitalize()
        val_key = f'val_{metric}'

        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(history[metric], label=f'Training {label}')
        if val_key in history:
            ax.plot(history[val_key], label=f'Validation {label}')
        else:
            print(f"Validation {metric} not found in history.")
        ax.set_title(f'Model {label}')
        ax.set_ylabel(label)
        ax.set_xlabel('Epoch')
        ax.legend(loc=legend_loc)
        save_plot(fig, f'{metric}_plot.png')
def calculate_accuracy_fluctuation(history):
    """Return the mean accuracy and its total absolute deviation.

    Validation accuracy is preferred; training accuracy is used when no
    validation values were recorded. ``None`` entries (epochs for which the
    metric was not reported) are ignored.

    Args:
        history: Mapping that may contain the per-epoch lists
            ``'val_accuracy'`` and/or ``'accuracy'``.

    Returns:
        A ``(mean_accuracy, fluctuation)`` tuple where ``fluctuation`` is the
        sum of absolute deviations from the mean, or ``(None, None)`` when no
        usable accuracy values exist.
    """
    for key in ('val_accuracy', 'accuracy'):
        if key not in history:
            continue
        # The metrics callback stores None when Keras did not report a value.
        recorded = [value for value in history[key] if value is not None]
        if recorded:
            accuracies = np.array(recorded, dtype=float)
            mean_accuracy = np.mean(accuracies)
            fluctuation = np.sum(np.abs(accuracies - mean_accuracy))
            return mean_accuracy, fluctuation

    print("Accuracy metric not found in history.")
    return None, None
# 7-wire "default.qubit" device on which the quantum_neural_network QNode runs.
dev = qml.device("default.qubit", wires=7)
def encode_group(control_qubits, data_qubits, angle_q2_values, angle_q3_values):
    """Queue the controlled-RY encoding gates of one neuron group.

    Args:
        control_qubits: Control wire(s) shared by all four gates.
        data_qubits: Dict with the target wires under the keys ``'q2'`` and
            ``'q3'``.
        angle_q2_values: RY angle applied to the ``'q2'`` wire.
        angle_q3_values: Indexable pair of RY angles for the ``'q3'`` wire;
            element 0 is used when the control is 0, element 1 when it is 1.
    """
    # NOTE(review): the 'q2' rotation uses the same angle for both control
    # values, which amounts to an unconditional RY on that wire — confirm
    # this is intended.
    qml.ctrl(qml.RY(angle_q2_values, wires=data_qubits['q2']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q2_values, wires=data_qubits['q2']), control=control_qubits, control_values=1)

    # The 'q3' wire receives a different angle per control value.
    qml.ctrl(qml.RY(angle_q3_values[0], wires=data_qubits['q3']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q3_values[1], wires=data_qubits['q3']), control=control_qubits, control_values=1)
def encode_group_second(control_qubits, data_qubits, angle_q3_values):
    """Queue the controlled-RY gates on the ``'q3'`` wire only.

    Args:
        control_qubits: Control wire(s) for both gates.
        data_qubits: Dict holding the target wire under the key ``'q3'``.
        angle_q3_values: Indexable pair of RY angles; element 0 is applied
            when the control is 0, element 1 when it is 1.
    """
    qml.ctrl(qml.RY(angle_q3_values[0], wires=data_qubits['q3']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q3_values[1], wires=data_qubits['q3']), control=control_qubits, control_values=1)
def apply_controlled_swap(swap_qubit, q_swap1, q_swap2):
    """
    Modular method to apply a controlled SWAP gate.

    The CSWAP is sandwiched between two Hadamard gates on the control wire.

    Parameters:
    - swap_qubit (int): Control qubit for the SWAP gate.
    - q_swap1 (int): First qubit to swap.
    - q_swap2 (int): Second qubit to swap.
    """
    qml.Hadamard(wires=swap_qubit)
    qml.CSWAP(wires=[swap_qubit, q_swap1, q_swap2])
    qml.Hadamard(wires=swap_qubit)
@qml.qnode(dev, interface='tf', diff_method='backprop')
def quantum_neural_network(inputs, encoding_weights, rot_weights, entangle_weights):
    """Variational circuit with two neuron groups on a 7-wire device.

    Wire 0 is the shared control; each neuron group uses two data wires
    ('q2', 'q3') and one swap wire.

    Args:
        inputs: Indexable with one encoding angle per neuron group
            (elements 0 and 1 are read).
        encoding_weights: Angles indexed as ``[group, layer, control_value]``.
        rot_weights: ``Rot`` angles indexed as ``[layer, wire, 3]``; its first
            two dimensions define the layer count and the rotated wires.
        entangle_weights: CRZ angles indexed as ``[layer, wire]`` for the
            pair ``(wire, wire + 1)``.

    Returns:
        List with two PauliZ expectation values: wires 3 and 6 when the
        number of layers is odd, wires 1 and 4 when it is even.
    """
    control_qubits = 0
    qml.Hadamard(wires=0)

    # Neuron groups
    encoding_data = [
        {'data_qubits': {'q2': 1, 'q3': 2}, 'swap_qubit': 3},
        {'data_qubits': {'q2': 4, 'q3': 5}, 'swap_qubit': 6},
    ]

    n_layers = rot_weights.shape[0]
    n_rot_wires = rot_weights.shape[1]

    # First layer: encode both the input values and the weights.
    for idx, data_set in enumerate(encoding_data):
        encode_group(
            control_qubits,
            data_set['data_qubits'],
            inputs[idx],
            encoding_weights[idx, 0, :],
        )

    # Rotation gates followed by the CRZ entangling chain.
    for qubit in range(n_rot_wires):
        rot_angles = rot_weights[0, qubit, :]
        qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)
    for qubit in range(n_rot_wires - 1):
        qml.CRZ(entangle_weights[0, qubit], wires=[qubit, qubit + 1])

    # One controlled swap per neuron group.
    for data_set in encoding_data:
        data_qubits = data_set['data_qubits']
        apply_controlled_swap(data_set['swap_qubit'], data_qubits['q3'], data_qubits['q2'])

    # Remaining layers: only the weights are encoded.
    for layer in range(n_layers - 1):
        layer_second = layer + 1

        for idx, data_set in enumerate(encoding_data):
            encode_group_second(
                control_qubits,
                data_set['data_qubits'],
                encoding_weights[idx, layer_second, :],
            )

        for qubit in range(n_rot_wires):
            rot_angles = rot_weights[layer_second, qubit, :]
            qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)
        for qubit in range(n_rot_wires - 1):
            qml.CRZ(entangle_weights[layer_second, qubit], wires=[qubit, qubit + 1])

        # The role of control and swapped wires alternates from layer to layer.
        for data_set in encoding_data:
            swap_qubit = data_set['swap_qubit']
            data_qubits = data_set['data_qubits']
            if layer % 2 == 0:
                apply_controlled_swap(data_qubits['q2'], swap_qubit, data_qubits['q3'])
            else:
                apply_controlled_swap(swap_qubit, data_qubits['q2'], data_qubits['q3'])

    # The measured wires depend on the parity of the layer count.
    if n_layers % 2 == 1:
        measured_qubits = [3, 6]
    else:
        measured_qubits = [1, 4]

    return [qml.expval(qml.PauliZ(qubit)) for qubit in measured_qubits]
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that runs the ``quantum_neural_network`` QNode per sample.

    The layer owns the three trainable angle tensors of the circuit and passes
    them, together with each input row, to the QNode.
    """

    def __init__(self, n_qubits, n_layers, num_classes):
        """Create the trainable circuit parameters.

        Args:
            n_qubits: Size of the wire dimension of the rotation weights.
            n_layers: Number of circuit layers.
            num_classes: Stored on the instance; not used by the layer itself.
        """
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        initializer = tf.keras.initializers.GlorotUniform()
        regularizer = tf.keras.regularizers.l2(0.001)

        # Encoding weights with L2 regularization
        self.encoding_weights = self.add_weight(
            shape=(2, self.n_layers, 2),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='encoding_weights',
            dtype=tf.float32,
        )

        # Rotation layer weights with L2 regularization
        self.rot_weights = self.add_weight(
            shape=(n_layers, n_qubits, 3),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='rot_weights',
            dtype=tf.float32,
        )

        # Entangling weights with L2 regularization, drawn from [0, 2*pi)
        self.entangle_weights = self.add_weight(
            shape=(n_layers, n_qubits - 1),
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=regularizer,
            trainable=True,
            name='entangle_weights',
            dtype=tf.float32,
        )

    def call(self, inputs):
        """Evaluate the circuit for every row of ``inputs``.

        Args:
            inputs: Batch tensor; each row is passed to the QNode as one
                sample.

        Returns:
            float32 tensor stacking the per-sample QNode results along axis 0.
        """
        outputs_list = []
        # The QNode is evaluated sample by sample, so the batch size must be a
        # concrete value here (the script compiles the model with
        # run_eagerly=True).
        for i in range(tf.shape(inputs)[0]):
            x = tf.cast(inputs[i], tf.float32)
            result = quantum_neural_network(
                x, self.encoding_weights, self.rot_weights, self.entangle_weights
            )
            outputs_list.append(tf.cast(result, tf.float32))
        return tf.stack(outputs_list, axis=0)
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid model: quantum layer followed by a dense head with softmax output."""

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        """Build the quantum layer and the classical layers.

        Args:
            n_qubits: Forwarded to ``QuantumNeuralNetworkLayer``.
            n_layers: Forwarded to ``QuantumNeuralNetworkLayer``.
            num_classes: Number of units of the softmax output layer.
            **kwargs: Passed on to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layers.
        # NOTE(review): the attributes are named layer_norm*, and the original
        # comments spoke of layer normalisation, but the objects created are
        # BatchNormalization layers — confirm which one is intended.
        self.dense1 = tf.keras.layers.Dense(
            64,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.00001),
            dtype=tf.float32,
        )
        self.layer_norm1 = tf.keras.layers.BatchNormalization()
        self.dense2 = tf.keras.layers.Dense(
            32,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.00001),
            dtype=tf.float32,
        )
        self.layer_norm2 = tf.keras.layers.BatchNormalization()

        # Output layer with softmax activation
        self.output_layer = tf.keras.layers.Dense(
            num_classes,
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
            dtype=tf.float32,
        )

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Batch of samples for the quantum layer.
            training: Forwarded to the normalisation layers.

        Returns:
            Output of the softmax layer.
        """
        q_output = self.quantum_layer(inputs)

        # First dense block followed by normalisation
        x = self.dense1(q_output)
        x = self.layer_norm1(x, training=training)

        # Second dense block followed by normalisation
        x = self.dense2(x)
        x = self.layer_norm2(x, training=training)

        return self.output_layer(x)
def is_quantum_model(model):
    """Report whether ``model`` contains a ``QuantumNeuralNetworkLayer``.

    Layers that themselves expose a ``layers`` attribute are searched
    recursively.

    Args:
        model: Object with an iterable ``layers`` attribute.

    Returns:
        True if a quantum layer is found at any nesting depth, else False.
    """
    return any(
        isinstance(layer, QuantumNeuralNetworkLayer)
        or (hasattr(layer, 'layers') and is_quantum_model(layer))
        for layer in model.layers
    )
if __name__ == "__main__":
    try:
        # Load data
        data = np.load('plane_point_set.npz')
        X = data['points'].astype(np.float32)  # Original coordinates
        phase_angles = data['phase_angles'].astype(np.float32)  # Phase angles mapped to [0, π/2]
        y = data['labels'].reshape(-1, 1)

        # One-hot encode the labels for the softmax output
        y_encoded = to_categorical(y, num_classes=2).astype(np.float32)
        num_classes = 2
        print("num_classes", num_classes)

        # Split dataset into training and testing
        # NOTE(review): the phase-angle splits are produced but never fed to
        # the model; training uses the raw points — confirm this is intended.
        x_full_train, x_test, phase_full_train, phase_test, y_full_train, y_test = train_test_split(
            X, phase_angles, y_encoded, test_size=0.2, random_state=116, stratify=y_encoded
        )

        # Further split training set into training and validation sets
        x_train, x_val, phase_train, phase_val, y_train, y_val = train_test_split(
            x_full_train, phase_full_train, y_full_train, test_size=0.25, random_state=42, stratify=y_full_train
        )

        # Shuffle the training data manually
        permutation = np.random.permutation(len(x_train))
        x_train = x_train[permutation]
        phase_train = phase_train[permutation]
        y_train = y_train[permutation]

        # Hyperparameters
        learning_rate = 0.0005
        epochs = 3
        batch_size = 128
        n_qubits = 7
        n_layers = 2
        num_classes = 2  # Binary classification with one-hot encoding

        model = QuantumNeuralNetwork(
            n_qubits=n_qubits,
            n_layers=n_layers,
            num_classes=num_classes
        )

        if is_quantum_model(model):
            print("The model contains quantum neural network (QNN) layers.")
        else:
            print("The model does not contain quantum neural network (QNN) layers.")

        # Draw the quantum circuit with placeholder parameters
        sample_inputs = np.ones(2, dtype=np.float32) * (np.pi / 4)
        sample_encoding_weights = np.zeros((2, n_layers, 2), dtype=np.float32)
        sample_rot_weights = np.zeros((n_layers, n_qubits, 3), dtype=np.float32)
        sample_entangle_weights = np.zeros((n_layers, n_qubits - 1), dtype=np.float32)

        qml.drawer.use_style('sketch')
        print("\nQuantum circuit diagram:")
        try:
            circuit_text = qml.draw(quantum_neural_network)(
                sample_inputs, sample_encoding_weights, sample_rot_weights, sample_entangle_weights
            )
            print(circuit_text)
        except Exception as e:
            print(f"Error drawing the quantum circuit: {e}")

        try:
            fig, ax = qml.draw_mpl(quantum_neural_network)(
                sample_inputs, sample_encoding_weights, sample_rot_weights, sample_entangle_weights
            )
            # save_plot also closes the figure once it has been written.
            save_plot(fig, 'quantum_circuit.png')
        except Exception as e:
            print(f"Error saving the quantum circuit diagram: {e}")

        steps_per_epoch = len(x_train) // batch_size

        # Create dataset (including validation set)
        # NOTE(review): the labels are already one-hot encoded here; confirm
        # that InMemoryDataset(categorical=True) does not encode them again.
        try:
            dataset = InMemoryDataset(
                x_train, y_train,
                x_val, y_val,
                categorical=True
            )
        except Exception as e:
            print(f"Unexpected error initializing InMemoryDataset: {e}")
            sys.exit(1)

        # Initialize reweighting policy
        reweighting_policy = BiasedReweightingPolicy(k=1.0)

        # Initialize importance sampling training, using the loss as score
        wrapped_model = ConstantTimeImportanceTraining(model, score="loss")

        # Replace the default sampler with one built from the policy above
        def custom_sampler(dataset, batch_size, steps_per_epoch, epochs):
            return ConstantTimeSampler(
                dataset,
                reweighting_policy,
                model=wrapped_model.model,
            )

        wrapped_model.sampler = custom_sampler

        # Use CategoricalCrossentropy loss function
        loss_fn = tf.keras.losses.CategoricalCrossentropy()

        # Compile model
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=loss_fn,
            metrics=['accuracy'],
            run_eagerly=True  # Set to False for better performance after debugging
        )

        class MetricsHistory(tf.keras.callbacks.Callback):
            """Collect the per-epoch loss and accuracy values."""

            def on_train_begin(self, logs=None):
                self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

            def on_epoch_end(self, epoch, logs=None):
                # Keras may call this without logs; a missing metric is
                # recorded as None.
                logs = logs or {}
                for key, values in self.history.items():
                    values.append(logs.get(key))

        metrics_history = MetricsHistory()

        callbacks = [
            metrics_history,
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5),
            ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss')
        ]

        # Train model using fit_dataset; a training failure is reported and
        # the script carries on with evaluation and plotting.
        try:
            history = wrapped_model.fit_dataset(
                dataset=dataset,
                batch_size=batch_size,
                epochs=epochs,
                steps_per_epoch=steps_per_epoch,
                verbose=1,
                callbacks=callbacks
            )
        except Exception as e:
            print(f"Error during training: {e}")

        # Evaluate model on test set. y_test comes from the split of the
        # one-hot labels, so it must not be encoded a second time.
        # NOTE(review): confirm that the importance-sampling wrapper accepts
        # this call form (tuple argument and mode='test').
        print("\nEvaluating on test set:")
        test_loss, test_accuracy = wrapped_model.evaluate((x_test, y_test), mode='test')
        print('Test loss:', test_loss)
        print('Test accuracy:', test_accuracy)

        # Plot graphs using history from custom callback
        plot_history(metrics_history.history)

        mean_accuracy, fluctuation = calculate_accuracy_fluctuation(metrics_history.history)
        if mean_accuracy is not None and fluctuation is not None:
            print('Validation set mean accuracy:', mean_accuracy)
            print('Accuracy fluctuation:', fluctuation)

    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    finally:
        # Close output file and restore stdout
        sys.stdout = original_stdout
        output_file.close()
        # Report the log file that was actually written.
        print(
            f"Script execution completed. Please check '{output_file.name}' for print output and check the saved plots in the current directory.")
上述是代码1
import sys
import os
import pennylane as qml
from pennylane import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense
from importance_sampling.training import ConstantTimeImportanceTraining
from importance_sampling.samplers import ConstantTimeSampler
from importance_sampling.datasets import InMemoryDataset
from importance_sampling.reweighting import BiasedReweightingPolicy
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import logging
tf.keras.backend.set_floatx('float32')
class Tee(object):
    """File-like object that duplicates every write to several streams.

    Used to mirror ``sys.stdout`` into a log file while still printing to the
    console.
    """

    def __init__(self, *files):
        """Remember the target streams.

        Args:
            *files: Writable, flushable stream objects. The ``encoding``
                attribute of the first one is exposed on the tee; it falls
                back to ``'utf-8'`` when the stream has no such attribute or
                when no stream is given.
        """
        self.files = files
        # Some consumers of a stdout replacement look up ``.encoding``.
        first_stream = files[0] if files else None
        self.encoding = getattr(first_stream, 'encoding', 'utf-8')

    def write(self, obj):
        """Write ``obj`` to every stream and flush each one immediately."""
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        """Flush every underlying stream."""
        for f in self.files:
            f.flush()
# Mirror everything printed to stdout into a UTF-8 log file. The file stays
# open for the whole run; the main script closes it and restores stdout.
output_file = open('缩减量子层.log', 'w', encoding='utf-8')
original_stdout = sys.stdout
tee = Tee(sys.stdout, output_file)
sys.stdout = tee

# Send the 'my_sampler' logger through the same tee so that log records end
# up both on the console and in the log file.
logger = logging.getLogger('my_sampler')
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(tee)
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)

# Drop any handlers that are already attached so the tee handler is the only
# one and records are not emitted more than once.
logger.handlers.clear()
logger.addHandler(stream_handler)

logging.getLogger('qiskit').setLevel(logging.WARNING)
def save_plot(fig, filename):
    """Write ``fig`` to ``filename`` and close it.

    Args:
        fig: Matplotlib figure to save.
        filename: Destination path passed to ``fig.savefig``.
    """
    fig.savefig(filename)
    # Close the figure so that repeated plotting does not keep figures open.
    plt.close(fig)
def plot_history(history):
    """Save the accuracy and loss curves as PNG files.

    Writes ``accuracy_plot.png`` and ``loss_plot.png`` through ``save_plot``.

    Args:
        history: Mapping with the per-epoch series ``'accuracy'`` and
            ``'loss'``; ``'val_accuracy'`` and ``'val_loss'`` are plotted as
            well when present, otherwise a notice is printed.

    Raises:
        KeyError: If ``'accuracy'`` or ``'loss'`` is missing from ``history``.
    """
    # Both figures share the same layout; only the metric name and the legend
    # position differ.
    for metric, legend_loc in (('accuracy', 'upper left'), ('loss', 'upper right')):
        label = metric.capitalize()
        val_key = f'val_{metric}'

        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(history[metric], label=f'Training {label}')
        if val_key in history:
            ax.plot(history[val_key], label=f'Validation {label}')
        else:
            print(f"Validation {metric} not found in history.")
        ax.set_title(f'Model {label}')
        ax.set_ylabel(label)
        ax.set_xlabel('Epoch')
        ax.legend(loc=legend_loc)
        save_plot(fig, f'{metric}_plot.png')
def calculate_accuracy_fluctuation(history):
    """Return the mean accuracy and its total absolute deviation.

    Validation accuracy is preferred; training accuracy is used when no
    validation values were recorded. ``None`` entries (epochs for which the
    metric was not reported) are ignored.

    Args:
        history: Mapping that may contain the per-epoch lists
            ``'val_accuracy'`` and/or ``'accuracy'``.

    Returns:
        A ``(mean_accuracy, fluctuation)`` tuple where ``fluctuation`` is the
        sum of absolute deviations from the mean, or ``(None, None)`` when no
        usable accuracy values exist.
    """
    for key in ('val_accuracy', 'accuracy'):
        if key not in history:
            continue
        # The metrics callback stores None when Keras did not report a value.
        recorded = [value for value in history[key] if value is not None]
        if recorded:
            accuracies = np.array(recorded, dtype=float)
            mean_accuracy = np.mean(accuracies)
            fluctuation = np.sum(np.abs(accuracies - mean_accuracy))
            return mean_accuracy, fluctuation

    print("Accuracy metric not found in history.")
    return None, None
# 7-wire "default.qubit" device on which the quantum_neural_network QNode runs.
dev = qml.device("default.qubit", wires=7)
def encode_group(control_qubits, data_qubits, angle_q2_values, angle_q3_values):
    """Queue the controlled-RY encoding gates of one neuron group.

    Args:
        control_qubits: Control wire(s) shared by all four gates.
        data_qubits: Dict with the target wires under the keys ``'q2'`` and
            ``'q3'``.
        angle_q2_values: RY angle applied to the ``'q2'`` wire.
        angle_q3_values: Indexable pair of RY angles for the ``'q3'`` wire;
            element 0 is used when the control is 0, element 1 when it is 1.
    """
    # NOTE(review): the 'q2' rotation uses the same angle for both control
    # values, which amounts to an unconditional RY on that wire — confirm
    # this is intended.
    qml.ctrl(qml.RY(angle_q2_values, wires=data_qubits['q2']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q2_values, wires=data_qubits['q2']), control=control_qubits, control_values=1)

    # The 'q3' wire receives a different angle per control value.
    qml.ctrl(qml.RY(angle_q3_values[0], wires=data_qubits['q3']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q3_values[1], wires=data_qubits['q3']), control=control_qubits, control_values=1)
def encode_group_second(control_qubits, data_qubits, angle_q3_values):
    """Queue the controlled-RY gates on the ``'q3'`` wire only.

    Args:
        control_qubits: Control wire(s) for both gates.
        data_qubits: Dict holding the target wire under the key ``'q3'``.
        angle_q3_values: Indexable pair of RY angles; element 0 is applied
            when the control is 0, element 1 when it is 1.
    """
    qml.ctrl(qml.RY(angle_q3_values[0], wires=data_qubits['q3']), control=control_qubits, control_values=0)
    qml.ctrl(qml.RY(angle_q3_values[1], wires=data_qubits['q3']), control=control_qubits, control_values=1)
def apply_controlled_swap(swap_qubit, q_swap1, q_swap2):
    """
    Modular method to apply a controlled SWAP gate.

    The CSWAP is sandwiched between two Hadamard gates on the control wire.

    Parameters:
    - swap_qubit (int): Control qubit for the SWAP gate.
    - q_swap1 (int): First qubit to swap.
    - q_swap2 (int): Second qubit to swap.
    """
    qml.Hadamard(wires=swap_qubit)
    qml.CSWAP(wires=[swap_qubit, q_swap1, q_swap2])
    qml.Hadamard(wires=swap_qubit)
@qml.qnode(dev, interface='tf', diff_method='backprop')
def quantum_neural_network(inputs, encoding_weights, rot_weights, entangle_weights):
    """Reduced variational circuit with two neuron groups on a 7-wire device.

    Wire 0 is the shared control; each neuron group uses two data wires
    ('q2', 'q3') and one swap wire. In this variant the CRZ entangling chain
    is disabled, so ``entangle_weights`` is accepted but not used.

    Args:
        inputs: Indexable with one encoding angle per neuron group
            (elements 0 and 1 are read).
        encoding_weights: Angles indexed as ``[group, layer, control_value]``.
        rot_weights: ``Rot`` angles indexed as ``[layer, wire, 3]``; its first
            two dimensions define the layer count and the rotated wires.
        entangle_weights: Unused; kept so the signature matches the caller.

    Returns:
        List with two PauliZ expectation values: wires 3 and 6 when the
        number of layers is odd, wires 1 and 4 when it is even.
    """
    control_qubits = 0
    qml.Hadamard(wires=0)

    # Neuron groups
    encoding_data = [
        {'data_qubits': {'q2': 1, 'q3': 2}, 'swap_qubit': 3},
        {'data_qubits': {'q2': 4, 'q3': 5}, 'swap_qubit': 6},
    ]

    n_layers = rot_weights.shape[0]
    n_rot_wires = rot_weights.shape[1]

    # First layer: encode both the input values and the weights.
    for idx, data_set in enumerate(encoding_data):
        encode_group(
            control_qubits,
            data_set['data_qubits'],
            inputs[idx],
            encoding_weights[idx, 0, :],
        )

    # Rotation gates only; no CRZ entangling gates in this variant.
    for qubit in range(n_rot_wires):
        rot_angles = rot_weights[0, qubit, :]
        qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)

    # One controlled swap per neuron group.
    for data_set in encoding_data:
        data_qubits = data_set['data_qubits']
        apply_controlled_swap(data_set['swap_qubit'], data_qubits['q3'], data_qubits['q2'])

    # Remaining layers: only the weights are encoded.
    for layer in range(n_layers - 1):
        layer_second = layer + 1

        for idx, data_set in enumerate(encoding_data):
            encode_group_second(
                control_qubits,
                data_set['data_qubits'],
                encoding_weights[idx, layer_second, :],
            )

        # Rotation gates only; no CRZ entangling gates in this variant.
        for qubit in range(n_rot_wires):
            rot_angles = rot_weights[layer_second, qubit, :]
            qml.Rot(rot_angles[0], rot_angles[1], rot_angles[2], wires=qubit)

        # The role of control and swapped wires alternates from layer to layer.
        for data_set in encoding_data:
            swap_qubit = data_set['swap_qubit']
            data_qubits = data_set['data_qubits']
            if layer % 2 == 0:
                apply_controlled_swap(data_qubits['q2'], swap_qubit, data_qubits['q3'])
            else:
                apply_controlled_swap(swap_qubit, data_qubits['q2'], data_qubits['q3'])

    # The measured wires depend on the parity of the layer count.
    if n_layers % 2 == 1:
        measured_qubits = [3, 6]
    else:
        measured_qubits = [1, 4]

    return [qml.expval(qml.PauliZ(qubit)) for qubit in measured_qubits]
class QuantumNeuralNetworkLayer(tf.keras.layers.Layer):
    """Keras layer that runs the ``quantum_neural_network`` QNode per sample.

    The layer owns the three trainable angle tensors of the circuit and passes
    them, together with each input row, to the QNode.
    """

    def __init__(self, n_qubits, n_layers, num_classes):
        """Create the trainable circuit parameters.

        Args:
            n_qubits: Size of the wire dimension of the rotation weights.
            n_layers: Number of circuit layers.
            num_classes: Stored on the instance; not used by the layer itself.
        """
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        initializer = tf.keras.initializers.GlorotUniform()
        regularizer = tf.keras.regularizers.l2(0.001)

        # Encoding weights with L2 regularization
        self.encoding_weights = self.add_weight(
            shape=(2, self.n_layers, 2),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='encoding_weights',
            dtype=tf.float32,
        )

        # Rotation layer weights with L2 regularization
        self.rot_weights = self.add_weight(
            shape=(n_layers, n_qubits, 3),
            initializer=initializer,
            regularizer=regularizer,
            trainable=True,
            name='rot_weights',
            dtype=tf.float32,
        )

        # Entangling weights with L2 regularization, drawn from [0, 2*pi)
        self.entangle_weights = self.add_weight(
            shape=(n_layers, n_qubits - 1),
            initializer=tf.keras.initializers.RandomUniform(0, 2 * np.pi),
            regularizer=regularizer,
            trainable=True,
            name='entangle_weights',
            dtype=tf.float32,
        )

    def call(self, inputs):
        """Evaluate the circuit for every row of ``inputs``.

        Args:
            inputs: Batch tensor; each row is passed to the QNode as one
                sample.

        Returns:
            float32 tensor stacking the per-sample QNode results along axis 0.
        """
        outputs_list = []
        # The QNode is evaluated sample by sample, so the batch size must be a
        # concrete value here (the script compiles the model with
        # run_eagerly=True).
        for i in range(tf.shape(inputs)[0]):
            x = tf.cast(inputs[i], tf.float32)
            result = quantum_neural_network(
                x, self.encoding_weights, self.rot_weights, self.entangle_weights
            )
            outputs_list.append(tf.cast(result, tf.float32))
        return tf.stack(outputs_list, axis=0)
class QuantumNeuralNetwork(tf.keras.Model):
    """Hybrid model: quantum layer followed by a dense head with one sigmoid unit."""

    def __init__(self, n_qubits, n_layers, num_classes, **kwargs):
        """Build the quantum layer and the classical layers.

        Args:
            n_qubits: Forwarded to ``QuantumNeuralNetworkLayer``.
            n_layers: Forwarded to ``QuantumNeuralNetworkLayer``.
            num_classes: Stored and forwarded to the quantum layer; the output
                layer always has a single unit.
            **kwargs: Passed on to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Quantum layer
        self.quantum_layer = QuantumNeuralNetworkLayer(n_qubits, n_layers, num_classes)

        # Classical layers. This variant applies no normalisation between the
        # dense layers.
        self.dense1 = tf.keras.layers.Dense(
            64,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.00001),
            dtype=tf.float32,
        )
        self.dense2 = tf.keras.layers.Dense(
            32,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.00001),
            dtype=tf.float32,
        )

        # Output layer for binary classification
        self.output_layer = tf.keras.layers.Dense(
            1,
            activation='sigmoid',
            kernel_regularizer=tf.keras.regularizers.l2(0.001),
            dtype=tf.float32,
        )

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output."""
        q_output = self.quantum_layer(inputs)
        x = self.dense1(q_output)
        x = self.dense2(x)
        return self.output_layer(x)
def is_quantum_model(model):
    """Report whether ``model`` contains a ``QuantumNeuralNetworkLayer``.

    Layers that themselves expose a ``layers`` attribute are searched
    recursively.

    Args:
        model: Object with an iterable ``layers`` attribute.

    Returns:
        True if a quantum layer is found at any nesting depth, else False.
    """
    return any(
        isinstance(layer, QuantumNeuralNetworkLayer)
        or (hasattr(layer, 'layers') and is_quantum_model(layer))
        for layer in model.layers
    )
if __name__ == "__main__":
    try:
        # Load data
        data = np.load('plane_point_set.npz')
        X = data['points'].astype(np.float32)  # Original coordinates
        phase_angles = data['phase_angles'].astype(np.float32)  # Phase angles mapped to [0, π/2]
        y = data['labels'].reshape(-1, 1)

        # For BinaryCrossentropy, labels stay single binary values
        y_encoded = y.astype(np.float32)
        num_classes = 1
        print("num_classes", num_classes)

        # Split dataset into training and testing
        # NOTE(review): the phase-angle splits are produced but never fed to
        # the model; training uses the raw points — confirm this is intended.
        x_full_train, x_test, phase_full_train, phase_test, y_full_train, y_test = train_test_split(
            X, phase_angles, y_encoded, test_size=0.2, random_state=116, stratify=y_encoded
        )

        # Further split training set into training and validation sets
        x_train, x_val, phase_train, phase_val, y_train, y_val = train_test_split(
            x_full_train, phase_full_train, y_full_train, test_size=0.25, random_state=42, stratify=y_full_train
        )

        # Shuffle the training data manually
        permutation = np.random.permutation(len(x_train))
        x_train = x_train[permutation]
        phase_train = phase_train[permutation]
        y_train = y_train[permutation]

        # Hyperparameters
        learning_rate = 0.001
        epochs = 5
        batch_size = 128
        n_qubits = 7
        n_layers = 3
        num_classes = 1  # Binary classification

        model = QuantumNeuralNetwork(
            n_qubits=n_qubits,
            n_layers=n_layers,
            num_classes=num_classes
        )

        if is_quantum_model(model):
            print("The model contains quantum neural network (QNN) layers.")
        else:
            print("The model does not contain quantum neural network (QNN) layers.")

        # Draw the quantum circuit with placeholder parameters
        sample_inputs = np.ones(2, dtype=np.float32) * (np.pi / 4)
        sample_encoding_weights = np.zeros((2, n_layers, 2), dtype=np.float32)
        sample_rot_weights = np.zeros((n_layers, n_qubits, 3), dtype=np.float32)
        sample_entangle_weights = np.zeros((n_layers, n_qubits - 1), dtype=np.float32)

        qml.drawer.use_style('sketch')
        print("\nQuantum circuit diagram:")
        try:
            circuit_text = qml.draw(quantum_neural_network)(
                sample_inputs, sample_encoding_weights, sample_rot_weights, sample_entangle_weights
            )
            print(circuit_text)
        except Exception as e:
            print(f"Error drawing the quantum circuit: {e}")

        try:
            fig, ax = qml.draw_mpl(quantum_neural_network)(
                sample_inputs, sample_encoding_weights, sample_rot_weights, sample_entangle_weights
            )
            # save_plot also closes the figure once it has been written.
            save_plot(fig, 'quantum_circuit.png')
        except Exception as e:
            print(f"Error saving the quantum circuit diagram: {e}")

        steps_per_epoch = len(x_train) // batch_size

        # Create dataset (including validation set); labels stay binary.
        try:
            dataset = InMemoryDataset(
                x_train, y_train,
                x_val, y_val,
                categorical=False
            )
        except Exception as e:
            print(f"Unexpected error initializing InMemoryDataset: {e}")
            sys.exit(1)

        # Initialize reweighting policy
        reweighting_policy = BiasedReweightingPolicy(k=1.0)

        # Initialize importance sampling training, using the loss as score
        wrapped_model = ConstantTimeImportanceTraining(model, score="loss")

        # Replace the default sampler with one built from the policy above
        def custom_sampler(dataset, batch_size, steps_per_epoch, epochs):
            return ConstantTimeSampler(
                dataset,
                reweighting_policy,
                model=wrapped_model.model,
            )

        wrapped_model.sampler = custom_sampler

        # Use BinaryCrossentropy loss function
        loss_fn = tf.keras.losses.BinaryCrossentropy()

        # Compile model
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=loss_fn,
            metrics=['accuracy'],
            run_eagerly=True  # Set to False for better performance after debugging
        )

        class MetricsHistory(tf.keras.callbacks.Callback):
            """Collect the per-epoch loss and accuracy values."""

            def on_train_begin(self, logs=None):
                self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

            def on_epoch_end(self, epoch, logs=None):
                # Keras may call this without logs; a missing metric is
                # recorded as None.
                logs = logs or {}
                for key, values in self.history.items():
                    values.append(logs.get(key))

        metrics_history = MetricsHistory()

        callbacks = [
            metrics_history,
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5),
            ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss')
        ]

        # Train model using fit_dataset; a training failure is reported and
        # the script carries on with evaluation and plotting.
        try:
            history = wrapped_model.fit_dataset(
                dataset=dataset,
                batch_size=batch_size,
                epochs=epochs,
                steps_per_epoch=steps_per_epoch,
                verbose=1,
                callbacks=callbacks
            )
        except Exception as e:
            print(f"Error during training: {e}")

        # Evaluate model on test set
        # NOTE(review): confirm that the importance-sampling wrapper accepts
        # the test data as a single tuple argument.
        try:
            score = wrapped_model.evaluate((x_test, y_test))
            print('Test loss:', score[0])
            print('Test accuracy:', score[1])
        except Exception as e:
            print(f"Error during evaluation: {e}")

        # Plot graphs using history from custom callback
        plot_history(metrics_history.history)

        mean_accuracy, fluctuation = calculate_accuracy_fluctuation(metrics_history.history)
        if mean_accuracy is not None and fluctuation is not None:
            print('Validation set mean accuracy:', mean_accuracy)
            print('Accuracy fluctuation:', fluctuation)

    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    finally:
        # Close output file and restore stdout
        sys.stdout = original_stdout
        output_file.close()
        # Report the log file that was actually written.
        print(
            f"Script execution completed. Please check '{output_file.name}' for print output and check the saved plots in the current directory.")
上述是代码2
告诉我代码1和代码2之间在神经网络之间的区别
下面从「量子层」和「经典层(即神经网络部分)」两方面,总结代码1与代码2在神经网络结构与用法上的主要差异。
代码1(多输出 + Softmax + CategoricalCrossentropy)
- 输出层:`self.output_layer = tf.keras.layers.Dense(num_classes, activation='softmax', ...)`,其中 `num_classes = 2`
- 损失函数:`loss_fn = tf.keras.losses.CategoricalCrossentropy()`
代码2(单输出 + Sigmoid + BinaryCrossentropy)
- 输出层:`self.output_layer = tf.keras.layers.Dense(1, activation='sigmoid', ...)`,输出 0~1 之间的概率
- 损失函数:`loss_fn = tf.keras.losses.BinaryCrossentropy()`
因此,代码1 的分类模式是“softmax+独热编码+交叉熵(多分类形式)”,而 代码2 则是“sigmoid+单输出+交叉熵(二分类形式)”。
代码1
- 使用 `qml.CRZ(entangle_weights[0, qubit], wires=[qubit, qubit + 1])` 等纠缠操作(在第一层和后续层都调用了 CRZ)
- 每层为 `qml.Rot(...)` + CRZ 纠缠门,并且在循环中执行相应的 controlled-SWAP

代码2
- 每层只保留 `qml.Rot(...)` 旋转门和 controlled-SWAP,CRZ 纠缠门的代码被注释掉
- (虽然设置了 `n_layers = 3`,但真正的纠缠操作被注释掉,实际变得更“轻量”或“缩减”)

所以,代码2 的量子网络更“简化”或“缩减”,在量子态的纠缠程度上不如代码1那么复杂。
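代码1(经典部分)
- `Dense(64, relu)` → `BatchNormalization` → `Dense(32, relu)` → `BatchNormalization` → `Dense(num_classes, softmax)`(属性名为 `layer_norm1` / `layer_norm2`,但实际创建的是 `BatchNormalization` 层)

代码2(经典部分)
- `Dense(64, relu)` → `Dense(32, relu)` → `Dense(1, sigmoid)`,`BatchNormalization` 相关代码被注释掉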
因此,代码2 的经典网络层数、正则化方式(BatchNormalization 等)与 代码1 不同,而且输出层结构也简化为单神经元输出。
- 代码1:将 `y` 转换为 one-hot,使用 `to_categorical(..., num_classes=2)`,并且 `num_classes=2`
- 代码2:`y` 直接保留为 0 或 1 的 float 值,不做 one-hot

这几处即是代码1 与代码2 在神经网络(包括量子部分与经典部分)最主要的区别所在。