将这个代码提速 import jmp import numpy as np import panda...

تم الإنشاء في: ٢٥ فبراير ٢٠٢٥

تمت الإجابة باستخدام O1 بواسطة Chat01

سؤال

将这个代码提速
import jmp
import numpy as np
import pandas as pd
import time

# Record the start time so the total runtime can be reported at the end.
start_time = time.time()

# Read the JMP table and copy every column into a pandas DataFrame.
# The frame is built from a dict in one step instead of being grown one
# column at a time.
dt = jmp.open(r"C:\0\原始0.jmp")
df = pd.DataFrame({dt[idx].name: np.array(dt[idx]) for idx in range(len(dt))})

# Column positions inside the `data` matrix built below
# (open, high, low, close, volume).
O, H, L, C, V = 0, 1, 2, 3, 4
r = len(df)

# Technical indicator names (kept identical to the JSL script) and their
# column positions inside `result`.
indicators = ["知", "物", "庄1", "庄2", "叉庄", "悟", "二", "六", "候", "叁", "八"]
indicator_dict = {name: pos for pos, name in enumerate(indicators)}

# Result matrix: one column per indicator plus three helper columns.
result = np.zeros((r, len(indicators) + 3))
k_intermediate = len(indicators)   # smoothed intermediate value used by 悟
ema_chuang1 = len(indicators) + 1  # 12-period EMA of the close (feeds 庄1)
ema_chuang2 = len(indicators) + 2  # 56-period EMA of the close (feeds 庄1)

# Main price/volume matrix, in the column order given by O, H, L, C, V.
data = df[['开', '高', '低', '收', '量']].to_numpy()

# Row count per stock and the running total; the running total marks where
# each stock's rows end inside `data` / `result`.
# NOTE(review): this only works if the rows of `df` are already ordered by 编码
# in the same order groupby sorts its keys — confirm for the input table.
grouped = df.groupby('编码').size().reset_index(name='count')
grouped['cumulative'] = grouped['count'].cumsum()
stocks = len(grouped)
stock_ranges = grouped[['编码', 'count', 'cumulative']].to_numpy()

# Compute the technical indicators stock by stock (same formulas as the
# original JSL-derived logic).
#
# NOTE(review): the pasted source had lost its indentation. The nesting below
# assumes (a) the 悟 smoothing step runs after both branches of the
# max_high == min_low check and (b) the final 3-bar average that produces
# 知 / 物 sits inside the `i >= period` branch — confirm against the JSL
# reference.
col_zhuang1 = indicator_dict["庄1"]
col_zhuang2 = indicator_dict["庄2"]
col_cross = indicator_dict["叉庄"]
col_k = indicator_dict["悟"]
# (result column, window length) of the simple moving averages of the close.
ma_columns = [
    (indicator_dict["二"], 24),
    (indicator_dict["六"], 60),
    (indicator_dict["候"], 135),
    (indicator_dict["叁"], 300),
    (indicator_dict["八"], 480),
]
# (period, result column, scale factor) of the two directional passes.
adx_passes = [
    (12, indicator_dict["知"], 1),
    (56, indicator_dict["物"], 1.5),
]
smoothing = 6

