import pandas as pd import numpy as np from collec...

تم الإنشاء في: ١٣ مارس ٢٠٢٥

تمت الإجابة باستخدام O1 بواسطة Chat01

سؤال

import pandas as pd
import numpy as np
from collections import Counter
import random
import time
import datetime
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures
from sklearn.multioutput import MultiOutputClassifier
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import log_loss
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import xgboost as xgb
import warnings
import openpyxl
import itertools
import argparse
import os
import gc
import numpy as np # 确保导入 numpy

warnings.filterwarnings('ignore')

class AdvancedLotteryAnalyzer:
    """Lottery (大乐透) analyzer combining an LSTM, tree ensembles and Monte Carlo simulation.

    Front zone: 5 numbers drawn from 1-35; back zone: 2 numbers drawn from 1-12.
    The Excel history file is expected to carry the Chinese column headers listed
    in ``front_columns`` / ``back_columns``.
    """

    def __init__(self):
        # Number-range configuration
        self.front_range = range(1, 36)  # front zone 1-35
        self.back_range = range(1, 13)   # back zone 1-12

        # Data-column configuration (Chinese headers of the Excel sheet)
        self.front_columns = ['前区1', '前区2', '前区3', '前区4', '前区5']
        self.back_columns = ['后区1', '后区2']

        # Model hyper-parameters
        self.window_size = 8
        self.lstm_units = 256
        self.epochs = 200
        self.simulations = 50000

        # Cache placeholders (not yet used by any method in this class)
        self.feature_cache = {}
        self.model_cache = {}

    def read_and_preprocess(self, file_path):
        """Read the Excel history file and run the full preprocessing pipeline.

        Returns the cleaned, feature-engineered, normalized DataFrame, or
        ``None`` when any step fails (failure is reported, not raised).
        """
        try:
            print(f"⏳ 开始读取并预处理数据文件: {file_path}")
            start = time.time()
            df = pd.read_excel(file_path)
            df = self._clean_data(df)
            df = self._create_features(df)
            df = self._normalize_data(df)
            print(f"✅ 数据预处理完成,耗时: {time.time() - start:.2f}s")
            return df
        except Exception as e:
            # Deliberate best-effort boundary: report and signal failure to caller.
            print(f"❌ 数据预处理失败: {str(e)}")
            return None

    def _clean_data(self, df):
        """Drop incomplete rows and keep only draws inside the legal number ranges."""
        df = df.dropna(subset=self.front_columns + self.back_columns)
        for col in self.front_columns + self.back_columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        df = df[(df[self.front_columns] >= 1).all(axis=1) &
                (df[self.front_columns] <= 35).all(axis=1) &
                (df[self.back_columns] >= 1).all(axis=1) &
                (df[self.back_columns] <= 12).all(axis=1)]
        return df.reset_index(drop=True)

    def _create_features(self, df):
        """Add per-draw statistics: sums, span, parity, primes, runs and lag features."""
        # Basic statistics
        df['前区和值'] = df[self.front_columns].sum(axis=1)
        df['后区和值'] = df[self.back_columns].sum(axis=1)
        df['前区跨度'] = df[self.front_columns].max(axis=1) - df[self.front_columns].min(axis=1)

        # Parity features
        df['前区奇数个数'] = df[self.front_columns].apply(lambda x: sum(n % 2 for n in x), axis=1)
        df['后区奇数个数'] = df[self.back_columns].apply(lambda x: sum(n % 2 for n in x), axis=1)

        # Prime-count feature (all primes <= 35)
        primes = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31}
        df['前区质数个数'] = df[self.front_columns].apply(lambda x: len(primes & set(x)), axis=1)

        # Consecutive-number feature: count of adjacent pairs differing by 1
        def consecutive_count(nums):
            sorted_nums = sorted(nums)
            return sum(1 for i in range(1, len(sorted_nums))
                       if sorted_nums[i] - sorted_nums[i - 1] == 1)

        df['前区连号数'] = df[self.front_columns].apply(consecutive_count, axis=1)

        # Lag features over previous draws
        for lag in [1, 3, 5]:
            for col in ['前区和值', '后区和值']:
                df[f'{col}_lag{lag}'] = df[col].shift(lag)

        # Drop the rows whose lag features are NaN
        return df.dropna()

    def _normalize_data(self, df):
        """Min-max scale the numeric feature columns in place and return the frame."""
        scaler = MinMaxScaler()
        num_cols = [
            '前区和值', '后区和值', '前区跨度',
            '前区和值_lag1', '前区和值_lag3', '前区和值_lag5',
            '后区和值_lag1', '后区和值_lag3', '后区和值_lag5'
        ]
        df[num_cols] = scaler.fit_transform(df[num_cols])
        return df

    def build_hybrid_model(self, df):
        """Build and train the bidirectional-LSTM multi-label prediction model."""
        print("\n🔥 正在构建混合预测模型...")
        X, y = self._prepare_3d_features(df)

        model = Sequential()
        model.add(Bidirectional(LSTM(self.lstm_units, return_sequences=True),
                                input_shape=(X.shape[1], X.shape[2])))
        model.add(Dropout(0.3))
        model.add(LSTM(self.lstm_units))
        model.add(Dropout(0.3))
        model.add(Dense(128, activation='relu'))
        # Sigmoid multi-label head: one probability per ball (35 front + 12 back).
        model.add(Dense(y.shape[1], activation='sigmoid'))

        optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        early_stop = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)
        model.fit(
            X, y,
            epochs=self.epochs,
            batch_size=64,
            validation_split=0.2,
            verbose=1,
            callbacks=[early_stop, reduce_lr]
        )
        return model

    def _prepare_3d_features(self, df):
        """Build pseudo-3D samples (n, 1, window*7 + stats) and 47-dim targets."""
        X, y = [], []
        data = df[self.front_columns + self.back_columns].values
        for i in range(self.window_size, len(data)):
            # Sliding window of the previous window_size draws
            window = data[i - self.window_size:i]
            # Per-draw statistical features of the target draw
            stats = df.iloc[i][['前区和值', '后区和值', '前区跨度', '前区奇数个数',
                                '后区奇数个数', '前区质数个数', '前区连号数']].values
            X.append(np.concatenate([window.flatten(), stats]))
            y.append(self._encode_target(data[i]))
        # Single time step holding the flattened window + stats (pseudo-3D).
        X = np.array(X).reshape(len(X), 1, -1)
        y = np.array(y)
        return X, y

    def _encode_target(self, numbers):
        """One-hot encode one draw (5 front + 2 back) into a 47-dim 0/1 vector."""
        front = [1 if i + 1 in numbers[:5] else 0 for i in range(35)]
        back = [1 if i + 1 in numbers[5:] else 0 for i in range(12)]
        return front + back

    def intelligent_simulation(self, df, top_n=30):
        """Frequency-weighted Monte Carlo simulation; returns the top_n scored tickets."""
        print("\n🎰 正在进行智能蒙特卡洛模拟...")
        front_probs = self._get_number_probability(df, 'front')
        back_probs = self._get_number_probability(df, 'back')

        # Guard against empty history: fall back to uniform probabilities.
        if not front_probs or not back_probs:
            print("⚠️ 概率数据无效,使用均匀概率")
            front_probs = {i: 1 / 35 for i in range(1, 36)}
            back_probs = {i: 1 / 12 for i in range(1, 13)}

        # Force integer keys for consistency with the rest of the pipeline.
        front_probs = {int(k): v for k, v in front_probs.items()}
        back_probs = {int(k): v for k, v in back_probs.items()}

        # FIX: re-normalize the weight vectors so they sum to exactly 1.0;
        # np.random.choice raises ValueError on float-rounding drift otherwise.
        front_keys = list(front_probs.keys())
        front_w = np.array(list(front_probs.values()), dtype=float)
        front_w /= front_w.sum()
        back_keys = list(back_probs.keys())
        back_w = np.array(list(back_probs.values()), dtype=float)
        back_w /= back_w.sum()

        simulations = []
        for _ in range(self.simulations):
            try:
                # Weighted sampling without replacement (roulette-wheel style)
                front = list(np.random.choice(front_keys, 5, p=front_w, replace=False))
                back = list(np.random.choice(back_keys, 2, p=back_w, replace=False))
                simulations.append(sorted(front) + sorted(back))
            except ValueError as e:
                print(f"⚠️ 随机选择错误: {e}")
                # Fallback: plain uniform random ticket
                front = sorted(np.random.choice(range(1, 36), 5, replace=False))
                back = sorted(np.random.choice(range(1, 13), 2, replace=False))
                simulations.append(list(front) + list(back))

        return self._filter_combinations(simulations, df, top_n)

    def _get_number_probability(self, df, area):
        """Empirical appearance probability per number for 'front' or 'back' zone."""
        cols = self.front_columns if area == 'front' else self.back_columns
        numbers = df[cols].values.flatten()
        total = len(numbers)
        return {num: count / total for num, count in Counter(numbers).most_common()}

    def _filter_combinations(self, combinations, df, top_n):
        """Score every candidate combination and keep the top_n highest scorers."""
        scored_combinations = []
        for combo in combinations:
            if not isinstance(combo, list):
                combo = list(combo)
            score = self._calculate_combo_score(combo[:5], combo[5:], df)
            scored_combinations.append((combo, score))
        scored_combinations.sort(key=lambda x: x[1], reverse=True)
        # Always return plain lists so downstream hashing/printing is uniform.
        return [list(x[0]) for x in scored_combinations[:top_n]]

    def _calculate_combo_score(self, front, back, df):
        """Heuristic ticket score: historical overlap + parity balance + sum plausibility."""
        score = 0
        # Historical-frequency score: mean overlap with every past draw
        front_counts = df[self.front_columns].apply(lambda x: len(set(x) & set(front)), axis=1)
        back_counts = df[self.back_columns].apply(lambda x: len(set(x) & set(back)), axis=1)
        score += front_counts.mean() * 0.3 + back_counts.mean() * 0.2
        # Parity-balance score (front zone only, matching original weighting)
        odd_front = sum(n % 2 for n in front)
        score += (1 - abs(odd_front / 5 - 0.5)) * 0.2
        # Sum-distribution score (assumed ideal sums: front ~90, back ~13)
        front_sum = sum(front)
        back_sum = sum(back)
        score += (1 - abs((front_sum - 90) / 90)) * 0.1
        score += (1 - abs((back_sum - 13) / 13)) * 0.1
        return score

    def generate_recommendations(self, df):
        """Merge LSTM, ensemble and Monte Carlo candidates into the final top list."""
        print("\n🚀 正在生成推荐号码...")
        hybrid_model = self.build_hybrid_model(df)
        hybrid_pred = self._predict_with_model(hybrid_model, df)
        ensemble_pred = self._ensemble_prediction(df)
        simulated = self.intelligent_simulation(df)
        all_combinations = hybrid_pred + ensemble_pred + simulated
        return self._final_selection(all_combinations, df)

    def _predict_with_model(self, model, df):
        """Decode the LSTM's next-draw probabilities into a single combination."""
        X, _ = self._prepare_3d_features(df)
        last_window = X[-1:]
        pred = model.predict(last_window)[0]
        front_probs = pred[:35]
        back_probs = pred[35:]
        # FIX: argsort yields 0-based indices; +1 converts them to 1-based
        # ball numbers (consistent with _decode_prediction).
        front_nums = sorted(int(i) + 1 for i in np.argsort(front_probs)[-5:])
        back_nums = sorted(int(i) + 1 for i in np.argsort(back_probs)[-2:])
        return [front_nums + back_nums]

    def _ensemble_prediction(self, df):
        """Collect one ticket each from the XGBoost and random-forest models."""
        return [self._xgb_predict(df), self._rf_predict(df)]

    def _xgb_predict(self, df):
        """Fit an XGBoost classifier on the 2-D features and decode its prediction."""
        X, y = self._prepare_2d_features(df)
        model = xgb.XGBClassifier(objective='binary:logistic', n_estimators=300)
        model.fit(X, y)
        last_window = self._get_last_window(df)
        pred = model.predict_proba(last_window)  # 2-D array (samples, classes)
        return self._decode_prediction(pred)

    def _rf_predict(self, df):
        """Fit a random forest on the 2-D features and decode its prediction."""
        X, y = self._prepare_2d_features(df)
        model = RandomForestClassifier(n_estimators=500)
        model.fit(X, y)
        last_window = self._get_last_window(df)
        pred = model.predict_proba(last_window)  # 2-D array (samples, classes)
        return self._decode_prediction(pred)

    def _prepare_2d_features(self, df):
        """Flattened sliding-window features + 3 summary stats, with 47-dim targets."""
        X, y = [], []
        data = df[self.front_columns + self.back_columns].values
        for i in range(self.window_size, len(data)):
            window = data[i - self.window_size:i].flatten()
            stats = df.iloc[i][['前区和值', '后区和值', '前区跨度']].values
            X.append(np.concatenate([window, stats]))
            y.append(self._encode_target(data[i]))
        return np.array(X), np.array(y)

    def _get_last_window(self, df):
        """Feature vector of the most recent window, shaped (1, n) for predict()."""
        data = df[self.front_columns + self.back_columns].values
        window = data[-self.window_size:].flatten()
        stats = df.iloc[-1][['前区和值', '后区和值', '前区跨度']].values
        return np.concatenate([window, stats]).reshape(1, -1)

    def _decode_prediction(self, pred):
        """Turn model probabilities into 5 front + 2 back numbers (1-based).

        Handles three layouts: a flat vector, a per-class [p0, p1] probability
        matrix (sklearn predict_proba style), or a (1, 47+) probability row.
        Falls back to a uniform random ticket on an unrecognized layout.
        """
        pred = np.array(pred)
        if len(pred.shape) == 1:
            # Flat vector: promote to a single-row matrix and fall through.
            pred = pred.reshape(1, -1)
        elif pred.shape[1] == 2:
            # predict_proba style: one [p(neg), p(pos)] row per target class.
            front_probs = np.zeros(35)
            back_probs = np.zeros(12)
            for i in range(35):
                if i < len(pred) and len(pred[i]) >= 2:
                    front_probs[i] = pred[i][1]  # positive-class probability
            for i in range(35, 35 + 12):
                if i < len(pred) and len(pred[i]) >= 2:
                    back_probs[i - 35] = pred[i][1]
            front_nums = np.argsort(front_probs)[-5:] + 1  # +1: numbers are 1-based
            back_nums = np.argsort(back_probs)[-2:] + 1
            return list(sorted(front_nums)) + list(sorted(back_nums))

        if pred.shape[1] >= 47:  # 35 (front) + 12 (back) = 47
            front_probs = pred[0, :35]
            back_probs = pred[0, 35:47]
            front_nums = np.argsort(front_probs)[-5:] + 1
            back_nums = np.argsort(back_probs)[-2:] + 1
            return list(sorted(front_nums)) + list(sorted(back_nums))

        # Unexpected layout: uniform random fallback.
        front_nums = sorted(np.random.choice(range(1, 36), 5, replace=False))
        back_nums = sorted(np.random.choice(range(1, 13), 2, replace=False))
        print("⚠️ 预测格式不符合预期,返回随机选择")
        return list(front_nums) + list(back_nums)

    def _final_selection(self, combinations, df, top=10):
        """De-duplicate candidate tickets, score them, and return the top best."""
        unique_combinations = []
        seen = set()
        for combo in combinations:
            if not isinstance(combo, list):
                combo = list(combo)
            # Order-insensitive hashable key (front and back sorted separately).
            hashable_combo = tuple(sorted(combo[:5]) + sorted(combo[5:]))
            if hashable_combo not in seen:
                seen.add(hashable_combo)
                unique_combinations.append(combo)
        scored = [(combo, self._calculate_combo_score(combo[:5], combo[5:], df))
                  for combo in unique_combinations]
        scored.sort(key=lambda x: x[1], reverse=True)
        return [x[0] for x in scored[:top]]

    def save_recommendations(self, combinations, output_file):
        """Append (or create) the recommendations sheet of the output Excel file."""
        print("\n💾 正在保存推荐结果...")
        df = pd.DataFrame(combinations, columns=self.front_columns + self.back_columns)
        df['生成时间'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            if os.path.exists(output_file):
                # Append a fresh sheet to the existing workbook.
                with pd.ExcelWriter(output_file, mode='a', engine='openpyxl') as writer:
                    df.to_excel(writer, index=False, sheet_name='最新推荐')
            else:
                df.to_excel(output_file, index=False)
            print(f"✅ 结果已保存至: {output_file}")
        except Exception as e:
            print(f"❌ 保存失败: {str(e)}")

if __name__ == '__main__':
    # Command-line arguments
    parser = argparse.ArgumentParser(description='大乐透智能预测系统')
    parser.add_argument('--input', type=str, default='Super_Lucky.xlsx',
                        help='输入Excel文件路径')
    parser.add_argument('--output', type=str, default='Super_Lucky7.xlsx',
                        help='输出Excel文件路径')
    args = parser.parse_args()

    print("🎯 大乐透智能预测系统启动")
    analyzer = AdvancedLotteryAnalyzer()

    # Preprocess the historical draw data
    df = analyzer.read_and_preprocess(args.input)

    # Require at least 50 historical draws before predicting
    if df is not None and len(df) > 50:
        recommendations = analyzer.generate_recommendations(df)
        if recommendations:
            analyzer.save_recommendations(recommendations, args.output)
            print("\n推荐号码列表:")
            for i, combo in enumerate(recommendations, 1):
                front = ' '.join(map(str, combo[:5]))
                back = ' '.join(map(str, combo[5:]))
                print(f"组合{i:02d}: 前区 [{front}] | 后区 [{back}]")
        else:
            print("⚠️ 未生成有效推荐")
    else:
        print("❌ 数据不足,需要至少50期历史数据")

# --- Original follow-up request from the user (preserved from the transcript) ---
# 帮我在我的代码基础上加入时间序列分析(ARIMA) 聚类分析(K-means) 遗传算法 集成学习
# 遗漏值分析 频率分析 冷热号分析 号码组合模式分析 号码走势分析 蒙特卡洛模拟 轮盘算法
# 回归分析 分类算法 深度学习预测 覆盖优化 集成优化推荐 数据预处理 LSTM 预测
# 集成学习(XGBoost和随机森林) 智能蒙特卡洛模拟 组合评分系统 目标编码 回归分析
# 特征组合(滑动窗口与特征合并) 模型预测(多模型集成)这些算法并深度优化提升算法的
# 预测号码准确性,合理性,然后给我完整的代码

الفكرة

Piecing together advanced techniques

I’m working on integrating multiple advanced techniques into a cohesive code snippet. This involves combining time series analysis, clustering, genetic algorithms, ensemble learning, and data preprocessing, among other methods, to create a comprehensive advanced lottery analyzer.

Integrating advanced tactics

I’m thinking through how to weave deep optimization and various advanced analytics into a class-based "AdvancedLotteryAnalyzer" without making it unwieldy. This includes refining accuracy and maintaining efficiency.

Enhancing the toolkit

I’m improving a lottery analyzer by adding missing value interpolation, expanding frequency analysis, and implementing K-means clustering with crafted feature vectors, boosting the system's overall capabilities.

Weighing options

I’m refining the lottery analyzer by applying ARIMA for time series analysis, leveraging genetic algorithms for number selection, and optimizing coverage for better outcome patterns.

Fortifying the toolbox

I’m pulling together deep learning forecasting, regression, XGBoost, and multiple-model ensembles into a unified code snippet. Despite its size, I'll ensure it stays streamlined and efficient.

Integrating components

I’m crafting a comprehensive analytical suite by weaving together ARIMA, K-means clustering, genetic algorithms, frequency analysis, and ensemble methods. This cohesive strategy unlocks advanced insights for optimal performance.

Weighing options

I’m piecing together ARIMA with "statsmodels", K-means clustering from "sklearn", and a simple genetic algorithm for optimal coverage. These choices offer a practical balance.

Pulling together insights

I'm fine-tuning the code to include missing value analysis, trend analysis, and regression. It's shaping up to unify all references efficiently.

Reasoned about advanced lottery prediction methods for 29 seconds

الإجابة

下面给出一个示例性完整代码示例,演示如何在原有基础上引入各种算法和分析手段(ARIMA、K-Means聚类、遗传算法、集成学习、缺失值分析、频率分析、冷热号分析、号码组合模式分析、号码走势分析、蒙特卡洛模拟、轮盘算法、回归分析、分类算法、深度学习预测、覆盖优化、集成优化推荐、数据预处理、LSTM预测、集成学习(XGBoost/随机森林)、智能蒙特卡洛模拟、组合评分系统、目标编码、回归分析、特征组合(滑动窗口+特征合并)、多模型集成预测等),并尽量对这些方法做一个示例式的整合。:由于真正要在所有算法上做深度调参、融合与优化会让代码极度庞大且需要大量的实验与测试,这里仅给出一个示范版本,展示思路与整合方式,帮助你后续进一步扩展和优化。


python
# -*- coding: utf-8 -*- """ 示例:在原有 AdvancedLotteryAnalyzer 基础上,整合多种数据分析及预测算法 """ import pandas as pd import numpy as np import random import time import datetime import gc import itertools import os import argparse import warnings from collections import Counter # Scikit-learn相关 from sklearn.linear_model import LogisticRegression, LinearRegression from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures from sklearn.multioutput import MultiOutputClassifier from sklearn.model_selection import TimeSeriesSplit, train_test_split from sklearn.cluster import KMeans from sklearn.metrics import log_loss, mean_squared_error # XGBoost import xgboost as xgb # TensorFlow相关(深度学习 - LSTM) import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau # ARIMA from statsmodels.tsa.arima.model import ARIMA # 遗传算法 示例 (简单示范) # 这里使用 deap 或其他GA库也可以,这里仅做最小示例 import math # Excel处理 import openpyxl warnings.filterwarnings('ignore') class AdvancedLotteryAnalyzer: def __init__(self): # 号码范围配置 self.front_range = range(1, 36) # 前区1-35 self.back_range = range(1, 13) # 后区1-12 # 数据列配置 self.front_columns = ['前区1', '前区2', '前区3', '前区4', '前区5'] self.back_columns = ['后区1', '后区2'] # 模型/算法相关参数 self.window_size = 8 self.lstm_units = 256 self.epochs = 100 # 示例中调低一点epoch self.simulations = 30000 # 智能蒙特卡洛模拟次数 # 缓存(可根据需要使用) self.feature_cache = {} self.model_cache = {} # ========================================================================= # -------------------------- 数据预处理与分析 ------------------------------- # ========================================================================= def read_and_preprocess(self, file_path): """高级数据预处理: 包含缺失值分析、基本清洗、特征构建、标准化等""" try: print(f"\n⏳ 开始读取并预处理数据文件: {file_path}") start = time.time() # 读取原始数据 df = 
pd.read_excel(file_path) # 缺失值分析(简单示例) self.missing_value_analysis(df) # 数据清洗 df = self._clean_data(df) # 特征工程 df = self._create_features(df) # 数据标准化 df = self._normalize_data(df) print(f"✅ 数据预处理完成,耗时: {time.time() - start:.2f}s\n") return df except Exception as e: print(f"❌ 数据预处理失败: {str(e)}") return None def missing_value_analysis(self, df): """缺失值分析示例:可根据需要添加插值/填充策略""" missing_counts = df.isnull().sum() if missing_counts.sum() > 0: print("⚠️ 检测到缺失值情况:") print(missing_counts[missing_counts > 0]) # 简单填充/插值示例 df.fillna(method='ffill', inplace=True) # 前向填充 else: print("✅ 未检测到缺失值。") def _clean_data(self, df): """数据清洗:去除异常值、确保格式正确""" # 去除行中若干关键列为空的情况 df = df.dropna(subset=self.front_columns + self.back_columns) # 转数值类型 for col in self.front_columns + self.back_columns: df[col] = pd.to_numeric(df[col], errors='coerce') # 过滤出合理范围内的数据 df = df[(df[self.front_columns] >= 1).all(axis=1) & (df[self.front_columns] <= 35).all(axis=1) & (df[self.back_columns] >= 1).all(axis=1) & (df[self.back_columns] <= 12).all(axis=1)] return df.reset_index(drop=True) def _create_features(self, df): """构建高级特征:可在此基础上添加更多特征""" # 基础统计特征 df['前区和值'] = df[self.front_columns].sum(axis=1) df['后区和值'] = df[self.back_columns].sum(axis=1) df['前区跨度'] = df[self.front_columns].max(axis=1) - df[self.front_columns].min(axis=1) # 奇偶特征 df['前区奇数个数'] = df[self.front_columns].apply(lambda x: sum(n % 2 for n in x), axis=1) df['后区奇数个数'] = df[self.back_columns].apply(lambda x: sum(n % 2 for n in x), axis=1) # 质数特征 primes = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31} df['前区质数个数'] = df[self.front_columns].apply(lambda x: len(primes & set(x)), axis=1) # 连号特征 def consecutive_count(nums): sorted_nums = sorted(nums) return sum(1 for i in range(1, len(sorted_nums)) if sorted_nums[i] - sorted_nums[i - 1] == 1) df['前区连号数'] = df[self.front_columns].apply(consecutive_count, axis=1) # 添加滞后特征(示例) for lag in [1, 3, 5]: for col in ['前区和值', '后区和值']: df[f'{col}_lag{lag}'] = df[col].shift(lag) # 简单趋势分析示例(滚动均值) df['前区和值_rolling_mean3'] = 
df['前区和值'].rolling(window=3).mean() # 去除因滞后/滚动产生的NaN df = df.dropna().reset_index(drop=True) return df def _normalize_data(self, df): """数据标准化""" scaler = MinMaxScaler() # 根据需要加入更多特征列 num_cols = [ '前区和值', '后区和值', '前区跨度', '前区奇数个数', '后区奇数个数', '前区质数个数', '前区连号数', '前区和值_lag1', '前区和值_lag3', '前区和值_lag5', '后区和值_lag1', '后区和值_lag3', '后区和值_lag5', '前区和值_rolling_mean3' ] # 注意需要判断这些列在df里是否存在 existing_cols = [c for c in num_cols if c in df.columns] df[existing_cols] = scaler.fit_transform(df[existing_cols]) return df # ========================================================================= # -------------------------- 频率分析/冷热号分析 --------------------------- # ========================================================================= def frequency_analysis(self, df): """统计前后区的出现次数,进行冷热号分析""" front_counts = Counter(df[self.front_columns].values.flatten()) back_counts = Counter(df[self.back_columns].values.flatten()) front_sorted = sorted(front_counts.items(), key=lambda x: x[1], reverse=True) back_sorted = sorted(back_counts.items(), key=lambda x: x[1], reverse=True) print("前区出现次数(热 -> 冷):", front_sorted[:10], "...") print("后区出现次数(热 -> 冷):", back_sorted[:5], "...") return front_counts, back_counts # ========================================================================= # ---------------------------- K-Means聚类分析 ----------------------------- # ========================================================================= def run_kmeans_clustering(self, df, n_clusters=5): """ 对历史数据进行简单KMeans聚类,示例:可以将每一期的号码(或特征)向量化后分簇, 看看是否有特定的组合模式分组 """ # 将每期的(前区5+后区2)共7个号码的One-Hot或者数值直接组合成特征 # 此处简单用(前区和值, 后区和值, 前区跨度, 前区连号数, 前区奇数个数等)特征进行聚类示例 feats = df[['前区和值', '后区和值', '前区跨度', '前区连号数', '前区奇数个数', '后区奇数个数']].values kmeans = KMeans(n_clusters=n_clusters, random_state=42) labels = kmeans.fit_predict(feats) df['cluster_label'] = labels print(f"\nK-Means 聚类完成,聚成 {n_clusters} 类。每类样本分布:") print(pd.Series(labels).value_counts()) return df # 
========================================================================= # ----------------------------- ARIMA时间序列分析 -------------------------- # ========================================================================= def run_arima_analysis(self, df, column='前区和值', order=(2, 1, 2)): """ 使用 ARIMA 对某个特征(如前区和值)进行时间序列建模,预测下一期可能的和值范围 """ # 仅做示例:对'前区和值'做ARIMA series = df[column].values # 拟合ARIMA try: model = ARIMA(series, order=order) results = model.fit() forecast = results.forecast(steps=1) # 仅预测1步 predicted_value = forecast[0] print(f"\nARIMA({order})预测[{column}]的下一期值: {predicted_value:.2f}") return predicted_value except Exception as e: print(f"ARIMA分析失败: {e}") return None # ========================================================================= # --------------------- 目标编码、特征准备、深度学习预测 -------------------- # ========================================================================= def build_lstm_model(self, df): """构建 LSTM 模型进行预测(深度学习)""" print("\n🔥 正在构建 LSTM 深度学习模型...") X, y = self._prepare_3d_features(df) model = Sequential() model.add(Bidirectional(LSTM(self.lstm_units, return_sequences=True), input_shape=(X.shape[1], X.shape[2]))) model.add(Dropout(0.3)) model.add(LSTM(self.lstm_units)) model.add(Dropout(0.3)) model.add(Dense(128, activation='relu')) model.add(Dense(y.shape[1], activation='sigmoid')) optimizer = tf.keras.optimizers.Adam(learning_rate=0.001) model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy']) early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True) reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6) history = model.fit( X, y, epochs=self.epochs, batch_size=64, validation_split=0.2, verbose=1, callbacks=[early_stop, reduce_lr] ) return model def _prepare_3d_features(self, df): """ 准备三维特征数据(滑动窗口 + 额外特征) X.shape: (样本数, 1, window_size * 7 + 额外特征) 这里为了示例,实际可以更复杂的卷积或时序结构 """ X, y = [], [] data = df[self.front_columns + self.back_columns].values # (N, 
7) for i in range(self.window_size, len(data)): # 时间窗口的号码 window = data[i - self.window_size:i] # 附加统计特征(示例) stats = df.iloc[i][[ '前区和值', '后区和值', '前区跨度', '前区奇数个数', '后区奇数个数', '前区质数个数', '前区连号数' ]].values # 组合 combined = np.concatenate([window.flatten(), stats]) X.append(combined) # 目标编码 target = self._encode_target(data[i]) y.append(target) X = np.array(X).reshape(len(X), 1, -1) # 简单reshape成(样本数, 时间步, 特征数) y = np.array(y) return X, y def _encode_target(self, numbers): """将某一期的7个号码(5前区+2后区)编码成[0/1,...]长度47的向量""" front = [1 if i + 1 in numbers[:5] else 0 for i in range(35)] back = [1 if i + 1 in numbers[5:] else 0 for i in range(12)] return front + back # ========================================================================= # ---------------------------- 回归/分类分析示例 --------------------------- # ========================================================================= def regression_analysis(self, df): """ 简单线性回归:示例用前区和值作为目标,利用其他特征去回归,看看下一期可能的前区和值 """ # 构建训练数据 reg_df = df.dropna().reset_index(drop=True) features = ['后区和值', '前区跨度', '前区奇数个数', '前区质数个数', '前区连号数'] X = reg_df[features] y = reg_df['前区和值'] if len(reg_df) > 50: X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) model = LinearRegression() model.fit(X_train, y_train) y_pred = model.predict(X_test) rmse = math.sqrt(mean_squared_error(y_test, y_pred)) print(f"\n线性回归预测前区和值,RMSE: {rmse:.2f}") # 预测下一期 last_features = df[features].iloc[[-1]] next_sum = model.predict(last_features)[0] print(f"线性回归预测下一期的前区和值 ≈ {next_sum:.2f}") return next_sum else: print("\n数据不足,无法进行回归分析。") return None def classification_analysis(self, df): """ 示例:Logistic 回归 或者 GBDT 等做简单二分类 这里演示多输出分类,输入特征 -> 是否出现某号码(0/1) """ if len(df) < self.window_size + 1: print("\n数据不足,无法进行分类分析。") return None # 构造二维特征 X, y = self._prepare_2d_features(df) model = MultiOutputClassifier(GradientBoostingClassifier(n_estimators=100)) model.fit(X, y) print("\n多输出分类(GBDT)训练完毕。") return model def _prepare_2d_features(self, df): 
"""准备2D特征,(window_size * 7 + 若干特征) -> 目标y(多输出:47维)""" X, y = [], [] data = df[self.front_columns + self.back_columns].values for i in range(self.window_size, len(data)): window = data[i - self.window_size:i].flatten() # 可以额外加统计特征 stats = df.iloc[i][[ '前区和值', '后区和值', '前区跨度', '前区奇数个数', '后区奇数个数' ]].values X.append(np.concatenate([window, stats])) y.append(self._encode_target(data[i])) return np.array(X), np.array(y) # ========================================================================= # ---------------------------- 集成学习/多模型 ----------------------------- # ========================================================================= def ensemble_learning_prediction(self, df): """ 使用XGBoost, 随机森林, Logistic回归做Voting Ensemble示例 这里以多输出的方式做,示例简化为对47个类别分别做二分类。 """ if len(df) < self.window_size + 1: return [] X, Y = self._prepare_2d_features(df) X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42) # 训练多个基模型 xgb_clf = xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss', n_estimators=100) rf_clf = RandomForestClassifier(n_estimators=100) lr_clf = LogisticRegression(max_iter=200) # 多输出包装 xgb_multi = MultiOutputClassifier(xgb_clf) rf_multi = MultiOutputClassifier(rf_clf) lr_multi = MultiOutputClassifier(lr_clf) # 逐个 fit xgb_multi.fit(X_train, Y_train) rf_multi.fit(X_train, Y_train) lr_multi.fit(X_train, Y_train) # 预测最后一行 last_sample = X[-1].reshape(1, -1) # 分别预测概率 xgb_pred = xgb_multi.predict_proba(last_sample) rf_pred = rf_multi.predict_proba(last_sample) lr_pred = lr_multi.predict_proba(last_sample) # 三者简单加权平均 # MultiOutputClassifier的predict_proba返回list(每个target一个prob),需合并 final_probs = [] for i in range(len(xgb_pred)): # xgb_pred[i], rf_pred[i], lr_pred[i] 分别是 [ [p0, p1], ] 结构 # 取正类概率 p_xgb = xgb_pred[i][0][1] p_rf = rf_pred[i][0][1] p_lr = lr_pred[i][0][1] final_probs.append((p_xgb + p_rf + p_lr) / 3) # final_probs 是47个号码的综合概率 front_probs = final_probs[:35] back_probs = final_probs[35:] front_nums = 
np.argsort(front_probs)[-5:] + 1 back_nums = np.argsort(back_probs)[-2:] + 1 return [list(sorted(front_nums)) + list(sorted(back_nums))] # ========================================================================= # --------------------- 智能蒙特卡洛模拟 + 轮盘算法 ------------------------ # ========================================================================= def intelligent_simulation(self, df, top_n=30): """ 智能蒙特卡洛模拟(含轮盘赌算法思想),并进行组合筛选 """ print("\n🎰 正在进行智能蒙特卡洛模拟...") # 获取号码出现概率(热号) front_probs = self._get_number_probability(df, 'front') back_probs = self._get_number_probability(df, 'back') # 若出现概率异常,则均匀化 if not front_probs or not back_probs: front_probs = {i: 1 / 35 for i in range(1, 36)} back_probs = {i: 1 / 12 for i in range(1, 13)} # 转为list以便做轮盘赌 front_items = sorted(front_probs.items(), key=lambda x: x[0]) back_items = sorted(back_probs.items(), key=lambda x: x[0]) front_numbers, front_weights = zip(*front_items) back_numbers, back_weights = zip(*back_items) front_weights = np.array(front_weights) / np.sum(front_weights) back_weights = np.array(back_weights) / np.sum(back_weights) # 蒙特卡洛模拟 simulations = [] for _ in range(self.simulations): # 轮盘赌选号码 front_selected = np.random.choice(front_numbers, size=5, replace=False, p=front_weights) back_selected = np.random.choice(back_numbers, size=2, replace=False, p=back_weights) simulations.append(sorted(front_selected) + sorted(back_selected)) # 对所有组合进行打分并筛选前 top_n return self._filter_combinations(simulations, df, top_n) def _filter_combinations(self, combinations, df, top_n): """对组合打分并选出前 top_n""" scored_combinations = [] for combo in combinations: if not isinstance(combo, list): combo = list(combo) score = self._calculate_combo_score(combo[:5], combo[5:], df) scored_combinations.append((combo, score)) scored_combinations.sort(key=lambda x: x[1], reverse=True) return [x[0] for x in scored_combinations[:top_n]] def _get_number_probability(self, df, area): """计算号码出现概率""" if area == 'front': numbers = 
df[self.front_columns].values.flatten() total = len(numbers) freq = Counter(numbers) else: numbers = df[self.back_columns].values.flatten() total = len(numbers) freq = Counter(numbers) if total == 0: return {} return {num: cnt / total for num, cnt in freq.items()} # ========================================================================= # ------------------------ 遗传算法示例(简单版本) ----------------------- # ========================================================================= def run_genetic_algorithm(self, df, population_size=100, generations=10): """ 简单遗传算法示例,用于优化号码组合(前5+后2), 实际可根据需要将适应度函数与选择/交叉/变异方式做更复杂的设计 """ print("\n🔬 正在运行遗传算法示例...") # 初始化种群 population = [self._random_ticket() for _ in range(population_size)] for gen in range(generations): # 计算适应度 fitness_scores = [self._calculate_combo_score(ind[:5], ind[5:], df) for ind in population] # 选择(锦标赛/轮盘/精英保留等,这里用简单排序截断) ranked = list(zip(population, fitness_scores)) ranked.sort(key=lambda x: x[1], reverse=True) elite_count = int(population_size * 0.2) # 精英保留比例 new_population = [x[0] for x in ranked[:elite_count]] # 交叉 while len(new_population) < population_size: p1 = random.choice(ranked[:50])[0] # 从前50中选择 p2 = random.choice(ranked[:50])[0] child = self._crossover(p1, p2) # 变异 if random.random() < 0.1: child = self._mutate(child) new_population.append(child) population = new_population # 最终种群中最优个体 final_fitness = [self._calculate_combo_score(ind[:5], ind[5:], df) for ind in population] best_idx = np.argmax(final_fitness) best_solution = population[best_idx] print(f"遗传算法最优组合:{best_solution},分数:{final_fitness[best_idx]:.4f}") return best_solution def _random_ticket(self): front = sorted(random.sample(range(1, 36), 5)) back = sorted(random.sample(range(1, 13), 2)) return front + back def _crossover(self, p1, p2): """简单单点交叉示例""" point = random.randint(1, 6) # 在7个位置中选一个交叉点 child = p1[:point] + p2[point:] # 为防止重复,简单去重再随机补 front = sorted(list(set(child[:5]))) while len(front) < 5: front.append(random.randint(1, 35)) 
front = list(set(front)) front = sorted(front[:5]) back = sorted(list(set(child[5:]))) while len(back) < 2: back.append(random.randint(1, 12)) back = list(set(back)) back = sorted(back[:2]) return front + back def _mutate(self, ticket): """变异:随机替换1~2个号码""" front = ticket[:5] back = ticket[5:] if random.random() < 0.5: # 变异前区 idx = random.randint(0, 4) front[idx] = random.randint(1, 35) front = sorted(list(set(front)))[:5] else: # 变异后区 idx = random.randint(0, 1) back[idx] = random.randint(1, 12) back = sorted(list(set(back)))[:2] return front + back # ========================================================================= # -------------------------- 组合评分/覆盖优化 ----------------------------- # ========================================================================= def _calculate_combo_score(self, front, back, df): """组合评分系统(示例):可根据多维度打分""" score = 0.0 # 1. 历史频率得分 front_counts = df[self.front_columns].apply(lambda x: len(set(x) & set(front)), axis=1) back_counts = df[self.back_columns].apply(lambda x: len(set(x) & set(back)), axis=1) freq_score = front_counts.mean() * 0.3 + back_counts.mean() * 0.2 score += freq_score # 2. 奇偶平衡得分 odd_front = sum(n % 2 for n in front) odd_back = sum(n % 2 for n in back) # 理想:前区奇偶比接近 2~3,后区奇偶比1~1 score += (1 - abs(odd_front / 5 - 0.5)) * 0.2 # 越接近2~3平衡,得分越高 score += (1 - abs(odd_back / 2 - 0.5)) * 0.1 # 3. 和值分布得分(假设合理范围) front_sum = sum(front) back_sum = sum(back) score += (1 - abs((front_sum - 90) / 90)) * 0.1 # 示例假设理想和值 ~ 90 score += (1 - abs((back_sum - 13) / 13)) * 0.1 # 示例假设理想和值 ~ 13 return score # ========================================================================= # ---------------------- 最终推荐:多模型/多分析融合 ------------------------ # ========================================================================= def generate_recommendations(self, df, top=10): """ 汇总各种模型预测/分析结果 + 蒙特卡洛模拟 + 遗传算法 + 等等 做一个融合筛选,输出最终推荐组合 """ print("\n🚀 开始融合多种算法生成推荐号码...") # 1. 
深度学习LSTM预测 lstm_model = self.build_lstm_model(df) lstm_combo = self._predict_with_lstm(lstm_model, df) # 2. 集成学习 (XGBoost + RF + LR) 投票 ensemble_combo = self.ensemble_learning_prediction(df) # 3. 智能蒙特卡洛 simulated_combos = self.intelligent_simulation(df, top_n=20) # 4. 遗传算法搜索 ga_best = self.run_genetic_algorithm(df, population_size=80, generations=5) ga_combos = [ga_best] # 也可多次迭代后再收集多个 # 5. 汇总全部组合 all_combos = [lstm_combo] + ensemble_combo + simulated_combos + ga_combos # 去重 + 评分排序 final_result = self._final_selection(all_combos, df, top) return final_result def _predict_with_lstm(self, model, df): """使用训练好的LSTM模型进行下一期号码预测""" X, _ = self._prepare_3d_features(df) if len(X) == 0: # 数据不足,返回随机组合 return self._random_ticket() last_window = X[-1:] pred = model.predict(last_window)[0] # shape=(47,) front_probs = pred[:35] back_probs = pred[35:] front_nums = list(np.argsort(front_probs)[-5:] + 1) back_nums = list(np.argsort(back_probs)[-2:] + 1) return sorted(front_nums) + sorted(back_nums) def _final_selection(self, combinations, df, top=10): """ 最终筛选: 去重 -> 评分 -> 取top组合 """ unique_combinations = [] seen = set() for combo in combinations: if not isinstance(combo, list): combo = list(combo) # hashable hash_combo = tuple(sorted(combo[:5]) + sorted(combo[5:])) if hash_combo not in seen: seen.add(hash_combo) unique_combinations.append(combo) # 打分 scored = [(combo, self._calculate_combo_score(combo[:5], combo[5:], df)) for combo in unique_combinations] scored.sort(key=lambda x: x[1], reverse=True) return [list(x[0]) for x in scored[:top]] # ========================================================================= # -------------------------- 保存推荐结果到Excel -------------------------- # ========================================================================= def save_recommendations(self, combinations, output_file): """将推荐组合保存到指定Excel文件""" print("\n💾 正在保存推荐结果...") df = pd.DataFrame(combinations, columns=self.front_columns + self.back_columns) df['生成时间'] = 
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: if os.path.exists(output_file): with pd.ExcelWriter(output_file, mode='a', engine='openpyxl') as writer: df.to_excel(writer, index=False, sheet_name='最新推荐') else: df.to_excel(output_file, index=False) print(f"✅ 结果已保存至: {output_file}") except Exception as e: print(f"❌ 保存失败: {str(e)}") # ========================================================================= # ------------------------------ 主程序入口 --------------------------------- # ========================================================================= if __name__ == '__main__': parser = argparse.ArgumentParser(description='大乐透智能预测系统 (多算法整合示例)') parser.add_argument('--input', type=str, default='Super_Lucky.xlsx', help='输入Excel文件路径') parser.add_argument('--output', type=str, default='Super_Lucky_Recommendations.xlsx', help='输出Excel文件路径') args = parser.parse_args() print("🎯 大乐透智能预测系统(多算法整合)启动...") analyzer = AdvancedLotteryAnalyzer() # 读取 & 预处理 df = analyzer.read_and_preprocess(args.input) if df is None or len(df) < 50: print("❌ 可用数据不足,至少需要50期历史数据,程序退出。") exit() # 运行若干分析示例(可选) analyzer.frequency_analysis(df) # 频率/冷热号分析 analyzer.run_kmeans_clustering(df, 5) # K-Means聚类(示例) analyzer.run_arima_analysis(df, '前区和值', (2, 1, 2)) # ARIMA示例 analyzer.regression_analysis(df) # 回归分析(示例) analyzer.classification_analysis(df) # 分类分析(示例) # 最终推荐 recommendations = analyzer.generate_recommendations(df, top=10) if recommendations: analyzer.save_recommendations(recommendations, args.output) print("\n🎉 最终推荐号码列表:") for i, combo in enumerate(recommendations, 1): front = ' '.join(map(str, combo[:5])) back = ' '.join(map(str, combo[5:])) print(f"组合{i:02d}: 前区 [{front}] | 后区 [{back}]") else: print("⚠️ 未生成有效推荐。")

