将这个代码提速 import jmp import numpy as np import panda...
बनाया गया: 25 फ़रवरी 2025
बनाया गया: 25 फ़रवरी 2025
将这个代码提速
import jmp
import numpy as np
import pandas as pd
import time
# Wall-clock start; the elapsed time is printed at the end of the script.
start_time = time.time()

# Open the JMP table and copy every column into a pandas DataFrame.
dt = jmp.open(r"C:\0\原始0.jmp")
# Build the frame in a single call instead of inserting one column per loop
# iteration, which avoids repeated re-allocation of the frame.
df = pd.DataFrame({dt[idx].name: np.array(dt[idx]) for idx in range(len(dt))})

# Column positions inside `data` (open, high, low, close, volume); they follow
# the column order used to build `data` below.
O, H, L, C, V = 0, 1, 2, 3, 4
r = len(df)

# Output indicator names; each one owns a column of `result`.
indicators = ["知", "物", "庄1", "庄2", "叉庄", "悟", "二", "六", "候", "叁", "八"]
indicator_dict = {name: pos for pos, name in enumerate(indicators)}

# `result` holds one row per input row: the indicator columns followed by
# three scratch columns.
result = np.zeros((r, len(indicators) + 3))
k_intermediate = len(indicators)   # intermediate (smoothed) column for "悟"
ema_chuang1 = len(indicators) + 1  # EMA column used for "庄1" (12-period)
ema_chuang2 = len(indicators) + 2  # EMA column used for "庄1" (56-period)

data = df[['开', '高', '低', '收', '量']].to_numpy()

# Rows per stock code and their running total; the running totals are used as
# row boundaries of each stock inside `data`/`result`.
# NOTE(review): this assumes the rows of `df` are already contiguous per code
# and ordered like the (sorted) groupby keys - confirm against the input table.
grouped = df.groupby('编码').size().reset_index(name='count')
grouped['cumulative'] = grouped['count'].cumsum()
stocks = len(grouped)
stock_ranges = grouped[['编码', 'count', 'cumulative']].to_numpy()
# Column indices resolved once instead of on every bar.
zhuang1_col = indicator_dict["庄1"]
zhuang2_col = indicator_dict["庄2"]
cross_col = indicator_dict["叉庄"]
wu_col = indicator_dict["悟"]
adx_cols = (indicator_dict["知"], indicator_dict["物"])  # 12-period, 56-period
# (indicator name, look-back): each average covers look-back + 1 closes.
ma_lookbacks = (("二", 23), ("六", 59), ("候", 134), ("叁", 299), ("八", 479))