for stock in range(stocks):
    # Rows [start, end] of `data` / `result` belong to this stock.
    start = 0 if stock == 0 else int(stock_ranges[stock - 1, 2])
    end = int(stock_ranges[stock, 2]) - 1
    length = end - start + 1
    segment = data[start:end + 1, :]
    # Scratch matrix, indexed relative to the stock:
    #   0 = true range, 6 = upward move, 7 = downward move,
    #   1 = summed true range, 8 / 9 = up / down ratio (x100),
    #   10 = normalised difference, 11 = signed, smoothed difference.
    temp_result = np.zeros((length, 12))

    # ---- Pass 1: everything that depends only on prices -------------------
    # These values do not depend on the 12/56 period, so they are computed
    # once per row.
    for i in range(length):
        idx = start + i
        close = segment[i, C]

        if i >= 1:
            up_move = segment[i, H] - segment[i - 1, H]
            down_move = segment[i - 1, L] - segment[i, L]
            temp_result[i, 0] = max(
                segment[i, H] - segment[i, L],
                abs(segment[i, H] - segment[i - 1, C]),
                abs(segment[i, L] - segment[i - 1, C]),
            )
            if up_move > 0 and up_move > down_move:
                temp_result[i, 6] = up_move
            if down_move > 0 and down_move > up_move:
                temp_result[i, 7] = down_move

        # 12- and 56-period EMAs of the close. `segment` is already the
        # per-stock slice, so the seed is its first row (row 0), not row
        # `start`.
        if i == 0:
            result[idx, ema_chuang1] = close
            result[idx, ema_chuang2] = close
        else:
            prev_fast = result[idx - 1, ema_chuang1]
            prev_slow = result[idx - 1, ema_chuang2]
            result[idx, ema_chuang1] = round(prev_fast + (close - prev_fast) * 2 / (12 + 1), 3)
            result[idx, ema_chuang2] = round(prev_slow + (close - prev_slow) * 2 / (56 + 1), 3)

        # 庄1: scaled EMA difference; 庄2: its 5-bar mean.
        result[idx, col_zhuang1] = round(
            (result[idx, ema_chuang1] - result[idx, ema_chuang2]) * 10, 2
        )
        if i >= 4:
            result[idx, col_zhuang2] = round(np.mean(result[idx - 4:idx + 1, col_zhuang1]), 2)

        # 叉庄: signed count of consecutive bars with 庄1 at/above (+) or
        # below (-) 庄2; it restarts at +1 / -1 on each cross.
        if i > 0:
            prev_z1, prev_z2 = result[idx - 1, col_zhuang1], result[idx - 1, col_zhuang2]
            cur_z1, cur_z2 = result[idx, col_zhuang1], result[idx, col_zhuang2]
            if prev_z1 < prev_z2 and cur_z1 >= cur_z2:
                result[idx, col_cross] = 1
            elif cur_z1 >= cur_z2:
                result[idx, col_cross] = result[idx - 1, col_cross] + 1
            elif prev_z1 >= prev_z2 and cur_z1 < cur_z2:
                result[idx, col_cross] = -1
            elif cur_z1 < cur_z2:
                result[idx, col_cross] = result[idx - 1, col_cross] - 1

        # Simple moving averages of the close; a row is filled only once a
        # full window is available.
        for ma_col, window in ma_columns:
            if i >= window - 1:
                result[idx, ma_col] = round(np.mean(segment[i - window + 1:i + 1, C]), 3)

        # 悟: position of the close inside the 36-bar high/low range,
        # smoothed recursively (1/3 new value, 2/3 previous), then
        # re-centred with (value - 50) * 1.5.
        if i >= 35:
            max_high = np.max(segment[i - 35:i + 1, H])
            min_low = np.min(segment[i - 35:i + 1, L])
            if max_high == min_low:
                result[idx, col_k] = 100
            else:
                result[idx, col_k] = round(
                    (close - min_low) / (max_high - min_low + 0.00000001) * 100, 2
                )
            if (idx > 0 and not np.isnan(result[idx - 1, k_intermediate])
                    and result[idx, col_k] < 200):
                result[idx, k_intermediate] = round(
                    result[idx, col_k] / 3 + 2 * result[idx - 1, k_intermediate] / 3, 2
                )
            else:
                result[idx, k_intermediate] = result[idx, col_k]
            result[idx, col_k] = (result[idx, k_intermediate] - 50) * 1.5

    # ---- Pass 2: directional indicators 知 (12 bars) and 物 (56 bars) ------
    # NOTE(review): columns 10 / 11 are not cleared between the two passes,
    # so the first few 56-bar averages still read 12-bar values from rows
    # below 56. This is how the original loop behaved — confirm it is
    # intended.
    for period, target_col, factor in adx_passes:
        for i in range(period, length):
            window = slice(i - period + 1, i + 1)
            temp_result[i, 1] = round(np.sum(temp_result[window, 0]), 3)
            temp_result[i, 8] = round(
                np.sum(temp_result[window, 6]) / (temp_result[i, 1] + 0.0000001) * 100, 3
            )
            temp_result[i, 9] = round(
                np.sum(temp_result[window, 7]) / (temp_result[i, 1] + 0.0000001) * 100, 3
            )
            temp_result[i, 10] = (
                abs(temp_result[i, 8] - temp_result[i, 9])
                / (temp_result[i, 8] + temp_result[i, 9] + 0.00000001) * 100
            )
            smoothed = np.mean(temp_result[i - smoothing + 1:i + 1, 10])
            # Positive when the upward ratio dominates, negative otherwise.
            temp_result[i, 11] = smoothed if temp_result[i, 8] > temp_result[i, 9] else -smoothed
            result[start + i, target_col] = round(
                np.mean(temp_result[i - 2:i + 1, 11]) * factor, 2
            )