主要新增/改动说明

  1. 缺失值分析(missing_value_analysis)

    • 简单示例:输出缺失情况,采用前向填充处理。
  2. K-Means聚类

    • 新增 run_kmeans_clustering 方法,将部分特征向量化,用 KMeans 做快速聚类。
    • 可视化或查看各类样本分布情况,分析是否存在某些“组合模式”聚类。
  3. ARIMA时间序列分析

    • run_arima_analysis 中对某个时间序列特征(如“前区和值”)进行 ARIMA 建模,并预测下一期可能的值。
  4. 遗传算法

    • 新增简单 run_genetic_algorithm 示例,包括种群初始化、交叉和变异。
    • 使用 _calculate_combo_score 作为适应度函数,迭代若干代后输出最优组合。
    • 这里只是一个非常简化的GA流程示例,实际可扩展交叉、变异、选择策略来增强优化效果。
  5. 回归分析/分类分析

    • regression_analysis: 用线性回归预测“前区和值”。
    • classification_analysis: 用 GBDT 做多输出分类(对应47个号码),演示如何用多输出模型预测某个号码是否出现。
  6. 集成学习(Ensemble)

    • 新增 ensemble_learning_prediction,用 XGBoost、随机森林、Logistic回归 三模型对目标进行多输出分类,然后做概率平均,得到推荐组合。
  7. 智能蒙特卡洛模拟

    • 原有逻辑基础上,增加“轮盘赌”风格的概率采样(依照号码出现概率进行加权),大量模拟后择优。
  8. 覆盖优化

    • 这里仅给出思路:可在 _calculate_combo_score 或 run_genetic_algorithm 里增加覆盖率相关的评分维度,将更多历史、概率信息纳入得分或GA适应度,以"覆盖"更多可能性。
  9. 深度学习 (LSTM)

    • build_lstm_model / _predict_with_lstm 用于构建并预测下一期号码的出现概率。
  10. 特征组合(滑动窗口 + 各种统计特征)

    • _prepare_3d_features 中对过去 window_size 期的原始号码 + 当前统计特征进行合并。
  11. 最终融合

    • generate_recommendations 函数汇总了:
      1. 深度学习(LSTM)
      2. 集成学习(Voting)
      3. 智能蒙特卡洛模拟
      4. 遗传算法
      5. 最终经过 _final_selection 去重+打分排序后返回。

该示例代码较为综合且复杂,你可以根据自身需求对各部分进行裁剪、调整、深度优化。尤其在超参数(如ARIMA的(p,d,q)、K-means聚类数、遗传算法种群与代数、LSTM训练epoch、集成模型n_estimators等)做更多实验和调参,还可以在_calculate_combo_score里设计更精细的评分模型,以期获得更有针对性的预测结果。祝你好运!

مشاركة هذا السؤال والجواب