for stock in range(stocks):
    # Inclusive row range [start, end] of this stock, from the cumulative counts.
    if stock == 0:
        start, end = 0, int(stock_ranges[stock, 2]) - 1
    else:
        start, end = int(stock_ranges[stock - 1, 2]), int(stock_ranges[stock, 2]) - 1
    length = end - start + 1
    segment = data[start:end + 1, :]  # zero-based view of this stock's bars
    # Scratch columns: 0 = true range, 1 = summed TR, 4/5 = close EMAs,
    # 6/7 = up/down directional movement, 8/9 = +DI/-DI, 10 = DX, 11 = signed ADX.
    temp_result = np.zeros((length, 12))

    # Pass 0 computes the 12-period ADX ("知"), pass 1 the 56-period ADX ("物").
    # NOTE(review): temp_result is not reset between passes, so the first
    # smoothing windows of pass 1 read DX/ADX values written by pass 0. This is
    # kept as in the original - confirm whether it is intended.
    for period_idx in [0, 1]:
        period = 12 if period_idx == 0 else 56
        smoothing = 6
        for i in range(length):
            idx = start + i  # row of this bar in `result`
            close = segment[i, C]

            if i >= 1:
                # True range and directional movement against the previous bar.
                temp_result[i, 0] = max(
                    segment[i, H] - segment[i, L],
                    abs(segment[i, H] - segment[i - 1, C]),
                    abs(segment[i, L] - segment[i - 1, C]),
                )
                up_move = segment[i, H] - segment[i - 1, H]
                down_move = segment[i - 1, L] - segment[i, L]
                if up_move > 0 and up_move > down_move:
                    temp_result[i, 6] = up_move
                if down_move > 0 and down_move > up_move:
                    temp_result[i, 7] = down_move

            # EMA of the close for the current period, seeded with the first close.
            ema_col = 4 + period_idx
            if i == 0:
                temp_result[i, ema_col] = close
            else:
                prev_ema = temp_result[i - 1, ema_col]
                temp_result[i, ema_col] = round(
                    prev_ema + (close - prev_ema) * 2 / (period + 1), 3)

            # 12- and 56-period EMAs feeding "庄1".
            if i == 0:
                # `segment` is zero-based: its first bar is segment[0], not
                # segment[start]. Indexing with `start` read the wrong row (or
                # raised IndexError) for every stock after the first one.
                result[start, ema_chuang1] = segment[0, C]
                result[start, ema_chuang2] = segment[0, C]
            else:
                prev_idx = idx - 1
                result[idx, ema_chuang1] = round(
                    result[prev_idx, ema_chuang1]
                    + (close - result[prev_idx, ema_chuang1]) * 2 / (12 + 1), 3)
                result[idx, ema_chuang2] = round(
                    result[prev_idx, ema_chuang2]
                    + (close - result[prev_idx, ema_chuang2]) * 2 / (56 + 1), 3)

            if period_idx == 1:
                # Everything in this branch is independent of the ADX period,
                # so it is computed once, during the second pass.
                result[idx, zhuang1_col] = round(
                    (result[idx, ema_chuang1] - result[idx, ema_chuang2]) * 10, 2)
                if i >= 4:
                    # 5-bar mean of "庄1".
                    result[idx, zhuang2_col] = round(
                        np.mean(result[idx - 4:idx + 1, zhuang1_col]), 2)
                if i > 0:
                    # "叉庄": signed count of consecutive bars since the last
                    # cross of "庄1" over/under "庄2".
                    prev_z1 = result[idx - 1, zhuang1_col]
                    prev_z2 = result[idx - 1, zhuang2_col]
                    curr_z1 = result[idx, zhuang1_col]
                    curr_z2 = result[idx, zhuang2_col]
                    if prev_z1 < prev_z2 and curr_z1 >= curr_z2:
                        result[idx, cross_col] = 1
                    elif curr_z1 >= curr_z2:
                        result[idx, cross_col] = result[idx - 1, cross_col] + 1
                    elif prev_z1 >= prev_z2 and curr_z1 < curr_z2:
                        result[idx, cross_col] = -1
                    elif curr_z1 < curr_z2:
                        result[idx, cross_col] = result[idx - 1, cross_col] - 1

                # Simple moving averages of the close.
                for ma_name, lookback in ma_lookbacks:
                    if i >= lookback:
                        result[idx, indicator_dict[ma_name]] = round(
                            np.mean(segment[i - lookback:i + 1, C]), 3)

                if i >= 35:
                    # Position of the close inside the 36-bar high/low range,
                    # then smoothed 1/3 : 2/3 with the previous intermediate.
                    max_high = np.max(segment[i - 35:i + 1, H])
                    min_low = np.min(segment[i - 35:i + 1, L])
                    if max_high == min_low:
                        result[idx, wu_col] = 100
                    else:
                        result[idx, wu_col] = round(
                            (close - min_low)
                            / (max_high - min_low + 0.00000001) * 100, 2)
                    if (idx > 0
                            and not np.isnan(result[idx - 1, k_intermediate])
                            and result[idx, wu_col] < 200):
                        result[idx, k_intermediate] = round(
                            result[idx, wu_col] / 3
                            + 2 * result[idx - 1, k_intermediate] / 3, 2)
                    else:
                        result[idx, k_intermediate] = result[idx, wu_col]
                    result[idx, wu_col] = (result[idx, k_intermediate] - 50) * 1.5

            if i >= period:
                window = slice(i - period + 1, i + 1)
                temp_result[i, 1] = round(np.sum(temp_result[window, 0]), 3)
                temp_result[i, 8] = round(
                    np.sum(temp_result[window, 6])
                    / (temp_result[i, 1] + 0.0000001) * 100, 3)
                temp_result[i, 9] = round(
                    np.sum(temp_result[window, 7])
                    / (temp_result[i, 1] + 0.0000001) * 100, 3)
                temp_result[i, 10] = (
                    abs(temp_result[i, 8] - temp_result[i, 9])
                    / (temp_result[i, 8] + temp_result[i, 9] + 0.00000001) * 100)
                # Smoothed DX, signed by which directional index dominates.
                smoothed_dx = np.mean(temp_result[i - smoothing + 1:i + 1, 10])
                temp_result[i, 11] = (
                    smoothed_dx if temp_result[i, 8] > temp_result[i, 9]
                    else -smoothed_dx)
                # NOTE(review): the pasted source lost its indentation; this
                # write is assumed to sit inside `if i >= period` - confirm.
                if i >= 2:
                    factor = 1 if period_idx == 0 else 1.5
                    result[idx, adx_cols[period_idx]] = round(
                        np.mean(temp_result[i - 2:i + 1, 11]) * factor, 2)