# Scan every row of every stock and collect the (编码, 日期) pairs that
# satisfy all selection rules.
selected_data = []  # one dict per matching row, with keys '编码' and '日期'
col_hou = indicator_dict["候"]  # M120 moving average
col_zhi = indicator_dict["知"]
col_wu = indicator_dict["物"]

for stock in range(stocks):
    # Rows [start, end] of `df` / `result` belong to this stock.
    start = 0 if stock == 0 else int(stock_ranges[stock - 1, 2])
    end = int(stock_ranges[stock, 2]) - 1

    stock_data = df.iloc[start:end + 1]
    stock_result = result[start:end + 1, :]

    # Skip stocks that are too short for M120 to be computed.
    if len(stock_data) < 134:
        continue

    # Pull the close column out once instead of indexing the DataFrame on
    # every row.
    closes = stock_data['收'].to_numpy()

    # Start at row 50 so that 50 bars of history are always available.
    for i in range(50, len(stock_data)):
        current_row = stock_result[i]
        current_price = closes[i]
        m120 = current_row[col_hou]

        # The last 50 and the last 10 bars, both including the current one.
        history_50 = stock_result[i - 49:i + 1, :]
        history_10 = stock_result[i - 9:i + 1, :]

        # 1. M120 rose bar-over-bar more than 45 times within the last 50 bars.
        m120_up_cross = np.sum(history_50[1:, col_hou] > history_50[:-1, col_hou]) > 45

        # 2. The close deviates from M120 by less than 20%. M120 is still 0
        #    on rows where it has not been computed yet; such rows can never
        #    pass, so the division is skipped for them.
        price_close_to_m120 = m120 != 0 and abs(current_price / m120 - 1) * 100 < 20

        # 3. 知 dropped below -50 at least once within the last 50 bars.
        adx_know_below_50 = np.sum(history_50[:, col_zhi] < -50) > 0

        # 4. 知 is currently above 物, or crossed above it at least once
        #    within the last 10 bars.
        adx_cross_up = (current_row[col_zhi] > current_row[col_wu]) or \
            np.sum(
                (history_10[:-1, col_zhi] < history_10[:-1, col_wu])
                & (history_10[1:, col_zhi] >= history_10[1:, col_wu])
            ) > 0

        # 5. 知 is currently positive.
        adx_know_positive = current_row[col_zhi] > 0

        if (m120_up_cross and price_close_to_m120 and adx_know_below_50
                and adx_cross_up and adx_know_positive):
            selected_data.append({
                '编码': stock_data['编码'].iloc[i],
                '日期': stock_data['日期'].iloc[i]
            })

# Create a new JMP table that stores the matching dates and codes.
if selected_data:
    # One row per match, with the columns 编码 and 日期.
    result_df = pd.DataFrame(selected_data)

    output_dt = jmp.DataTable("选股结果", result_df.shape[0])
    for col in result_df.columns:
        # Both columns are created as numeric.
        # NOTE(review): switch to a character column if 编码 or 日期 is
        # stored as text in the source table.
        col_type = jmp.DataType.Numeric
        output_dt.new_column(col, col_type)
        output_dt[col] = list(result_df[col])
    print("已成功创建JMP表格 '选股结果',包含符合条件的日期和编码。")
else:
    print("没有找到符合条件的日期和编码。")

# Print the selected stock codes and dates (for verification). The header
# announces both fields, so each line shows the code as well as the date.
print("选中的股票编码和日期:")
for item in selected_data:
    print(f"编码: {item['编码']}, 日期: {item['日期']}")

# Total wall-clock time since `start_time` was recorded.
print(f"代码运行耗时: {time.time() - start_time} 秒")

الفكرة

Evaluating performance

I’m working through Python code that reads data from a JMP table, converts columns to a Pandas DataFrame, and computes indicators in a loop.

Enhancing performance