selected_data = []  # rows (code and date) that satisfy every screening condition
zhi_col = indicator_dict["知"]
wu_adx_col = indicator_dict["物"]
hou_col = indicator_dict["候"]

for stock in range(stocks):
    if stock == 0:
        start, end = 0, int(stock_ranges[stock, 2]) - 1
    else:
        start, end = int(stock_ranges[stock - 1, 2]), int(stock_ranges[stock, 2]) - 1

    # Data and computed indicators of the current stock.
    stock_data = df.iloc[start:end + 1]
    stock_result = result[start:end + 1, :]

    # Skip stocks that are too short for the long moving average ("候").
    if len(stock_data) < 134:
        continue

    # Read the closes once as an array instead of one pandas lookup per bar.
    closes = stock_data['收'].to_numpy()

    # The first 50 bars are skipped so that a full 50-bar history exists.
    for i in range(50, len(stock_data)):
        current_row = stock_result[i]
        m120 = current_row[hou_col]
        # "候" is still 0 on bars where it has not been computed; dividing by
        # it only produced inf/nan (and a RuntimeWarning) and the bar was
        # rejected anyway, so skip it explicitly.
        if m120 == 0:
            continue
        current_price = closes[i]

        # Last 50 / last 10 bars, current bar included.
        history_50 = stock_result[i - 49:i + 1, :]
        history_10 = stock_result[i - 9:i + 1, :]

        # 1. "候" rose bar-over-bar more than 45 times within the last 50 bars.
        m120_up_cross = np.sum(
            history_50[1:, hou_col] > history_50[:-1, hou_col]) > 45
        # 2. The close deviates from "候" by less than 20 %.
        price_close_to_m120 = abs(current_price / m120 - 1) * 100 < 20
        # 3. "知" was below -50 at least once within the last 50 bars.
        adx_know_below_50 = np.sum(history_50[:, zhi_col] < -50) > 0
        # 4. "知" is above "物" now, or crossed above it within the last 10 bars.
        adx_cross_up = (current_row[zhi_col] > current_row[wu_adx_col]) or \
            np.sum((history_10[:-1, zhi_col] < history_10[:-1, wu_adx_col])
                   & (history_10[1:, zhi_col] >= history_10[1:, wu_adx_col])) > 0
        # 5. "知" is currently positive.
        adx_know_positive = current_row[zhi_col] > 0

        # Record the bar only when every condition holds.
        if (m120_up_cross and price_close_to_m120 and adx_know_below_50
                and adx_cross_up and adx_know_positive):
            selected_data.append({
                '编码': stock_data['编码'].iloc[i],
                '日期': stock_data['日期'].iloc[i]
            })
if not selected_data:
    print("没有找到符合条件的日期和编码。")
else:
    # Turn the selected rows into a DataFrame, then mirror it into a new
    # JMP table with one column per DataFrame column.
    result_df = pd.DataFrame(selected_data)
    row_count = result_df.shape[0]
    output_dt = jmp.DataTable("选股结果", row_count)
    for column_name in result_df.columns:
        # Both columns are created as numeric; the original comment states
        # this as an assumption about the code and date values.
        output_dt.new_column(column_name, jmp.DataType.Numeric)
        output_dt[column_name] = list(result_df[column_name])
    print("已成功创建JMP表格 '选股结果',包含符合条件的日期和编码。")

# Echo the selected dates, then the total running time.
print("选中的股票编码和日期:")
for picked in selected_data:
    picked_date = picked['日期']
    print(f"日期: {picked_date}")