I'm thinking about chunk reading or direct conversions from JMP to DataFrame. Vectorizing repeated code in the loop, especially for EMA calculations, seems like a potential improvement.

Enhancing with rolling calculations

I'm exploring how pandas' rolling functions can simplify repeated calculations across data segments, like rolling sum for True Range and rolling max/min for indicators, to streamline and accelerate the process.

Enhancing accuracy

Applying vectorization to compute the True Range, EMA, and differences. This approach streamlines calculations and ensures precision, making the process more efficient and accurate.

Refining calculations

I’m finalizing vectorized computations for various indicators, ensuring precise handling of indexing and grouping by '编码'. This approach enhances efficiency and accuracy throughout the process.

Integrating smoothing logic

I’m working on integrating a custom smoothing method for the '悟' indicator, using exponential smoothing with a cap of 200 to ensure precision and enhance the data's quality.

Enhancing custom calculations

I’m focusing on refining the cumulative method for '悟', combining partial vectorization for simpler tasks with loops for intricate '悟' computations to ensure precision and streamline processing.

Simplifying calculations

I’m working through streamlining the code by merging double loops into a single one and incorporating rolling sums for TR and other calculations, aiming for enhanced efficiency and accuracy.

Enhancing computational flow

I’m contemplating strategies to streamline complex calculations, including removing double loops and leveraging chunk logic, to improve efficiency and processing speed.

Enhancing efficiency

I’m working on a more efficient, mostly vectorized approach, focusing on minimizing loops and ensuring conditions are met for data selection, which will boost performance and streamline the process.

Refining data handling

I’m streamlining data processing by reading data efficiently, calculating indicators for each group, and using rolling windows for partial vectorization to enhance performance while maintaining code clarity.

Crafting a solution

I’m working on a concise code snippet to enhance vectorization, maintaining custom logic adaptations. This aims to provide efficiency with a clear, adaptable approach for the user.

Reasoned for 28 seconds

الإجابة

下面给出一段思路示例,展示如何尽可能将原本在循环里逐行计算的指标,改为使用 Pandas 的分组(groupby)和向量化(rolling/ewm 等)来加速。请注意,由于你的原始逻辑中有一些自定义的分段计算、条件判断与循环依赖(尤其是“悟”的平滑逻辑,以及对前值的依赖判断),如果要完全和原代码一字不差地复刻结果,仍需做适当的循环或自定义函数。下面的示例代码给出的主要是核心的提速思路,供你参考、调整与改进。


1. 核心思路:减少显式循环,更多使用 Pandas 的向量化

  • True Range、ADX 之类的指标:可通过 shift()、rolling(window).sum() / rolling(window).mean() 等一系列方法来快速完成。
  • 多周期 EMA:可用 df.groupby('编码')['收'].ewm(span=周期, adjust=False).mean()。
  • 滚动的最大值、最小值:可用 rolling(window).max()、rolling(window).min()。
  • 自定义的循环依赖:例如 “悟” 的平滑逻辑,需要自己在 group 内做一次递归循环,或写一个自定义的 apply()。

如果某些部分的计算需要前一行计算结果来继续往下推(如 “悟” 指标中途判断 < 200 时才做特殊平滑处理),那么这部分逻辑无法单纯用 rolling()、ewm() 等“无状态”的向量化方法,只能用带状态的循环或自定义累计函数(cumulative approach)。但也可以把其余能用向量化的方法先加速掉,只在必要环节做循环,大大缩短整体运行时间。


2. 代码示例

下面给出一个简化示例,它省略或简化了你原代码中的一些细节,主要演示分组 + 向量化的用法。你可以在此基础上,结合自己的实际需求(如“悟”的特殊计算、“知”“物”对两段周期的分别处理等),进一步修改。

python
import jmp
import numpy as np
import pandas as pd
import time

start_time = time.time()

# Read the JMP table and convert it to a DataFrame.
dt = jmp.open(r"C:\0\原始0.jmp")
data_dict = {}
for idx in range(len(dt)):
    col_name = dt[idx].name
    data_dict[col_name] = np.array(dt[idx])
df = pd.DataFrame(data_dict)

# df is assumed to contain the columns ['编码', '日期', '开', '高', '低', '收', '量'].
# Sort by stock code and date so that every group is in chronological order.
df = df.sort_values(by=['编码', '日期'], ascending=True)


# -------------------------------------------------------------------
# 1) Base columns: true range and directional movement.
#    Vectorised with shift()/diff(); called once per stock.
# -------------------------------------------------------------------
def calc_base_indicators(group):
    """Add TR, DM_plus and DM_minus for one stock (rows sorted by date)."""
    group = group.copy()  # never modify the caller's frame
    prev_close = group['收'].shift(1)
    up_move = group['高'].diff()                    # high - previous high
    down_move = group['低'].shift(1) - group['低']  # previous low - low

    group['H_L'] = group['高'] - group['低']
    group['H_pC'] = (group['高'] - prev_close).abs()  # high - previous close
    group['L_pC'] = (group['低'] - prev_close).abs()  # low - previous close
    # True range: the largest of the three distances above.
    group['TR'] = group[['H_L', 'H_pC', 'L_pC']].max(axis=1)

    # A move only counts when it is positive and larger than the move in the
    # opposite direction.
    group['DM_plus'] = np.where((up_move > 0) & (up_move > down_move), up_move, 0)
    group['DM_minus'] = np.where((down_move > 0) & (down_move > up_move), down_move, 0)
    return group


# -------------------------------------------------------------------
# 2) EMA-based columns (庄1 / 庄2 / 叉庄).
# -------------------------------------------------------------------
def calc_ema_indicators(group):
    """Add EMA12, EMA56, 庄1, 庄2 and the cross counter 叉庄 for one stock."""
    group = group.copy()
    group['EMA12'] = group['收'].ewm(span=12, adjust=False).mean()
    group['EMA56'] = group['收'].ewm(span=56, adjust=False).mean()

    # 庄1 = (EMA12 - EMA56) * 10, 庄2 = 5-bar mean of 庄1.
    group['庄1'] = (group['EMA12'] - group['EMA56']) * 10
    group['庄2'] = group['庄1'].rolling(window=5, min_periods=1).mean()

    # 叉庄 depends on its own previous value, so it needs a sequential loop.
    # The loop runs over NumPy arrays; per-row .iloc access is far slower.
    zhuang1 = group['庄1'].to_numpy()
    zhuang2 = group['庄2'].to_numpy()
    cross_vals = [0] * len(group)
    for i in range(1, len(group)):
        prev_1, prev_2 = zhuang1[i - 1], zhuang2[i - 1]
        curr_1, curr_2 = zhuang1[i], zhuang2[i]
        prev_cross = cross_vals[i - 1]

        # Same decision order as the original loop.
        if prev_1 < prev_2 and curr_1 >= curr_2:
            cross_vals[i] = 1
        elif curr_1 >= curr_2:
            cross_vals[i] = prev_cross + 1
        elif prev_1 >= prev_2 and curr_1 < curr_2:
            cross_vals[i] = -1
        else:
            cross_vals[i] = prev_cross - 1

    group['叉庄'] = cross_vals
    return group


# -------------------------------------------------------------------
# 3) Long moving averages (二 / 六 / 候 / 叁 / 八).
#    rolling(N) includes the current row, so "if i >= 23" in the original
#    loop (the most recent 24 bars) becomes rolling(24).
# -------------------------------------------------------------------
def calc_ma_indicators(group):
    """Add the simple moving averages of the close for one stock."""
    group = group.copy()
    group['二'] = group['收'].rolling(window=24, min_periods=1).mean()
    group['六'] = group['收'].rolling(window=60, min_periods=1).mean()
    group['候'] = group['收'].rolling(window=135, min_periods=1).mean()
    group['叁'] = group['收'].rolling(window=300, min_periods=1).mean()
    group['八'] = group['收'].rolling(window=480, min_periods=1).mean()
    return group