elapsed = time.time() - start_time
print(f"代码运行耗时: {elapsed} 秒")
下面给出一段思路示例,展示如何尽可能将原本在循环里逐行计算的指标,改为使用 Pandas 的分组(groupby)和向量化(rolling/ewm 等)来加速。请注意,由于你的原始逻辑中有一些自定义的分段计算、条件判断与循环依赖(尤其是“悟”的平滑逻辑,以及对前值的依赖判断),如果要完全和原代码一字不差地复刻结果,仍需做适当的循环或自定义函数。下面的示例代码给出的主要是核心的提速思路,供你参考、调整与改进。
shift()
、rolling(window).sum()
/rolling(window).mean()
等一系列方法来快速完成。df.groupby('编码')['收'].ewm(span=周期, adjust=False).mean()
。rolling(window).max()
、rolling(window).min()
。apply()
。如果某些部分的计算需要前一行计算结果来继续往下推(如“悟”指标中途判断 `< 200` 时才做特殊平滑处理),那么这部分逻辑无法单纯用 `rolling()`、`ewm()` 等“无状态”的向量化方法,只能用带状态的循环或自定义累计函数(cumulative approach)。但也可以把其余能用向量化的方法先加速掉,只在必要环节做循环,大大缩短整体运行时间。
下面给出一个简化示例,它省略或简化了你原代码中的一些细节,主要演示分组 + 向量化的用法。你可以在此基础上,结合自己的实际需求(如“悟”的特殊计算、“知”“物”对两段周期的分别处理等),进一步修改。
import jmp
import numpy as np
import pandas as pd
import time

start_time = time.time()

# Read the JMP table and convert it into a DataFrame.
dt = jmp.open(r"C:\0\原始0.jmp")
data_dict = {}
for idx in range(len(dt)):
    col_name = dt[idx].name
    data_dict[col_name] = np.array(dt[idx])
df = pd.DataFrame(data_dict)

# df is assumed to contain the columns ['编码', '日期', '开', '高', '低', '收', '量'].
# Sort by code and date so that every group is contiguous and chronological.
df = df.sort_values(by=['编码', '日期'], ascending=True)


# -------------------------------------------------------------------
# 1) Base columns: true range and directional movement.
#    Vectorized with shift()/diff(), evaluated per stock.
# -------------------------------------------------------------------
def calc_base_indicators(group):
    """Add TR, DM_plus and DM_minus columns to one stock's rows."""
    group = group.copy()  # do not modify the caller's data
    prev_close = group['收'].shift(1)
    group['H_L'] = group['高'] - group['低']
    group['H_pC'] = (group['高'] - prev_close).abs()  # high - previous close
    group['L_pC'] = (group['低'] - prev_close).abs()  # low - previous close
    # True range
    group['TR'] = group[['H_L', 'H_pC', 'L_pC']].max(axis=1)

    # Up/down directional movement: a move counts only when it is positive
    # and larger than the opposite move.
    up_move = group['高'].diff()
    down_move = group['低'].shift(1) - group['低']
    group['DM_plus'] = np.where((up_move > 0) & (up_move > down_move), up_move, 0)
    group['DM_minus'] = np.where((down_move > 0) & (down_move > up_move), down_move, 0)
    return group


df = df.groupby('编码', group_keys=False).apply(calc_base_indicators)


# -------------------------------------------------------------------
# 2) EMAs and the indicators derived from them (庄1 / 庄2 / 叉庄).
# -------------------------------------------------------------------
def calc_ema_indicators(group):
    """Add EMA12, EMA56, 庄1, 庄2 and 叉庄 columns to one stock's rows."""
    group = group.copy()  # do not modify the caller's data
    group['EMA12'] = group['收'].ewm(span=12, adjust=False).mean()
    group['EMA56'] = group['收'].ewm(span=56, adjust=False).mean()

    # 庄1 = (EMA12 - EMA56) * 10, 庄2 = 5-bar mean of 庄1.
    group['庄1'] = (group['EMA12'] - group['EMA56']) * 10
    group['庄2'] = group['庄1'].rolling(window=5, min_periods=1).mean()

    # 叉庄 counts bars since the last cross; each value depends on the
    # previous one, so it is computed with a loop over plain arrays.
    z1 = group['庄1'].to_numpy()
    z2 = group['庄2'].to_numpy()
    cross_vals = [0] * len(group)
    for i in range(1, len(group)):
        prev_cross = cross_vals[i - 1]
        if z1[i - 1] < z2[i - 1] and z1[i] >= z2[i]:
            cross_vals[i] = 1
        elif z1[i] >= z2[i]:
            cross_vals[i] = prev_cross + 1
        elif z1[i - 1] >= z2[i - 1] and z1[i] < z2[i]:
            cross_vals[i] = -1
        else:
            cross_vals[i] = prev_cross - 1
    group['叉庄'] = cross_vals
    return group