# -------------------------------------------------------------------
# 4) 悟: position of the close inside the 36-bar high/low range, followed
#    by a recursive smoothing step that needs the previous result.
# -------------------------------------------------------------------
def calc_wu_indicator(group):
    """Add 悟 (and the helper columns max_36, min_36, 悟_raw) for one stock."""
    group = group.copy()

    rolling_window = 36
    group['max_36'] = group['高'].rolling(window=rolling_window, min_periods=1).max()
    group['min_36'] = group['低'].rolling(window=rolling_window, min_periods=1).min()

    # raw = (close - min_36) / (max_36 - min_36) * 100; eps avoids a division
    # by zero when the range is flat.
    eps = 1e-8
    group['悟_raw'] = ((group['收'] - group['min_36'])
                      / (group['max_36'] - group['min_36'] + eps) * 100)

    # Smooth while the raw value is below 200, otherwise take it as is.
    raw = group['悟_raw'].to_numpy()
    Wu_values = [0] * len(group)
    for i in range(len(group)):
        if i == 0:
            # The first row has no predecessor and starts from the raw value.
            Wu_values[i] = raw[i]
        elif raw[i] < 200:
            Wu_values[i] = raw[i] / 3 + 2 * Wu_values[i - 1] / 3
        else:
            Wu_values[i] = raw[i]

    # Final 悟 column: re-centre and scale.
    group['悟'] = [(val - 50) * 1.5 for val in Wu_values]
    return group


# -------------------------------------------------------------------
# 5) Directional indicators 知 (12 bars) and 物 (56 bars), with
#    rolling().sum()/mean() replacing the explicit loop.
# -------------------------------------------------------------------
def calc_adx_indicators(group):
    """Add ADX12 / ADX56 and the derived 知 / 物 columns for one stock."""
    group = group.copy()

    def adx_calc(series_tr, series_dm_plus, series_dm_minus, period):
        # Rolling sums over `period` bars.
        TR_n = series_tr.rolling(window=period).sum()
        DM_plus_n = series_dm_plus.rolling(window=period).sum()
        DM_minus_n = series_dm_minus.rolling(window=period).sum()

        plus_di = DM_plus_n / (TR_n + 1e-8) * 100
        minus_di = DM_minus_n / (TR_n + 1e-8) * 100

        # DX = |+DI - -DI| / (+DI + -DI) * 100, smoothed over 6 bars.
        DX = (plus_di - minus_di).abs() / ((plus_di + minus_di) + 1e-8) * 100
        smoothed = DX.rolling(window=6).mean()

        # As in the original loop, the value is negative whenever +DI does
        # not exceed -DI, and the result is a 3-bar mean of that signed
        # series. Without the sign the rule "知 < -50" could never match.
        signed = smoothed.where(plus_di > minus_di, -smoothed)
        return signed.rolling(window=3).mean()

    group['ADX12'] = adx_calc(group['TR'], group['DM_plus'], group['DM_minus'], 12)
    group['ADX56'] = adx_calc(group['TR'], group['DM_plus'], group['DM_minus'], 56)

    # 知 uses the 12-bar value; 物 uses the 56-bar value scaled by 1.5.
    group['知'] = group['ADX12']
    group['物'] = group['ADX56'] * 1.5
    return group


def compute_indicators(group):
    """Run every indicator step on the rows of a single stock."""
    for step in (calc_base_indicators, calc_ema_indicators, calc_ma_indicators,
                 calc_wu_indicator, calc_adx_indicators):
        group = step(group)
    return group


# Iterating over the groups keeps the 编码 column in every sub-frame and
# needs a single pass over the groups.
df = pd.concat([compute_indicators(group) for _, group in df.groupby('编码')])