df = df.groupby('编码', group_keys=False).apply(calc_ema_indicators)


# -------------------------------------------------------------------
# 3) Long moving averages (二 / 六 / 候 / 叁 / 八).
#    rolling(N) includes the current row, so a look-back of 23 bars plus
#    the current bar becomes rolling(24), and so on.
#    min_periods=1 yields partial-window means on the first rows of a stock.
# -------------------------------------------------------------------
def calc_ma_indicators(group):
    """Add the five moving-average columns to one stock's rows."""
    group = group.copy()
    group['二'] = group['收'].rolling(window=24, min_periods=1).mean()
    group['六'] = group['收'].rolling(window=60, min_periods=1).mean()
    group['候'] = group['收'].rolling(window=135, min_periods=1).mean()
    group['叁'] = group['收'].rolling(window=300, min_periods=1).mean()
    group['八'] = group['收'].rolling(window=480, min_periods=1).mean()
    return group


df = df.groupby('编码', group_keys=False).apply(calc_ma_indicators)


# -------------------------------------------------------------------
# 4) "悟": rolling 36-bar high/low range plus a stateful smoothing step.
#    The max/min are vectorized; the smoothing needs the previous smoothed
#    value, so it stays a loop.
# -------------------------------------------------------------------
def calc_wu_indicator(group):
    """Add the 悟 column (and its helper columns) to one stock's rows."""
    group = group.copy()
    rolling_window = 36
    group['max_36'] = group['高'].rolling(window=rolling_window, min_periods=1).max()
    group['min_36'] = group['低'].rolling(window=rolling_window, min_periods=1).min()

    # raw = (close - min_36) / (max_36 - min_36) * 100; eps avoids division by zero.
    eps = 1e-8
    group['悟_raw'] = ((group['收'] - group['min_36'])
                      / (group['max_36'] - group['min_36'] + eps) * 100)

    # Smooth with 1/3 of the raw value and 2/3 of the previous smoothed value
    # while the raw value is below 200; otherwise take the raw value as is.
    raw = group['悟_raw'].to_numpy()
    wu_values = [0] * len(group)
    for i in range(len(group)):
        if i == 0:
            wu_values[i] = raw[i]  # the first row has no previous value
        elif raw[i] < 200:
            wu_values[i] = raw[i] / 3 + 2 * wu_values[i - 1] / 3
        else:
            wu_values[i] = raw[i]

    # Final column: centre on 50 and scale by 1.5.
    group['悟'] = [(val - 50) * 1.5 for val in wu_values]
    return group


df = df.groupby('编码', group_keys=False).apply(calc_wu_indicator)


# -------------------------------------------------------------------
# 5) ADX for the 12- and 56-bar periods, using rolling sums/means.
# -------------------------------------------------------------------
def calc_adx_indicators(group):
    """Add ADX12/ADX56 and the simplified 知/物 columns to one stock's rows."""
    group = group.copy()

    def adx_calc(series_tr, series_dm_plus, series_dm_minus, period):
        # Rolling sums over `period` bars.
        tr_n = series_tr.rolling(window=period).sum()
        dm_plus_n = series_dm_plus.rolling(window=period).sum()
        dm_minus_n = series_dm_minus.rolling(window=period).sum()

        # +DI / -DI. Named di_plus / di_minus because "+DI" and "-DI" are not
        # valid Python identifiers (assigning to them is a SyntaxError).
        di_plus = dm_plus_n / (tr_n + 1e-8) * 100
        di_minus = dm_minus_n / (tr_n + 1e-8) * 100

        # DX = |+DI - -DI| / (+DI + -DI) * 100
        dx = (di_plus - di_minus).abs() / ((di_plus + di_minus) + 1e-8) * 100

        # Smooth DX with a 6-bar mean.
        return dx.rolling(window=6).mean()

    group['ADX12'] = adx_calc(group['TR'], group['DM_plus'], group['DM_minus'], 12)
    group['ADX56'] = adx_calc(group['TR'], group['DM_plus'], group['DM_minus'], 56)

    # Simplified: 知 and 物 are taken directly from the two ADX series.
    group['知'] = group['ADX12']
    group['物'] = group['ADX56']
    return group