# -------------------------------------------------------------------
# 6) Selection rules, expressed as boolean columns.
# -------------------------------------------------------------------
def select_stocks(df):
    """Flag the rows that meet all five rules and return their 编码 / 日期.

    The helper and condition columns are added to `df` in place. The return
    value is always a DataFrame (empty when nothing matches).
    """
    codes = df['编码']

    def rolling_sum(flags, window):
        # Rolling count of True values, computed separately for every stock.
        return flags.astype(float).groupby(codes).transform(
            lambda s: s.rolling(window=window).sum()
        )

    def rolling_any(flags, window):
        # True where at least one flag is set in the window (per stock).
        return flags.astype(float).groupby(codes).transform(
            lambda s: s.rolling(window=window).max()
        ) == 1

    # 1) M120 (候) rose more than 45 times within the last 50 bars.
    #    The previous value is taken per stock so that the first row of a
    #    stock is never compared with the last row of another one; 50 bars
    #    contain 49 bar-to-bar comparisons.
    df['候_up'] = df['候'] > df.groupby('编码')['候'].shift(1)
    df['cond1'] = rolling_sum(df['候_up'], 49) > 45

    # 2) The close deviates from M120 by less than 20%.
    df['cond2'] = ((df['收'] / df['候'] - 1).abs() < 0.2)

    # 3) 知 dropped below -50 at least once within the last 50 bars.
    df['知_lt_m50'] = (df['知'] < -50)
    df['cond3'] = rolling_any(df['知_lt_m50'], 50)

    # 4) 知 is currently above 物, or crossed above it within the last
    #    10 bars (10 bars contain 9 bar-to-bar transitions).
    prev_zhi = df.groupby('编码')['知'].shift(1)
    prev_wu = df.groupby('编码')['物'].shift(1)
    cross_up = (prev_zhi < prev_wu) & (df['知'] >= df['物'])
    df['cond4'] = (df['知'] > df['物']) | rolling_any(cross_up, 9)

    # 5) 知 is currently positive.
    df['cond5'] = (df['知'] > 0)

    df['final_cond'] = df['cond1'] & df['cond2'] & df['cond3'] & df['cond4'] & df['cond5']

    return df.loc[df['final_cond'], ['编码', '日期']]


selected_df = select_stocks(df)

# Write the result back to JMP when there is something to write.
if len(selected_df) > 0:
    output_dt = jmp.DataTable("选股结果", selected_df.shape[0])
    for col in selected_df.columns:
        # Use jmp.DataType.Character instead if 日期 (or 编码) is stored as text.
        output_dt.new_column(col, jmp.DataType.Numeric)
        output_dt[col] = list(selected_df[col])
    print(f"已成功创建 JMP 表格 '选股结果',共 {len(selected_df)} 行。")
else:
    print("没有符合选股条件的数据。")

print("选中的股票编码和日期:")
print(selected_df)

print(f"代码运行耗时: {time.time() - start_time:.2f} 秒")

重点说明

  1. 分组计算

    • 大部分指标都是“针对每只股票单独计算”,所以要先 df.groupby('编码')。
    • 在每个 group 里面,你可以借助 rolling()、ewm() 等向量化方法。
    • 一旦需要使用前一条计算值(如“叉庄”的累计计数或“悟”的自定义平滑),则需要在 group 内部做一个循环或自定义函数;没法单纯用 rolling 解决。
  2. 滚动窗口大小与原代码的 off-by-one 问题

    • 原代码里常写 if i >= 23: ...,本质是用最近 24 根 K 线。
    • Pandas 里 rolling(24) 就是包含当前行在内的 24 条数据,如果不想包含当前行,可再指定 shift(1) 或在 rolling() 后再做一些移位处理。
    • 一定要校对好窗口大小,和原公式是否一致。
  3. 向量化提速

    • 一旦去掉那些最耗时的三重/双重 for 循环,并把绝大部分计算改写成单次 groupby.apply(...) + rolling/ewm,对大数据量会有显著提速。
    • 仅保留必要的自定义循环(“悟”或“叉庄”)在每个 group 做一次顺序遍历,远比在“所有股票 + 所有列 + period_idx”三层循环中再加 i 循环要快得多。
  4. 选择条件

    • 选股条件也可以直接做成布尔运算,再拼接在一起,而不必显式地在最外层再跑一大圈 for i in range(...)。
  5. 数值精度

    • 原代码里大量使用 round(..., 3)、round(..., 2),在实际应用中,可以尽量少用或只在最终输出前再做一次保留小数,避免无谓的多次转换。
    • 过多的 round() 调用,会增加一些开销,也会带来更多浮点误差累积。

总之,如果你的核心目的是“提速”,那就:

  • 能向量化的部分(大多数常见技术指标)都用 Pandas rolling() / ewm() 等函数搞定。
  • 只在必须依赖“前一行计算结果”才能继续计算的环节,用一个简短的 for 循环(每只股票一次)。
  • 最终在选股条件处,也优先使用布尔向量运算而不是再写多重循环。

这样通常能极大地减少 Python 解释器层面的循环,性能会有显著提升。你也可以视情况改用 NumPy 做滚动窗口(速度更快,但写法相对繁琐)或者直接用某些专门的指标库(如 talib、pyti 等),进一步加快处理速度。祝你一切顺利!

مشاركة هذا السؤال والجواب