df = df.groupby('编码', group_keys=False).apply(calc_adx_indicators)


# -------------------------------------------------------------------
# 6) Screening conditions, expressed as boolean columns and combined.
# -------------------------------------------------------------------
def select_stocks(df):
    """Return the ['编码', '日期'] rows of `df` that meet all conditions.

    Adds the helper/condition columns to `df` in place. The result is always
    a DataFrame (empty when nothing matches).
    """
    # 1) "候" rose more than 45 times within the last 50 bars.
    #    The shift is done per stock so the first bar of a stock is never
    #    compared with the last bar of the previous stock.
    df['候_up'] = df['候'] > df.groupby('编码')['候'].shift(1)
    # Cast to int so the rolling sum counts the True values. `.values`
    # assigns by position, which relies on df being sorted by code.
    df['cond1'] = (df['候_up'].astype(int).groupby(df['编码'])
                   .rolling(window=50, min_periods=50).sum().values > 45)

    # 2) The close deviates from "候" by less than 20 %.
    df['cond2'] = ((df['收'] / df['候'] - 1).abs() < 0.2)

    # 3) "知" was below -50 at least once within the last 50 bars:
    #    the rolling max of the 0/1 flag is 1 when any bar matched.
    df['知_lt_m50'] = (df['知'] < -50)
    rolling_has_below = (df['知_lt_m50'].astype(int).groupby(df['编码'])
                         .rolling(window=50, min_periods=50).max().values)
    df['cond3'] = (rolling_has_below == 1)

    # 4) Simplified: "知" is currently above "物". A check for a cross within
    #    the last 10 bars can be added with another rolling expression.
    df['cond4'] = df['知'] > df['物']

    # 5) "知" is currently positive.
    df['cond5'] = (df['知'] > 0)

    # All conditions must hold.
    df['final_cond'] = (df['cond1'] & df['cond2'] & df['cond3']
                        & df['cond4'] & df['cond5'])
    return df.loc[df['final_cond'], ['编码', '日期']]


selected_df = select_stocks(df)

# Write the result back to JMP when anything was selected.
if len(selected_df) > 0:
    output_dt = jmp.DataTable("选股结果", selected_df.shape[0])
    for col in selected_df.columns:
        # Use jmp.DataType.Character instead if the date is a string.
        output_dt.new_column(col, jmp.DataType.Numeric)
        output_dt[col] = list(selected_df[col])
    print(f"已成功创建 JMP 表格 '选股结果',共 {len(selected_df)} 行。")
else:
    print("没有符合选股条件的数据。")

print("选中的股票编码和日期:")
print(selected_df)
print(f"代码运行耗时: {time.time() - start_time:.2f} 秒")
分组计算
df.groupby('编码')
。rolling()
、ewm()
等向量化方法。rolling
解决。滚动窗口大小与原代码的 off-by-one 问题
`if i >= 23: ...`,本质是用最近 24 根 K 线。`rolling(24)` 就是包含当前行在内的 24 条数据,如果不想包含当前行,可再指定 `shift(1)` 或在 `rolling()` 后再做一些移位处理。
向量化提速
选择条件
for i in range(...)
。数值精度
round(..., 3)
、round(..., 2)
,在实际应用中,可以尽量少用或只在最终输出前再做一次保留小数,避免无谓的多次转换。round()
调用,会增加一些开销,也会带来更多浮点误差累积。总之,如果你的核心目的是“提速”,那就:
rolling()
/ ewm()
等函数搞定。这样通常能极大地减少 Python 解释器层面的循环,性能会有显著提升。你也可以视情况改用 NumPy 做滚动窗口(速度更快,但写法相对繁琐)或者直接用某些专门的指标库(如 talib、pyti 等),进一步加快处理速度。祝你一切顺利!