gpt-5与gpt-o3是什么区别呢?哪种更优秀
創建於:2025年8月18日
創建於:2025年8月18日
gpt-5与gpt-o3是什么区别呢?哪种更优秀
简短结论:
**o3 属于上一代“推理”模型,擅长长链推理与多模态理解;GPT-5 是新一代统一系统(含 GPT-5 Thinking),总体更强,默认更快更实用,并在需要时自动“深度思考”。**在大多数任务里,优先选 GPT-5;只有当你固定依赖旧的 o3 工作流或权限限制时,才考虑继续用 o3。(OpenAI, OpenAI CDN)
维度 | GPT-5 | OpenAI o3 |
---|---|---|
模型定位 | 统一系统:快速主模型 + 深度推理模型(GPT-5 Thinking)+ 实时路由根据任务自动选择 | 专注“长时思考”的推理模型 |
强项 | 编码与多工具编排、指令跟随、通用问答;在需要时启用深度推理;更低幻觉率与更诚实 | 数学/科学/编码/视觉推理强,能把图表、截图等纳入推理 |
使用体验 | 默认更快更实用;可在 API 里控制“最小推理(minimal reasoning)”与冗长度(verbosity) | 以“推理努力”换效果,通常时延更高 |
状态 | 新一代旗舰,已在 ChatGPT 默认替代 o3;亦提供 GPT-5 Thinking / Pro | 上代推理线,官方文档标注已被 GPT-5 继任 |
(OpenAI CDN) (OpenAI) (OpenAI) (OpenAI) (OpenAI Platform) |
需要的话,告诉我你的具体场景(写作、编码、做研究、做报表等),我直接给你“选型+提示词”搭配。
'''from shutil import copy
import re
import sys
import argparse
from heat_rect import *
from load_data import Elements
from reprocess import DataFile
from reprocess import HtmlReport
from excel_loader import data_template
def conv_path(path):
"""
检测当前路径中是否包含路径分隔符,若不包含则是单个文件名。
"""
return path.rfind(os.path.sep) != -1
def to_list(elems):
"""
用于将单个元素(包括 None)统一转换为列表形式。
"""
if elems is None:
return []
elif type(elems) is str:
return [elems]
else:
return elems
def parse_args(proj_name, title, template=True):
"""
解析命令行参数,返回一个包含参数的字典。
"""
parser = argparse.ArgumentParser(description='数据协调计算程序')
parser.add_argument('--proj', type=str, required=False, default=proj_name, help='项目名称')
parser.add_argument('--input', type=str, required=False, default=f'{proj_name}_x.csv', help='输入数据名称')
parser.add_argument('--desc', type=str, required=False, default=title, help='项目描述')
parser.add_argument('--output', type=str, required=False, default=None, help='结果输出路径')
parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径')
args = parser.parse_args()
text# 转换数据文件模式 if template is True and args.trans is not None: data_template(args.trans, template=f'{proj_name}_data_template.csv') return args
class Solver:
"""
求解器类,用于组织各个变量对象,并以此为基础进行变量构建、协调、粗大误差识别、修正、比较等功能。
"""
def init(self, proj_name=None, input_name=None, desc=None, timer=None, link=True, output=None):
text# 设置默认大气压 self.atmos = 0.101325 # 附属变量对象 self.x = None self.f = None self.u = None # 附属函数对象 self.bal_func = None # 热平衡函数: x -> u self.dvr_func = None # 数据协调函数: x, u -> f self.drop_func = None # 关闭冗余平衡方程函数:x, u -> List if proj_name is None: proj_name = 'default' self.proj_name = proj_name # 输入与输出路径处理 # 输入路径预处理 if input_name is not None: # 自定义输入文件 self.input_name = input_name.replace('/', os.path.sep) # 统一路径分隔符 if not conv_path(self.input_name): # 路径中不包含分隔符,单文件名 if not os.path.exists(self.input_name): # 当前目录下寻找 if os.path.exists('data'): # 否则在 data 目录下寻找 self.input_name = os.path.join('data', self.input_name) # 检查文件是否存在 if not os.path.exists(self.input_name): raise FileNotFoundError('未找到 input 参数所对应的数据文件,请检查后重试!') else: raise ValueError('未提供 input 参数,请指定数据文件后再进行计算!') # 输出路径生成 # 输出路径或为当前工作目录,或来自输入文件,不需要检查存在性 result_dir = 'result' self.output = output if self.output is None: # 未提供输出路径时 if self.input_name is not None: # 若提供了输入文件路径,则设置为对应目录 self.output = os.path.abspath(os.path.join(self.input_name, os.pardir)) input_folder = re.match(r'(.*\\)?(.*)\.csv', self.input_name) result_dir += '_' + input_folder.group(2) self.output = os.path.join(self.output, result_dir) else: # 否则直接在当前工作路径下创建输出文件夹 self.output = result_dir if not os.path.exists(self.output): os.mkdir(self.output) # 其他变量生成 self.report = None if desc is None: desc = '数据协调计算报告' self.desc = desc self.counter = 0 # 共执行计算的次数 self.data_file = None # 数据文件 self.link_mode = link # 是否启用链式计算模式 # 计时器 if timer is None: self.timer = datetime.now() else: self.timer = timer # 日志配置 root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') file_handler = logging.FileHandler(self.log_path, mode='w', encoding='utf-8') file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) std_handler = logging.StreamHandler() std_handler.setFormatter(formatter) root_logger.addHandler(std_handler) logging.info('欢迎使用 heat_rect 数据协调计算程序!项目编号:{},项目标题:{}'.format( self.proj_name, self.desc)) # 创建基本对象 logging.info('开始读取变量数据……') self.create_var() logging.info('开始创建及维护计算文件……') self.create_file() self.data_file.cache('atmos', self.atmos) # 记录大气压 logging.info('变量及文件路径初始化完成。(elapsed time: {:d}s)'.format(self.time)) logging.info('当前程序输出路径为:[{}]'.format(os.path.abspath(self.output))) @property def n2x(self): return self.x.n2i @property def n2u(self): return self.u.n2i @property def n2f(self): return self.f.n2i @property def time(self): return elapsed_time(self.timer) @property def data_path(self): if os.path.isabs(self.data_file.path): # 绝对路径直接返回 return self.data_file.path else: return os.path.abspath(os.path.join(self.output, self.data_file.path)) @property def report_path(self): if os.path.isabs(self.report.path): # 绝对路径直接返回 return self.report.path else: return os.path.abspath(os.path.join(self.output, self.report.path)) @property def log_path(self): return os.path.abspath(os.path.join(self.output, self.proj_name + '.log')) @property def root_path(self): # 定位程序根目录 if getattr(sys, 'frozen', False): # 打包后程序 return sys._MEIPASS else: # 脚本状态 return os.path.join(os.path.realpath(__file__), os.path.pardir) def set_atmos(self, atmos): self.atmos = atmos def create_file(self): """ 创建数据文件与报告文件。在需要进行松弛计算时,可多次调用。 """ filename = self.proj_name if self.counter > 0: # 多次执行 filename += '-' + str(self.counter) self.data_file = DataFile(filename + '.json', root=self.output) # 创建结果数据文件 self.report = HtmlReport(filename, root=self.output) # 创建报告 # 从程序目录复制静态文件 static_path = os.path.abspath(os.path.join(self.root_path, 'bundle')) for static_file in os.listdir(static_path): if static_file.endswith('.js') or static_file.endswith('.css'): if not os.path.exists(os.path.join(self.output, static_file)): # 仅在未复制时创建 copy(os.path.join(static_path, static_file), self.output) self.report.create(title=self.desc, author='中核运行研究院') def create_var(self): """ 创建变量对象。该函数仅在创建求解器时调用一次即可。 """ # 初始化设备描述文件 try: elems = Elements(self.proj_name) except: logging.warning('未检测到设备描述文件,不再进行设备描述生成。') elems = None # 初始化已知测点 md_x = MeasureData(self.input_name) if elems is not None: md_x.load_elems(elems) md_x.load() self.x = Measured(md_x) self.x.init_md() self.set_atmos(self.x.var.atmos) # 更新大气压 logging.info('测点变量定义读取完成。(elapsed time: {:d}s)'.format(self.time)) # 初始化未知测点 try: md_u = UnmeasuredData(os.path.join(self.root_path, 'bundle', self.proj_name + '_u.csv')) if elems is not None: md_u.load_elems(elems) md_u.load() except: md_u = UnmeasuredData(self.proj_name + '_u.csv') md_u.load() self.u = Unknown(md_u) self.u.init_md() # 初始化冗余平衡方程 try: md_f = Data(os.path.join(self.root_path, 'bundle', self.proj_name + '_f.csv')) md_f.load() except: md_f = Data(self.proj_name + '_f.csv') md_f.load() self.f = Equation(md_f) self.f.init_md() def create_func(self, bal_func, dvr_func, drop_func=None): """ 创建热平衡与数据协调函数变量。该函数仅在创建求解器后调用一次即可。 :param bal_func: 热平衡函数变量。 :param dvr_func: 数据协调函数变量。 :param drop_func: 移除方程函数。 """ if bal_func is None: assert self.link_mode is False # 必须是非链接模式 self.bal_func = None else: self.bal_func = bal_func self.dvr_func = dvr_func if drop_func is not None: self.drop_func = drop_func def drop_equations(self, names): """ 从平衡方程中排除指定方程。 :param names: 方程索引列表。 """ assert self.f is None # TODO: 利用 Partialer 类完成本函数编写 def init(self): """ 初始化各变量值。该函数仅在计算最开始调用一次。 """ # 读取未知测点 if self.link_mode: self.u.construct(self.bal_func) elif self.bal_func is not None: self.u.construct(self.bal_func) logging.info('未知测点变量读取完成。(elapsed time: {:d}s)'.format(self.time)) # 读取平衡方程 self.f.construct(self.dvr_func) if self.drop_func is not None: self.f.bind_drop(self.drop_func) self.f.patch_redundancy(self.x) logging.info('冗余平衡方程读取完成。(elapsed time: {:d}s)'.format(self.time)) def solve(self): """ 执行求解过程。该函数可在松弛计算过程中多次调用。 """ # self.x.show() self.data_file.cache_var('x', self.x, md=True) logging.info('开始求解数据协调优化问题……(elapsed time: {:d}s)'.format(self.time)) if self.link_mode or self.bal_func is not None: self.u.solve(self.x) else: self.u.init_static() self.data_file.cache_var('u', self.u, simple=True, md=True) self.data_file.cache({'u': {'vunit': self.u.var.data['vunit']}}) # 未知变量单位补丁 logging.info('已通过热平衡计算获得未知测点初始假设值。(elapsed time: {:d}s)'.format(self.time)) self.f.mapping(self.x, self.u, link_mode=self.link_mode) self.data_file.cache_var('f', self.f, simple=True, md=True) logging.info('冗余平衡方程构建完成。(elapsed time: {:d}s)'.format(self.time)) init_err = mse(self.f.val) logging.info('当前测点下冗余平衡方程残差均方差为:{:.2E}'.format(init_err)) self.data_file.cache({'init_err': init_err}) self.x.evaluate(self.f, u=self.u, link_mode=self.link_mode) # 求解协调优化问题 def verify(self): """ 执行粗大误差验证过程,若不存在则将自动执行修正计算,否则进行粗大误差定位。可以多次调用。 :return: 当前数据中是否存在粗大误差,布尔值。 """ quality = self.f.verify(self.x, u=None if self.link_mode else self.u, chi2=True) flag = quality < 1 # 是否存在粗大误差 if flag: logging.info('当前数据卡方值为 {:.3f},小于临界值 {:.3f},品质因数为 {:.3f}, 系统整体不存在粗大误差。'.format( self.x.total_penalty, self.x.K_crt, quality)) else: logging.warning('当前数据卡方值为 {:.3f},大于临界值 {:.3f},品质因数为 {:.3f},系统整体存在粗大误差!'.format( self.x.total_penalty, self.x.K_crt, quality)) # 误差定位 logging.info('开始进行误差定位!(elapsed time: {:d}s)'.format(self.time)) # 约束方程检验法 index_f = self.f.locate() self.data_file.cache({'f': {'r': self.f.r}}) self.data_file.cache(index_f, 'f_index') # 测量数据检验法 index = self.x.locate() self.data_file.cache({'x': {'delta': self.x.v, 'r': self.x.r}}) self.data_file.cache(index, 'index') # 计算修正结果 logging.info('开始实施数据协调校正!(elapsed time: {:d}s)'.format(self.time)) self.x.rectify() # 修正已知测点变量值 if self.link_mode: self.u.solve(self.x) # 计算未知测点变量值 else: self.u.calc_static() self.x.patch_unknown(self.u) self.f.mapping(self.x, self.u, link_mode=self.link_mode) # 计算平衡方程剩余残差 self.data_file.cache_new(self.x, self.f, u=self.u, link_mode=self.link_mode) final_err = mse(self.f.residual()) self.data_file.cache({'final_err': final_err}) # 保存数据 self.data_file.cache(self.x.total_penalty, 'total_penalty') # 总卡方值 self.data_file.cache({'x': {'penalty': self.x.penalty}}) # 各变量卡方值 self.data_file.cache({ 'eq_err': self.x.eq_err, 'chis': self.x.chis }) # 方程均方差与总卡方值曲线 self.data_file.cache(self.x.K_crt, 'Kcrt') # 假设检验临界值 return flag def save_file(self): """ 保存数据文件与计算报告。在松弛计算时,可多次调用。 """ # 保存数据文件 logging.info('当前计算步 JSON 数据文件保存至: [{}]'.format(self.data_path)) self.data_file.convert_unit() self.data_file.save() self.data_file.save_csv() # self.data_file.save_balanced_csv() # 仅在为论文调试系统时使用 # 保存网页报告 logging.info('当前计算报告保存至: [{}]'.format(self.report_path)) self.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse() def routine(self): """ 常规求解循环,执行求解和验证环节。 """ self.init() self.solve() self.verify() logging.info('数据协调计算完成。(elapsed time: {:d}s)'.format(self.time)) logging.info('本次计算过程日志保存至: [{}]'.format(self.log_path)) self.save_file()
class VariableManager:
"""
变量管理器,用于统一组织已知及未知变量,简化热平衡与协调方程写法。
"""
def init(self, n2x, n2u):
self.n2x = n2x
self.n2u = n2u
self.flags = self.init_n2i()
self.x = None
self.u = None
textdef __getitem__(self, item): if item in self.n2x: return self.x[self.n2x[item]] elif item in self.n2u: if self.u[self.n2u[item]] == -1: raise ValueError('不能使用未赋值变量 {} 的值!'.format(item)) else: return self.u[self.n2u[item]] else: raise IndexError('索引 {} 不存在于当前管理器对象当中,请检查!'.format(item)) def __setitem__(self, key, value): if key in self.n2x: raise IndexError('不能设置已知变量 {} 的值!'.format(key)) elif key in self.n2u: if self.u[self.n2u[key]] == -1: # 未赋值 self.u[self.n2u[key]] = value else: # 已赋值 raise IndexError('不能重新设置已赋值未知变量 {} 的值!'.format(key)) else: raise IndexError('索引 {} 不存在于当前管理器对象当中,请检查!'.format(key)) def init(self, x, u=None): self.x = x if u is None: self.u = np.ones(len(self.n2u)) * -1 # 全部设置为 -1 else: self.u = u def init_n2i(self): flags = dict() redundant_names = [] for name in self.n2x.keys(): flags[name] = True for name in self.n2u.keys(): if name in flags.keys(): redundant_names.append(name) else: flags[name] = False if len(redundant_names) > 0: raise IndexError('索引 {} 同时出现在已知与未知变量当中,请检查!'.format(redundant_names)) return flags def pt2h(self, name): self[name + '_H'] = h_pt(self[name + '_P'], self[name + '_T']) def check_val(self): for name in self.n2u.keys(): if self.u[self.n2u[name]] == -1: logging.error('索引 {} 变量尚未求值,请检查程序正误!'.format(name)) @property def val(self): return self.u # 预定义方程 def mas_bal(self, inlets, outlets): res = 0.0 for inlet in to_list(inlets): res += self[inlet + '_M'] for outlet in to_list(outlets): res -= self[outlet + '_M'] return res def en_bal(self, inlets, outlets, inputs=None, outputs=None): res = 0.0 for inlet in to_list(inlets): res += self[inlet + '_M'] * self[inlet + '_H'] for input in to_list(inputs): res += self[input] for outlet in to_list(outlets): res -= self[outlet + '_M'] * self[outlet + '_H'] for output in to_list(outputs): res -= self[output] return res / 1e6 def p_bal(self, inlet, outlet, dp=None): res = self[inlet + '_P'] - self[outlet + '_P'] if dp is not None: res -= self[dp] return res def h_eq(self, stream, mode='pt'): res = 0.0 if mode == 'pt': res = self[stream + '_H'] - h_pt(self[stream + '_P'], self[stream + '_T']) elif mode == 'px': res = self[stream + '_H'] - h_px(self[stream + '_P'], self[stream + '_X']) elif mode == 'tx': res = self[stream + '_H'] - h_tx(self[stream + '_T'], self[stream + '_X']) elif mode == 'p0': res = self[stream + '_H'] - h_px(self[stream + '_P'], 0) elif mode == 't0': res = self[stream + '_H'] - h_tx(self[stream + '_T'], 0) elif mode == 'p1': res = self[stream + '_H'] - h_px(self[stream + '_P'], 1) elif mode == 't1': res = self[stream + '_H'] - h_tx(self[stream + '_T'], 1) else: raise ValueError(f'无法识别计算模式 {mode},请检查!') return res / 1e3 def lv_bal(self, stream): return self[stream + '_T'] - t_sat(self[stream + '_P'])
class EquationManager:
"""
方程管理器类,用于管理方程残差对象。
"""
def init(self):
self.n2f = None
self.f = None
self._offs = None
textdef init(self, f): self.n2f = f.n2i self.f = np.zeros(len(f)) self._offs = [] def drop(self, index): if self._offs is None: raise ValueError('方程管理器未初始化,请检查!') if type(index) == str: if index not in self.n2f: raise IndexError('方程 {} 未定义,请检查!'.format(index)) self._offs.append(self.n2f[index]) else: self._offs.append(index) def __getitem__(self, item): if item in self.n2f: return self.f[self.n2f[item]] else: raise IndexError('方程 {} 未定义,请检查!'.format(item)) def __setitem__(self, key, value): if key in self.n2f: self.f[self.n2f[key]] = value else: raise IndexError('方程 {} 未定义,请检查!'.format(key)) @property def val(self): return np.array(self.f) @property def off(self): if self._offs is None: raise IndexError('方程管理器未初始化,请检查!') else: return self._offs'''
请逐行解释上述代码帮助我理解它的含义。
下面按从上到下的顺序,逐行/逐段解释你的代码在做什么,以及它背后的意图与注意点。为避免刷屏,我把同一功能块里语义连贯的多行合并讲,但每一句都覆盖到了。
from shutil import copy
只从 shutil
模块里引入文件复制函数 copy
,用于稍后把 .js/.css
静态资源拷贝到输出目录。
import re
, import sys
, import argparse
分别用于正则处理、系统信息(诸如打包路径)、命令行参数解析。
from heat_rect import *
把 heat_rect
模块里所有公开符号导入当前命名空间——看后文,里面应当包含:数值函数 elapsed_time, mse, h_pt, h_px, h_tx, t_sat
、数据结构 Measured, Unknown, Equation, MeasureData, UnmeasuredData, Data
,以及可能还转手导入了 numpy as np
、logging
、datetime
、os
等(否则本文件直接使用这些名字会报错)。
from load_data import Elements
读取设备/拓扑的描述类,用于把“点位”同设备结构关联。
from reprocess import DataFile
/ HtmlReport
数据缓存与生成 HTML 报告的封装。
from excel_loader import data_template
支持把“外部输入文件”转换为本系统所需的模板格式。
⚠️ 注意:本文件直接使用了
os
,logging
,datetime
,np
等名字,但并未在本文件import
;它们应来自heat_rect
的星号导入,否则会NameError
。
def conv_path(path): ... return path.rfind(os.path.sep) != -1
检查字符串里是否出现系统路径分隔符(例如 Windows 的 \
或 Linux 的 /
)。出现则视为“带路径”,否则视为“纯文件名”。
def to_list(elems): ...
把输入统一成列表:None -> []
,str -> [str]
,其它(假定已是可迭代)原样返回。后面用于把“单个入口/出口”与“多个入口/出口”的书写统一起来。
def parse_args(proj_name, title, template=True):
--proj
项目名称(默认传入的 proj_name
)--input
输入 CSV 文件名(默认 <proj>_x.csv
)--desc
报告标题(默认传入的 title
)--output
输出目录--trans
若提供,把外部文件转换为系统模板template is True
且用户提供了 --trans
,就调用 data_template(args.trans, template=f'{proj_name}_data_template.csv')
生成一份模板 CSV。args
。__init__(..., proj_name=None, input_name=None, desc=None, timer=None, link=True, output=None)
设定环境与路径,读取输入,构建日志与核心对象。
self.atmos = 0.101325
:默认大气压(单位应是 MPa)。self.x
(已知测点,Measured)/ self.u
(未知测点,Unknown)/ self.f
(冗余方程,Equation)self.bal_func
(热平衡 x→u)、self.dvr_func
(协调 x,u→f)、self.drop_func
(关闭部分方程)proj_name
默认值 'default'
。输入路径处理
input_name
:
replace('/', os.path.sep)
把斜杠统一为当前系统的分隔符。data/
目录,则退而到 data/
下找。FileNotFoundError
。input_name
:抛 ValueError
(必须有数据文件)。输出路径处理
result
。output
且已有 input_name
:
self.output = abspath(join(self.input_name, os.pardir))
取输入文件的“上级目录”。
等效于
dirname
的绕行写法。
re.match(r'(.*\\)?(.*)\.csv', self.input_name)
提取输入文件的“基名”(不含扩展名),然后把 result_dir
改成 result_<基名>
,并拼到输出目录。
⚠️ 这里正则只匹配反斜杠
\\
的 Windows 路径,Linux/Mac 下可能不匹配,建议用os.path
系列函数替代。
os.mkdir(self.output)
(单级目录创建)。其余初始化
self.report = None
,self.desc = desc or '数据协调计算报告'
,self.counter = 0
(用于多轮松弛时的文件后缀),self.data_file = None
,self.link_mode = link
。self.timer = timer or datetime.now()
用于统计耗时。self.log_path
(即 <output>/<proj>.log
)。随即打印“欢迎使用…”与传入的项目/标题。核心对象创建
self.create_var()
:读取 x/u/f 配置与数据,构造 Measured/Unknown/Equation
。self.create_file()
:创建用于保存结果/报告的对象,拷贝静态资源,初始化 HTML 报告骨架。self.data_file.cache('atmos', self.atmos)
把大气压写入缓存。n2x/n2u/n2f
:分别返回 x/u/f 的“名称→索引”映射(用于写方程时按名字索引)。time
:elapsed_time(self.timer)
,从起始计时点到当前的秒数。data_path
/report_path
:把相对路径转成以输出目录为根的绝对路径,若原本已是绝对路径则直接返回。log_path
:<output>/<proj>.log
的绝对路径。root_path
:程序根目录。若是打包(sys.frozen
)环境,用 sys._MEIPASS
,否则取当前脚本所在目录。set_atmos(atmos)
:设置大气压。
create_file()
:
按 self.counter
决定本轮文件名前缀(第一次为 <proj>
,第二次为 <proj>-1
…)。
DataFile('<name>.json', root=self.output)
用于存储本轮计算的数据。HtmlReport('<name>', root=self.output)
用于输出 HTML。<root>/bundle/
复制 .js/.css
静态资源到 self.output
(若尚未存在)。self.report.create(title=self.desc, author='中核运行研究院')
初始化报告。create_var()
:
Elements(self.proj_name)
,失败则记录 warning 并置为 None
。md_x = MeasureData(self.input_name)
md_x.load_elems(elems)
做绑定;随后 md_x.load()
读入数据。self.x = Measured(md_x)
& self.x.init_md()
完成对象构造。self.set_atmos(self.x.var.atmos)
用 x 中记录的大气压更新默认值。<root>/bundle/<proj>_u.csv
读;失败则退到当前工作目录 <proj>_u.csv
。self.u = Unknown(md_u)
& self.u.init_md()
。<root>/bundle/<proj>_f.csv
,失败则退本地 <proj>_f.csv
。self.f = Equation(md_f)
& self.f.init_md()
。create_func(bal_func, dvr_func, drop_func=None)
:
绑定“热平衡函数(x→u)”、“数据协调函数(x,u→f)”、“关闭冗余方程函数”。
bal_func is None
,则断言 self.link_mode is False
(链式模式要求有平衡函数)。self.drop_func
(可选)。drop_equations(names)
:
计划用于“从平衡方程集合中排除一些方程”。
目前未实现;而且
assert self.f is None
是明显错误,应该是“已存在 f”才有可能 drop(应为assert self.f is not None
)。函数体里的 TODO 也提示后续将用Partialer
完成。
init()
(一次性初始化数值)
link_mode
为真 或 bal_func
不为空,则 self.u.construct(self.bal_func)
通过热平衡函数生成 u 的初值或结构;否则跳过。self.f.construct(self.dvr_func)
以协调函数构造冗余方程系统;drop_func
,self.f.bind_drop(self.drop_func)
;self.f.patch_redundancy(self.x)
用 x 的信息做冗余性补丁(比如删掉多余方程/变量)。solve()
(一次求解流程)
cache_var('x', ..., md=True)
),便于报告和回溯。link_mode
或有 bal_func
,self.u.solve(self.x)
;self.u.init_static()
(把 u 初始化为静态/常数)。vunit
补丁以保证单位写入到 JSON)。self.f.mapping(self.x, self.u, link_mode=...)
基于当前 x/u 映射出方程残差向量,并缓存。init_err = mse(self.f.val)
计算初始残差均方差,写入缓存。self.x.evaluate(self.f, u=self.u, link_mode=...)
执行“数据协调优化”,本质是根据方程残差对 x 做最小二乘/带权校正。verify()
(粗大误差检验与修正,可多次调用)
quality = self.f.verify(..., chi2=True)
:基于卡方检验输出“品质因数”。
flag = quality < 1
:小于 1 视为整体无粗大误差;否则视为存在粗大误差。对应打印 info 或 warning。index_f = self.f.locate()
,缓存 f
的残差与索引。index = self.x.locate()
,缓存 x
的偏差、残差与索引。self.x.rectify()
对已知测点做协调校正;u.solve(self.x)
,否则 u.calc_static()
;self.x.patch_unknown(self.u)
把未知的解合并回 x(一些场景下报告展示全量变量);f.mapping(...)
,并 cache_new(...)
保存“新旧”数据对比;final_err = mse(self.f.residual())
记录最终残差;eq_err/chis
”与“临界值 K_crt
”。flag
(True 表示无粗大误差)。save_file()
(把本轮结果落盘)
self.data_file.convert_unit()
统一单位显示;self.data_file.save()
保存 JSON;save_csv()
导出 CSV 明细;self.report.generate_report(..., mode='xuf', log_path=...)
生成 HTML 报告;self.report.parse()
做二次解析(比如插图/表格替换)。routine()
(“一键式”完整流程)
依次调用 init() -> solve() -> verify() -> save_file()
并写日志。
__init__(self, n2x, n2u)
记录“名称→索引”的映射;self.flags = self.init_n2i()
校验 n2x 与 n2u 的重名冲突;self.x, self.u
初始为 None
。
__getitem__(self, item)
让你可以用 vm['A_M']
这样的方式:
x[n2x[item]]
;__setitem__(self, key, value)
仅允许给“未知变量”第一次赋值(把 -1
改为具体数),二次赋值或给“已知变量”赋值都会报错。
这保证了“求解构造阶段”只写 u,不破坏 x。
init(self, x, u=None)
绑定底层数组。若不传 u,就把 u 初始化为全 -1
的数组(“未赋值”哨兵)。
init_n2i(self)
合并 n2x/n2u 的 key;若重名,收集起来并抛异常(不允许同名变量既属于已知又属于未知)。
pt2h(self, name)
便捷方法:已知某流股的 P、T → 用 h_pt
计算焓,写入 <name>_H
。
check_val(self)
遍历 u,若还存在 -1
,在日志里报错提示“有未知变量尚未求值”。
val
属性:返回 u 的底层数组(方便拿到“所有未知量”的向量)。
预定义平衡/约束方程原语:
mas_bal(inlets, outlets)
:质量守恒(入的质量流量之和 − 出的质量流量之和)。en_bal(inlets, outlets, inputs=None, outputs=None)
:能量守恒(∑入 M·H + 外部功/热输入 − ∑出 M·H − 外部功/热输出),最后除以 1e6
(通常把单位从 W 级缩放到 MW 级报告)。p_bal(inlet, outlet, dp=None)
:压降方程(入口 P − 出口 P − 指定的压降 dp)。h_eq(stream, mode='pt')
:焓一致性(已知 H
与状态方程 h_pt/h_px/h_tx
计算的理论 H 之差),不同 mode
选择不同状态关系;最后除以 1e3
做单位缩放。lv_bal(stream)
:与饱和温度的偏差(T − t_sat(P)
),用于判断是否处于两相平衡等。这一整套接口让你在写“方程构造函数”时可以更像写“物理公式”,而不用手撸索引。
__init__
:准备 n2f
(名字到索引)、f
(残差向量)、_offs
(被关闭的方程索引列表)。
init(self, f)
:
把外部的 f
(Equation 对象)接进来:保存其 n2i
;新建同长度的零向量作为残差;把 _offs
清空为 []
。
drop(self, index)
:
标记一个方程“关闭不用”。既支持传入字符串名字(会查 n2f
)也支持直接传索引号。
只是“标记”,最终由上层在装配/求解时据此跳过。
__getitem__/__setitem__
:
允许用名字访问或写入残差向量:em['E101'] = 某个残差值
。
val
属性:返回残差向量的 np.array
拷贝(用于求 MSE 等)。
off
属性:返回被标记关闭的方程索引列表;若 init
未调用则报错。
路径正则跨平台问题
re.match(r'(.*\\)?(.*)\.csv', self.input_name)
只识别 Windows 路径。pythonbase = os.path.splitext(os.path.basename(self.input_name))[0] result_dir = f"result_{base}"
os.makedirs(self.output, exist_ok=True)
,避免父目录不存在时报错。drop_equations
里的断言明显写反
assert self.f is None
,意味着“若 f 已存在就失败”。assert self.f is not None
(要有 f 才能 drop)。同时实现 TODO。隐式依赖 heat_rect
导入了很多名字
os/logging/datetime/np
。若 heat_rect
没有转手导入或导出这些名字,会崩。import os, logging
、from datetime import datetime
、import numpy as np
。输入路径取上级目录的写法不直观
abspath(join(self.input_name, os.pardir))
等价于 dirname(abspath(self.input_name))
,建议改成后者更清晰。异常捕获过宽
except:
(不指定异常类型)会吞掉真实错误,排查困难。建议改为 except FileNotFoundError:
等精确类型,并在日志里带上 exc_info=True
。日志与状态
单位注释
en_bal
/h_eq
等方法里标注清楚输入/输出单位(J/kg? kJ/kg? W? MW?),避免使用者误解缩放因子 1e6/1e3
的意义。如果你愿意把 bal_func / dvr_func 的具体实现(你是如何用 VariableManager
写质量/能量/状态方程的)也贴上来,我可以继续按“逐行”方式把方程装配过程、链式求解和检验的细节讲透,并顺便给你一个最小可运行示例(CLI + 示例 CSV + 生成 HTML 报告)。
命令行参数解析是什么意思?能否更通俗一些?
好的,我用更通俗的方式来解释一下 “命令行参数解析”。
当你在命令行里运行一个 Python 程序时,可以在后面加上一些“额外的文字”,告诉程序要用哪些设置或输入。
比如:
bashpython my_program.py --proj demo --input data.csv --desc "测试项目"
这里的 --proj demo
、--input data.csv
、--desc "测试项目"
就叫 命令行参数。它们是用户在启动程序时传给程序的“外部指令”。
程序拿到的原始参数就是一串字符串,例如:
python["--proj", "demo", "--input", "data.csv", "--desc", "测试项目"]
这只是原始的文字,程序还不认识它们的含义。
命令行参数解析 就是把这些文字“翻译”成程序里能直接用的变量。
解析之后,程序会得到一个“参数对象”,里面的属性就是你想要的内容:
pythonargs.proj # "demo" args.input # "data.csv" args.desc # "测试项目"
想象你开外卖 App 点餐:
要不要我给你写一个特别简短的小例子(只有几行代码),让你能直接在命令行试一试命令行参数解析是怎么工作的?
@property是什么意思?
在 Python 里,@property
是一个 语法糖(装饰器),用来把一个方法“伪装”成像属性一样访问。
通常情况下,如果你写一个方法,需要加括号调用:
pythonclass Person: def get_name(self): return "Alice" p = Person() print(p.get_name()) # 需要加括号
但如果加上 @property
:
pythonclass Person: @property def name(self): return "Alice" p = Person() print(p.name) # 不用括号,像访问变量一样
结果完全一样,但是调用方式更自然。
想象你有一台“智能空调”:
@property
就像是把复杂的计算封装起来,但外部用的人看上去就是一个简单的属性。你那段 Solver
代码里有:
python@property def log_path(self): return os.path.abspath(os.path.join(self.output, self.proj_name + '.log'))
这意味着:
solver.log_path
(像访问变量一样),直接得到日志文件路径。要不要我给你做一个小例子,演示 @property
还能配合 setter(@xxx.setter
)实现“读写属性”,这样你就能更直观地理解它的威力?
在你提到的这一行:
pythonquality = self.f.verify(self.x, u=None if self.link_mode else self.u, chi2=True)
真正的“计算过程”发生在 self.f.verify(...)
里面。
self.f
在 Solver
初始化时,self.f
被设定为一个 Equation
对象(在 create_var()
里由 Equation(md_f)
生成)。
这个对象里面存储了 冗余平衡方程(能量守恒、质量守恒等),以及和测量数据的对应关系。
verify(...)
方法
self.x
)”和“未知测点数据 (self.u
)”代入冗余方程,计算残差,然后做统计检验。self.x
:测量点数据(已知变量)。u=None if self.link_mode else self.u
:如果是“链式模式”,不需要手动传未知量,否则传 self.u
。chi2=True
:告诉函数要用 卡方检验 的方式来评价残差。计算内容通常包括:
x
、u
代入方程,得到残差向量。quality
指标(例如:卡方值与临界值的比值,或者“品质因数”)。结果赋值给 quality
quality < 1
→ 表示整体数据没有粗大误差(即残差统计量低于临界值)。quality >= 1
→ 表示存在粗大误差。Equation.verify()
方法里面。Equation
类的实现,但可以肯定:残差计算、卡方检验、统计指标生成,全都写在 verify()
方法里。要不要我帮你推断一下 verify()
方法内部大概会做的数学公式(比如残差平方和 / 卡方检验过程),这样你就能完全理解“计算发生在哪里 + 算了什么”?
import os
import logging
from functools import partial
from datetime import datetime
from typing import Callable
import scipy.linalg as la
import numpy.linalg as nla
from scipy.stats import chi2 as c2
from scipy.stats import f as fd
from scipy.stats import norm as nd
import pandas as pd
from quick_grad import *
from load_data import Data
from load_data import MeasureData
from load_data import UnmeasuredData
from load_data import unit_convert
def create_random_input(var_dict, dx=0.01, std=0.05, gross_error=False, gross_index=None, gross_level=1.2):
"""
根据给定的变量字典 var_dict
,创建一组随机模拟测点变量数据
text:param var_dict: 变量字典,以变量名称为键名、变量平衡值为键值 :param dx: 系统误差(测点均值偏差)相对大小,默认值为 0.01 :param std: 随机误差(测点标准差)相对大小,默认值为 0.05 :return: 测点变量均值 x、95% 置信度区间宽度 xv、协方差矩阵 Sx """ x0 = np.array(list(var_dict.values()), np.float64) x = x0 + np.array(np.random.rand(len(x0))) * x0 * dx # std error generation xv = np.array(np.random.rand(len(x0))) * x * std confidence = 1.96 Sx = np.diag((xv / confidence) ** 2) return x, xv, Sx
def plugin_covariance(Sx, name1, name2, n2i, rho=0.1):
"""
根据给定的测点名称及索引字典,在协方差矩阵中附加不同测点之间的协方差值。
该函数为原位函数,无返回值。
text:param Sx: 变量协方差矩阵 :param name1: 要附加协方差的随机相关测点名称1 :param name2: 要附加协方差的随机相关测点名称2 :param n2i: 变量名称索引字典,可通过 `create_name_index` 函数生成 :param rho: 变量相关系数大小,|rho| <= 1,默认值为 1 """ idx1, idx2 = n2i[name1], n2i[name2] assert idx1 != idx2 # should not plug in (self) variance Sx[idx1, idx2] = Sx[idx2, idx1] = rho * Sx[idx1, idx1] * Sx[idx2, idx2]
def mse(x):
return np.sqrt(np.sum(x ** 2) / len(x))
def pivot_transform(p):
indices = np.arange(len(p))
for i in range(len(p)):
if p[i] != i:
indices[p[i]], indices[i] = indices[i], indices[p[i]]
return indices
def elapsed_time(init_time):
return (datetime.now() - init_time).seconds
class Partialer:
"""
用于将部分索引位置上的变量从数组或矩阵中移除,或反之补零还原为之前的形状。
"""
def init(self, n, index):
self.length = n
self.index = index
self.keep = []
for i in range(n):
if i not in index:
self.keep.append(i)
self.keep = np.array(self.keep)
textdef __len__(self): return self.length def partial_array(self, array, axis=0): assert axis in (0, 1) if axis == 0: return array[self.keep] elif axis == 1: return array[:, self.keep] def partial_matrix(self, matrix): return matrix[self.keep][:, self.keep] def full_array(self, partial_array, axis=0): assert axis in (0, 1) if axis == 0: if np.ndim(partial_array) == 1: array = np.zeros(len(self)) else: assert np.ndim(partial_array) == 2 array = np.zeros(len(self), partial_array.shape[1]) array[self.keep] = partial_array[:] else: assert np.ndim(partial_array) == 2 array = np.zeros(partial_array.shape[0], len(self)) array[:, self.keep] = partial_array[:, :] return array def full_matrix(self, partial_matrix): mat = np.zeros((len(self), len(self))) n = partial_matrix.shape[0] for i in range(n): for j in range(n): k, l = self.keep[i], self.keep[j] mat[k, l] = partial_matrix[i, j] return mat
class Variable:
"""
变量基类。
"""
textdef __init__(self, var: Data): """ 使用特定类型的辅助数据实例化变量对象。 :param var: 初始化所用的辅助数据,必须是 Data 类型的对象. """ assert isinstance(var, Data) # Data 数据对象 self.var = var self._n2i() self.stash_flag = False # if stash old result self.total_penalty = 0.0 # for chi^2 verification self.val = None # 用于检测数据是否创建 @property def names(self): return self.var.name def __len__(self) -> int: return len(self.var) def __getitem__(self, item): if type(item) == str: return self.val[self.n2i[item]] else: return self.val[item] def __setitem__(self, item, val): if type(item) == str: self.val[self.n2i[item]] = val else: self.val[item] = val def init_md(self): """ 处理 Data 数据对象当中的数值与单位信息,转换为统一单位的 Torch 张量数据。 """ if not isinstance(self.var, Data): raise TypeError('当前测量对象绑定的数据类型有误(应为 Data,实为 {}),请检查!'.format( str(type(self.var)))) def show(self): """ 在 stdout 中输出当前数据条目内容。仅支持使用 Data 数据对象定义的测点对象。 """ assert isinstance(self.var, Data) print(self.var.data) def stash(self): """ 储存当前变量值,供之后运算时比较用。 """ # logging old values self.val_old = self.val.copy() if not isinstance(self, Unknown): # 未知测点没有不确定度信息,不进行存储 self.S_old = self.S.copy() self.intv_old = self.intv.copy() self.stash_flag = True def debug(self): """ 调试函数,返回变量索引包装后的数据字典。 """ dic = {} for name in self.names: dic[name] = float(self[name]) return dic def _n2i(self, no_repeat=True): """ 创建变量名称索引字典,使得可以通过变量名称索引到测点下标。 :param no_repeat: 不允许出现重复定义(同名多条目)。默认为 True. """ n2i = {} repeat_names = [] for i, name in enumerate(self.names): if name in n2i.keys() and no_repeat: repeat_names.append(name) else: n2i[name] = i if len(repeat_names) > 0: for name in repeat_names: logging.error("变量条目 '{}' 重复定义,而当前数据结构不允许重复定义,请检查数据正确性!".format(name)) raise ValueError("Variable {} is repeatedly defined in data file {}.".format(repeat_names, self.var.path)) self.n2i = n2i
class Measured(Variable):
"""
测量变量类,即已知测点变量类。
"""
def init(self, var: MeasureData):
assert isinstance(var, MeasureData)
super().init(var)
self.redundancy = {} # 定义冗余测点集
self.eq_err = [] # 迭代步当中方程残差
self.chis = [] # 卡方值
self.err = [] # 粗大误差测点集
textdef init_md(self): """ 使用 MeasureData 数据对象初始化数据。 """ super().init_md() assert isinstance(self.var, MeasureData) self.var.convert() # 转换数据单位 # 生成 Numpy 数据 self.init_val = np.array(self.var.val) self.val = np.array(self.var.val) self.init_intv = np.array(self.var.intv) self.intv = np.array(self.var.intv) self.S = np.diag((self.intv / 1.96) ** 2) # 记录 CST 与 OFF 测点 exclude = [] for i in range(len(self)): if self.var.accuracy[i] == 'CST': exclude.append(i) exclude.extend(self.var.off) # 加入 OFF 测点 self.part = Partialer(len(self), exclude) self._n2i(index=self.var.off) # 重新生成 n2i self.stash() def plugin_covariance(self, idx1, idx2, rho=0.1): self.S[idx1, idx2] = self.S[idx2, idx1] = rho * sqrt(self.S[idx1, idx1] * self.S[idx2, idx2]) self.stash() def evaluate(self, f, u, eps=1e-3, max_iter=100, link_mode=True): """ 根据给定测点变量,构建数据协调优化问题并求解,获得已知测点修正值及未知测点新估计值。 :param f: Equations 对象,平衡方程残差变量 :param u: Unknown 对象,未知测点,可置空 :param eps: 收敛判断时容许的最大残差,默认值为 1e-3 :param max_iter: 容许的最大迭代次数,默认值为 100 :param link_mode: 是否启用链接模式,即使用 u 变量中预定义的热平衡函数求解 x 到 u 的映射关系,默认为 True :return: 无返回值,相应计算结果直接更新到变量属性当中 """ # 已知变量 self.v = np.zeros_like(self.val) self.v = self.part.partial_array(self.v) # 未知变量 if not link_mode: du = np.ones_like(u.val) # 协方差 S = self.part.partial_matrix(self.S_old) # 原始协方差矩阵,过程中不更新 Sv = np.zeros_like(self.S) Sv = self.part.partial_matrix(Sv) sx = self.part.partial_array(np.diag(self.S_old)) s_inv = 1 / sx # 过程变量 counter = 0 timer = datetime.now() # 用于统计单步用时 self.total_penalty = 0.0 self.eq_err, self.chis = [], [] while counter < max_iter: # 方程与变量裁剪 f.drop(self, u) # 移除被关闭的方程 f_names = f.part.partial_array(np.array(f.names)) arange = np.arange(len(f_names)) f_val = f.part.partial_array(f.val) f_grad = self.part.partial_array(f.grad, axis=1) # 去除未使用测点 f_grad = f.part.partial_array(f_grad, axis=0) # 去除未使用方程 f_grad_u = f.part.partial_array(f.grad_u, axis=0) # 去除未使用方程 f_S = f.part.partial_matrix(f.S) # 链接模式预处理 if link_mode: # 检查零对角元 f_diag = np.diag(f_S) f_zero = arange[f_diag == 0] if len(f_zero) > 0: index_zero = f_names[f_zero] raise ValueError('存在方差为零的平衡方程,请检查其冗余性:{}'.format(index_zero)) else: # 检查极小对角元 # 仅在第 0 步检查 if counter == 0: f_small = arange[np.abs(f_diag) < 1e-6] if len(f_small) > 0: for si in f_small: fs_name, fs_val = f_names[si], f_diag[si] logging.warning(f'方程 {fs_name} 方差极小({fs_val:.2E}),请注意确认其冗余性。') F_val = f_val F_grad = f_grad F_S = f_S # 非链接模式预处理 else: # QR 分解 m, p = f_grad_u.shape assert m > p, f'方程数({m})不大于未知变量数({p}),无法求解,请检查!' Q, R = nla.qr(f_grad_u, mode='complete') Q1, Q2 = Q[:, :p], Q[:, p:] R1 = R[:p, :] # 仅在需要计算未知测点修正量时使用 # 消元降维 F_val = Q2.T @ f_val F_grad = Q2.T @ f_grad F_S = Q2.T @ f_S @ Q2 # LU 分解 Dlu = la.lu_factor(F_S) # 奇异矩阵检测 zeros = np.diag(Dlu[0]) == 0 if len(np.arange(len(F_val))[zeros]) > 0: r = nla.matrix_rank(F_S) logging.error( '当前方程协方差矩阵不可逆(矩阵秩 {} < 满秩 {}),无法进行数据协调计算,请检查输入数据!'.format(r, len(F_val))) zero_index = pivot_transform(Dlu[1])[zeros] if not link_mode: # 非链接模式下需还原到原始矩阵判断冗余平衡方程项 zero_set = set() for zi in zero_index: real_zero_index = arange[np.abs(Q2[:, zi]) > 1e-6] for zj in real_zero_index: zero_set.add(zj) zero_index = list(zero_set) zero_names = [f_names[i] for i in zero_index] logging.error(f'当前冗余平衡方程中的可疑项(共 {len(zero_index)} 项):{zero_names}') # 退出计算 exit(1) # 计算测点修正量 multiplier = la.lu_solve(Dlu, F_val) # 拉格朗日乘子 FSx = F_grad @ S dv = FSx.T @ multiplier # 计算单步修正量 self.v += dv # 累加 FSv = FSx - F_grad @ Sv # 过程中修正协方差 Sv += FSv.T @ la.lu_solve(Dlu, FSv) # 累加 err_v = mse(dv) # 计算未知变量修正量 if not link_mode: Dluu = la.lu_factor(R1) QF = Q1.T @ (f_val - f_grad @ dv) ddu = la.lu_solve(Dluu, QF) du += ddu u_grad = la.lu_solve(Dluu, Q1.T @ f_grad) # 模拟链式法则求解未知变量雅可比 err_u = mse(ddu) # 计算系统总体卡方值 v2 = self.v ** 2 self.total_penalty = v2 @ s_inv # 叠加修正步 self.step() if link_mode: # 链接模式下调用热平衡函数更新未知变量 u.solve(self) else: # 非链接模式下使用增量更新未知变量 u.val = u.val_old - du u.S = u_grad @ (S - Sv) @ u_grad.T f.mapping(self, u, link_mode=link_mode) # 记录数据 err_f = mse(f_val) # 计算方程残差 counter += 1 self.eq_err.append(err_f) self.chis.append(self.total_penalty) # 输出日志 err = err_v if link_mode else err_u # 不同模式下选取不同的变量计算迭代步长 step_time = datetime.now() - timer step_time = step_time.seconds + step_time.microseconds / 1e6 logging.info('[{:d} / {:d}] 总卡方值 {:.3f} | 迭代步长 {:.3E} | 方程残差均方差 {:.3E}.(单步用时 {:.3f} 秒)'.format( counter, max_iter, self.total_penalty, err, err_f, step_time)) timer = datetime.now() # 判断是否收敛 if err < eps: logging.info('数据协调计算已在 {:d} 步内收敛,当前方程残差均方差为 {:.3E}。'.format(counter, err_f)) break # 超出最大迭代次数 elif counter == max_iter: logging.warning('数据协调计算未在 {:d} 步内收敛,请检查输入!当前方程残差均方差为 {:.3E}'.format(max_iter, err_f)) # 数据处理及还原 sv = np.diag(Sv) # 提取对角元 penalty = v2 / np.maximum(sv, sx / 10) # 按 VDI-2048 标准计算卡方值 self.penalty = self.part.full_array(penalty) self.Sv = self.part.full_matrix(Sv) def evaluate2(self, f, eps=1e-6): """ 使用二阶算法构建及求解数据协调优化问题。 不支持包含未知测点的情形。 TODO: 使本算法适配含未知测点的情形. :param f: 平衡方程残差项。 :param eps: 迭代收敛限,默认值为 1e-6。 """ v0 = np.zeros_like(self.val) v = np.zeros_like(self.val) err = 1 H = f.hess(self.val) while err > eps: Hv0 = (H @ v0).T F = f.grad.T + Hv0 # Dlu = la.lu_factor(f.grad @ self.S @ F) Hfv0 = (v0.T @ H @ v0) / 2 # multiplier = la.lu_solve(Dlu, f.val + Hfv0) multiplier = la.solve(f.grad @ self.S @ F, f.val + Hfv0) v = self.S @ F @ multiplier err = np.sqrt(np.sum((v - v0) ** 2) / len(v)) v0[:] = v logging.debug('itering, err: {:.3E}'.format(err)) self.v = v self.multiplier = multiplier Dlu = la.lu_factor(f.S) FSx = F.T @ self.S F_ast = la.lu_solve(Dlu, FSx) self.Sv = FSx.T @ F_ast self.total_penalty = np.sum(np.abs(f.val @ self.multiplier)) # 计算总卡方值 # 计算各变量卡方值 self.penalty = np.square(self.v) / np.maximum(np.diag(self.Sv), np.diag(self.S) / 10) def rectify(self): """ 确认无粗大误差后,施加修正量,完成变量数据协调。 """ # 修正协方差矩阵 self.val = self.val_old - self.part.full_array(self.v) self.S = self.S_old - self.Sv self.variance = np.diag(self.S) self.intv = self.variance ** 0.5 * 1.96 # self.bound() # bound constraints def step(self): """ 在未知测点算法迭代过程中,步进当前测点变量值。 """ self.val = self.val_old - self.part.full_array(self.v) # self.bound() def locate(self): """ 粗大误差定位。 :return: 可能存在粗大误差的测点下标。 """ self.r = np.sqrt(self.penalty) excessive_index = np.argsort(self.r)[::-1][:len(self.r[self.r > 1.96])] self.err = excessive_index # 储存索引变量 return excessive_index def bound(self): """ 更新测点值后,应检查确认某些类型的测点值不超过约束范围。 """ # TODO: 实施约束之前先做一轮单位转换 assert isinstance(self.var, MeasureData) for i in range(len(self)): if self.val[i] < self.var.lb[i]: self.val[i] = self.var.lb[i] elif self.val[i] > self.var.ub[i]: self.val[i] = self.var.ub[i] def relax(self): """ 根据粗大误差识别的结果,对存在粗大误差的变量进行校验。 """ self.val[:] = self.val_old # 更新为初值 for i in self.err: self.intv[i] *= 2 # 不确定度放宽 1.5 倍 logging.info("测点 '{}' 的粗大误差由 {:.2f}% 放宽至 {:.2f}%。".format( self.names[i], self.init_intv[i] / self.val[i] * 100, self.intv[i] / self.val[i] * 100)) self.S = np.diag((self.intv / 1.96) ** 2) # 重新计算协方差矩阵 def patch_unknown(self, u): """ 对于 OFF 测点,从未知变量计算结果当中提取对应结果赋值。 """ for i in self.var.off: name = self.names[i] defined_flag = False if name in self.names: # 已知测点中有同名测点 if name in self.n2i: # 变量名可索引到值(非 OFF 测点) j = self.n2i[name] # 获取主变量索引 assert j not in self.var.off # 备用检查 # 需进行单位逆转换 val, intv = unit_convert(self.val[j], self.var.vartype[i], self.var.vunit[i], self.intv[j], self.var.accuracy[j], reverse=True) defined_flag = True if name in u.names: # 同时出现在已知与未知测点当中,检查问题 logging.warning('测点 {} 有测量值,同时定义了同名未知测点,请检查模型,增补冗余方程!'.format(name)) if name in u.names and not defined_flag: # 未知测点集中求解了相应值 j = u.n2i[name] # 需进行默认单位下的单位逆转换 val, intv = unit_convert(u.val[j], u.var.vartype[j], self.var.vunit[i], u.intv[j], acc=None, reverse=True) defined_flag = True if defined_flag: self.val[i] = val self.intv[i] = intv else: logging.debug('测点 {} 全部 OFF 且未通过热平衡求值,请检查模型!'.format(name)) def _n2i(self, no_repeat=True, index=None): self.redundancy = {} # 清空冗余测点集 if isinstance(self.var, MeasureData): n2i = {} for i in range(len(self)): name, token = self.var.name[i], self.var.token[i] if not pd.isna(token): n2i[token] = i # 对所有测点,都允许通过测点位号索引 if name not in n2i.keys(): if index is None or i not in index: n2i[name] = i # 对于第一个非 OFF 测点,允许额外通过变量名称索引 elif index is not None and i not in index: # 当前测点为冗余测点 if name in self.redundancy.keys(): self.redundancy[name].append(token) # 直接将位号加入现有列表 else: self.redundancy[name] = [token] # 创建冗余点列表 self.n2i = n2i else: # 未使用 MeasureData 初始化数据,不存在冗余测点 super()._n2i(no_repeat=no_repeat) # 按默认方式处理
class Unknown(Variable):
"""
未知测点类。
"""
textdef __init__(self, var: UnmeasuredData): super().__init__(var) self.grad = None # 定义导数变量 def init_static(self): """ 对未知测点,生成静态初始预测值。 """ self.val = np.ones(len(self)) # 检查 var 对象中的 init_val if self.var.init_val is not None: for i in range(len(self)): if not pd.isna(self.var.init_val[i]): self.val[i] = self.var.init_val[i] else: # 未提供初值,按单位类型设置默认值 if self.var.vartype[i] == 'h': self.val[i] = 500 # kJ/kg elif self.var.vartype[i] == 'load': self.val[i] = 1000 # kW elif self.var.vartype[i] == 't': self.val[i] = 323.15 # K elif self.var.vartype[i] in ('dp', 'dt'): self.val[i] = 0 # 差量类型初值为 0 else: # 其他一切变量类型 self.val[i] = 1 self.intv = np.ones(len(self)) self.stash() def calc_static(self): """ 对未知测点,在采用迭代方式求解的情况下更新不确定度。 """ self.intv = np.sqrt(np.diag(self.S)) * 1.96 def init_md(self): """ 读取 Data 数据对象当中的信息,并初始化相应的数值结构。 """ super().init_md() self.val = None self.new = True self.func = None self._default_unit() # 分析并创建默认单位 def construct(self, solve_func: Callable): """ 绑定求解函数,用已知测点值求解未知测点平衡值。 :param solve_func: 求解函数。 """ self.func = solve_func self.jac = jacobian(self.func) self.hess = hessian(self.func) def solve(self, x: Measured, update_grad=True): """ 根据已知测点 x 的值,按给定的求解函数计算未知测点的值, 同时利用函数关系求出雅可比矩阵和协方差等信息。 :param x: 已知测点 x,应为 Measured 对象。 :param update_grad: 是否在本轮计算中更新梯度值,默认为 True. """ assert self.func is not None # 需要绑定求解函数 self.val = self.func(x.val) if update_grad: self.grad = jacobian(self.func)(x.val) self.S = self.grad @ x.S @ self.grad.T self.variance = np.diag(self.S) self.intv = np.sqrt(self.variance) * 1.96 if isinstance(self.var, Data): # 更新到数据库中 if self.new: self.var.data = self.var.data.assign(value=pd.Series(self.val).values, accuracy=pd.Series(self.intv).values) self.new = False else: self.var.data['value'] = pd.Series(self.val).values self.var.data['accuracy'] = pd.Series(self.intv).values if not self.stash_flag: self.stash() def bound(self): """ 根据上下界信息约束当前测点值。暂时未实现。 """ pass def _default_unit(self): vunit = [] for i in range(len(self)): vtype = self.var.data['type'][i] if vtype == 'h': vunit.append('kJ/kg') elif vtype == 'qm': vunit.append('kg/s') elif vtype in ('p', 'dp'): vunit.append('MPa') elif vtype == 't': vunit.append('K') elif vtype == 'dt': vunit.append('C') elif vtype in ('load', 'pw'): vunit.append('kW') elif vtype == 'v': vunit.append('m3/kg') else: vunit.append('') self.var.data['vunit'] = vunit
class Intermediate(Variable):
"""
辅助变量类,可用于构建中间变量或半方程量。
用作中间变量时,可用于表示较复杂的中间过程量,如焓值等。
用作半方程量时,可表示平衡方程一侧量。此类存在的主要目的是确定方程残差的总体量级,以便确认迭代终止条件。
"""
textdef __init__(self, var): super().__init__(var) def init_md(self): super().init_md() def construct(self, func): """ 构建函数关系。 :param func: 从已知及未知测点映射到半方程项的函数。 """ self.func = func self.jac = jacobian(self.func) self.hess = hessian(self.func) def mapping(self, input_x, input_u=None, update_grad=True, link_mode=True): """ 实施变量映射,并通过自动微分计算因变量关于自变量的偏导数(雅可比矩阵)值。 :param input_x: 已知测点变量,应为 Measured 对象。 :param input_u: 未知测点变量,应为 Unmeasured 对象,默认值为 None。 :param update_grad: 是否在本轮映射当中更新梯度值,默认为 True. :param link_mode: 是否使用链接模式传递未知变量对已知变量的关系,默认为 True. """ if input_u is None: self.val = self.func(input_x.val) if update_grad: self.grad = jacobian(self.func)(input_x.val) self.grad_u = None # for varify self.S = self.grad @ input_x.S @ self.grad.T else: self.val = self.func(input_x.val, input_u.val) if update_grad: if link_mode: self.grad = jacobian(lambda x: self.func(x, input_u.func(x)))(input_x.val) else: self.grad = jacobian(lambda x: self.func(x, input_u.val))(input_x.val) self.grad_u = jacobian(lambda u: self.func(input_x.val, u))(input_u.val) self.S = self.grad @ input_x.S @ self.grad.T self.variance = np.array(np.diag(self.S)) # self.variance[self.variance < 0] = 0 # 消除因特殊原因变负的对角项 self.intv = self.variance ** 0.5 * 1.96 if not self.stash_flag: self.stash()
class Equation(Intermediate):
"""
方程类。
"""
textdef __init__(self, var): super().__init__(var) self.offs = [] # 关闭的方程 self.part = None # 用于关闭方程 self.drop_func = None # 方程关闭函数 def init_md(self): super().init_md() self.val = None def construct(self, func, prev=None): """ 构建函数及导函数映射关系。 :param func: 函数表达式。 :param prev: 前置函数。默认为 None。 """ if prev is not None: self.func = func self.rawfunc = lambda x: self.func(prev.func(x)) self.jac = jacobian(self.rawfunc) self.hess = hessian(self.rawfunc) else: super().construct(func) def bind_drop(self, drop_func): """ 绑定用于判断关闭哪些方程的函数。 """ self.drop_func = drop_func def mapping(self, input_x, input_u=None, update_grad=True, link_mode=True): """ 实施变量映射。 :param input_x: 映射输入的主变量项,可为 HalfEquations 或 Measured 对象。 :param input_u: 映射输入的次变量项,若 input_x 为 HalfEquations 对象则应为空, 若其为 Measured 对象则 input_u 应为 Unmeasured 对象。默认为 None。 :param update_grad: 是否更新梯度值,默认为 True. :param link_mode: 是否按链式法则传递未知变量关于已知变量的梯度,默认为 True. """ super().mapping(input_x, input_u, update_grad, link_mode) if isinstance(input_x, Intermediate): # 输入的为中间变量半方程,需要使用链式法则计算梯度 self.grad = self.grad @ input_x.grad if input_x.grad_u is not None: self.grad_u = self.grad @ input_x.grad_u if not self.stash_flag: self.stash() def verify(self, x, u=None, chi2=False): """ 验证总体粗大误差。 :param x: 已知测点变量,应为 Measured 对象。 :param u: 未知测点变量,应为 Unmeasured 对象。默认值为空。 :param chi2: 是否使用卡方判据替代 VDI2048 判据,默认为 False。 """ if u is not None: # unknown variables exist, use Chi^2 distribution according to standard assumptions x.K_crt = c2.ppf(0.95, len(self) - len(u)) else: # no unknown variables, follow VDI-2048 and use F distribution if chi2: x.K_crt = c2.ppf(0.95, len(self)) else: x.K_crt = fd.ppf(0.95, len(self), 1e10) return x.total_penalty / x.K_crt # 返回品质因数 def locate(self): """ 方程粗大误差定位。 :return: 可能存在粗大误差的方程下标。 """ self.r = np.zeros_like(self.val) # 方程粗大误差指标 for i in range(len(self)): if i not in self.offs: sigma = self.S[i, i] ** 0.5 if sigma == 0: self.r[i] = 0 else: self.r[i] = self.val_old[i] / sigma self.r = np.abs(self.r) # 取绝对值 alpha = 0.05 beta = 1 - (1 - alpha) ** (1 / len(self)) phi = nd.ppf(1 - beta) excessive_index = np.argsort(self.r)[::-1][:len(self.r[self.r > phi])] return excessive_index def residual(self, y=None): if isinstance(y, Intermediate): return float(np.max(np.abs(self.val / y.val_old[::2]))) else: # 未构建半方程对象,直接输出各方程绝对残差值 return self.val def patch_redundancy(self, x: Measured): """ 将冗余测点间的平衡方程补充到现有的平衡方程当中。 """ red = x.redundancy if len(red.keys()) > 0: n2x = x.n2i n0 = len(self.n2i) # 增加冗余测点方程前的方程数量 eqs = {} # 冗余测点方程集 for name in red.keys(): tokens = red[name] for i in range(len(tokens)): token = tokens[i] eq_name = 'MEA{:d}_{}'.format(i+1, name) # 方程名称 # 增加条目 self.var.add_item({'name': eq_name, 'type': 'MEA', 'description': '{} 测点冗余测量平衡方程'.format(token)}) eqs[eq_name] = [name, token] # 包装为完整的函数 self._n2i() # 重新生成名称索引 self.func_old = partial(self.func) # 拷贝原先函数副本 def new_func(x, u): f = self.func_old(x, u) for eq_name in eqs: name, token = eqs[eq_name] f[self.n2i[eq_name]] = x[n2x[name]] - x[n2x[token]] return f self.func = new_func def drop(self, x: Measured, u: Unknown): """ 关闭某些不需要使用的冗余平衡方程。 """ if self.drop_func is None: self.part = Partialer(len(self), []) # 不移除任何方程 else: self.offs = self.drop_func(x.val, u.val) self.part = Partialer(len(self), self.offs)
class OutputVariable(Intermediate):
"""
输出变量,如机组热经济性指标、部件性能指标等。
"""
def init(self, var):
super().init(var)
textdef mapping(self, input1, input2=None): """ 实施变量映射。 :param input1: 输入变量之一,可为任意 Variable 对象。 :param input2: 输入变量之二,可为任意 Variable 对象,默认值为 None。 """ if input2 is not None: self.func(input1, input2) else: self.func(input1)
上述代码为文件:heat_rect.py的代码,请问在verify验证总体粗大误差处,c2.ppf是什么意思?这个方法是什么含义?
简短说:c2.ppf(...)
就是卡方分布的“分位点函数”(percent point function,= 逆 CDF)。
给它一个概率 q
和“自由度” df
,它会返回阈值 k
,使得
在你的代码里,这个 k
就是卡方检验的临界值。
pythonfrom scipy.stats import chi2 as c2 # 有未知量 u 时(方程个数 m 减去未知量个数 p 作为自由度) x.K_crt = c2.ppf(0.95, len(self) - len(u)) # 没有未知量但指定用卡方检验时 x.K_crt = c2.ppf(0.95, len(self))
0.95
:显著性水平 α=0.05 的上 95% 分位点(右侧 5% 尾部作为拒绝域)。len(self) - len(u)
或 len(self)
:自由度(通常 = 独立方程数 m;有未知量时有效独立约束减少为 m−p)。随后代码返回:
pythonreturn x.total_penalty / x.K_crt # 品质因数
这里 x.total_penalty
是统计量(类似“标准化残差平方和”);
当 x.total_penalty < x.K_crt
时(即返回值 < 1),认为整体无粗大误差;否则认为存在粗大误差。
把卡方分布想成一条曲线;c2.ppf(0.95, df)
找到一个点 k
,让曲线左边的面积是 95%。
如果你的统计量没超过这个点,就还在“正常波动”的 95% 范围内;超过了,就落在“异常”的 5% 尾部。
小例子:自由度 df=10 时,
c2.ppf(0.95, 10) ≈ 18.31
。
如果统计量 < 18.31,接受“无粗大误差”的假设;否则拒绝。
x.K_crt是什么用法?x是类,K_crt是某种属性吗?
是的,x.K_crt
在这里就是 类实例的一个属性。
x
是谁?在你的程序里,x
是 Measured
类的实例(已知测点变量对象)。
它继承自 Variable
,本身没有在 __init__
里定义过 K_crt
。
K_crt
从哪来?在 Equation.verify()
里有这么一段:
pythondef verify(self, x, u=None, chi2=False): if u is not None: # 有未知量时,自由度 = 方程数 - 未知数,用卡方分布 x.K_crt = c2.ppf(0.95, len(self) - len(u)) else: # 没有未知量 if chi2: x.K_crt = c2.ppf(0.95, len(self)) else: x.K_crt = fd.ppf(0.95, len(self), 1e10) return x.total_penalty / x.K_crt
也就是说,verify()
方法运行时,会动态地在 x
对象上添加一个新属性 K_crt
,用来保存“检验的临界值”。
在 Python 里,这是允许的:你可以在类定义之外,随时给对象绑定新的属性。
K_crt
的作用verify()
中,返回的“品质因数”就是
如果pythonquality = x.total_penalty / x.K_crt
quality < 1
→ 没有粗大误差;否则 → 存在粗大误差。pythonclass A: def __init__(self): self.total_penalty = 10 a = A() # 动态给对象添加新属性 a.K_crt = 15 print(a.total_penalty / a.K_crt) # 10/15 = 0.666...
这和你的 x.K_crt
用法是一样的。
✅ 总结:
x
是 Measured
类的对象。K_crt
在 Equation.verify()
中被赋值,相当于“卡方/F 检验临界值”。要不要我帮你画一个“数据流小图”,把 x.total_penalty
和 x.K_crt
在 verify()
中是怎么结合起来判断“有没有粗大误差”的?
什么是链接模式?
在你给的这套代码体系里,“链接模式 (link mode)” 指的是:
👉 在求解过程中,已知测点 x
与未知测点 u
之间的关系,通过预定义的“热平衡函数”直接联立起来计算。
链接模式 (link_mode=True
)
Unknown
类里,construct(self, solve_func)
会绑定一个函数:u = bal_func(x)
,即未知量 u
可以由已知量 x
通过热平衡函数算出来。Measured.evaluate()
或 Solver.init()
里,如果 link_mode
打开,就会直接用这个函数更新 u
。u
不再是独立的自由变量,而是通过链式法则从 x
计算得来。非链接模式 (link_mode=False
)
u
被当作独立的未知变量。x
一起,通过冗余方程残差最小化来求解。f(x, u)
并同时解出 u
和修正后的 x
。Solver.__init__
里有 self.link_mode = link
,默认是 True
。Solver.init()
里有:
表明在链接模式下,用热平衡函数初始化pythonif self.link_mode: self.u.construct(self.bal_func)
u
。Solver.solve()
里有:
在链接模式下,直接用pythonif self.link_mode or self.bal_func is not None: self.u.solve(self.x) else: self.u.init_static()
x
推算 u
。u
当作独立变量,适合做灵敏度分析或没有明确函数关系时的情况。想象一个锅炉模型:
u = f(x)
。要不要我帮你画一个“小图”,把 链接模式 和 非链接模式 在 x → u → f
之间的数据流对比直观展示出来?
请解释文件heat_rect.py中,evaluate方法的流程。
下面把 heat_rect.py
里 Measured.evaluate()
的执行流程按“入口 → 每轮迭代 → 结束收尾”讲清楚。它的目标是:在冗余方程约束下,对已知测点 x
做数据协调(求修正量 v
),并在需要时通过“链接模式”更新未知测点 u
,直到收敛。
pythonself.v = np.zeros_like(self.val) self.v = self.part.partial_array(self.v)
self.v
存“对测点 x 的修正量”(VDI-2048 里的 adjustment)。self.part
是 Partialer
,会裁剪掉常值点(CST)与 OFF 点,只在可调整的测点上优化。pythonif not link_mode: du = np.ones_like(u.val)
u
当作独立未知,要同时解 du
。pythonS = self.part.partial_matrix(self.S_old) # 原始协方差(不变) Sv = self.part.partial_matrix(np.zeros_like(self.S)) # 修正过程中累计增量 sx = self.part.partial_array(np.diag(self.S_old)) # 原始方差对角 s_inv = 1 / sx # 其逆,用于卡方值
pythoncounter = 0 timer = datetime.now() self.total_penalty = 0.0 self.eq_err, self.chis = [], []
max_iter
次)pythonf.drop(self, u) # 先根据 drop 规则屏蔽某些方程 f_names = f.part.partial_array(np.array(f.names)) arange = np.arange(len(f_names)) f_val = f.part.partial_array(f.val) f_grad = self.part.partial_array(f.grad, axis=1) # 去掉不可调整测点列 f_grad = f.part.partial_array(f_grad, axis=0) # 再去掉被关闭的方程行 f_grad_u= f.part.partial_array(f.grad_u, axis=0) # 同上(针对 u 的雅可比) f_S = f.part.partial_matrix(f.S) # 方程残差协方差
link_mode=True
)u = u.func(x)
,u
不是自由变量。f_S
的对角元是否为零(方差为 0 意味着方程被写死,不可用)。令:
pythonF_val = f_val # 有效残差 F_grad = f_grad # 对 x 的雅可比(已裁剪) F_S = f_S # 残差协方差
link_mode=False
)u
当独立未知,需要先消掉 u 再在 x 空间里优化(把问题降到只含 v 的约束)。f_grad_u
:pythonm,p = f_grad_u.shape # m=方程数, p=未知数 Q,R = nla.qr(f_grad_u, mode='complete') Q1,Q2= Q[:,:p], Q[:,p:] R1 = R[:p,:] # 后面解 du 用
Q2
在方程空间做消元降维:pythonF_val = Q2.T @ f_val F_grad = Q2.T @ f_grad F_S = Q2.T @ f_S @ Q2
直觉:把“与 u 相关的约束”投影掉,只留下对
v
有效的独立约束。
F_S
做 LU 分解并检查奇异pythonDlu = la.lu_factor(F_S) # 若奇异(秩亏),报错并提示可能病态的方程索引
pythonmultiplier = la.lu_solve(Dlu, F_val) # λ = F_S^{-1} * F_val FSx = F_grad @ S dv = FSx.T @ multiplier # v 的单步修正量 self.v += dv
pythonFSv = FSx - F_grad @ Sv Sv += FSv.T @ la.lu_solve(Dlu, FSv)
du
pythonif not link_mode: Dluu = la.lu_factor(R1) QF = Q1.T @ (f_val - f_grad @ dv) ddu = la.lu_solve(Dluu, QF) # 解 du du += ddu u_grad = la.lu_solve(Dluu, Q1.T @ f_grad) # du 对 x 的近似雅可比
pythonv2 = self.v ** 2 self.total_penalty = v2 @ s_inv # Σ (v_i^2 / σ_i^2)
pythonself.step() # self.val = self.val_old - full(self.v)
u
与方程映射pythonif link_mode: u.solve(self) # 用热平衡函数从 x 重新算 u else: u.val = u.val_old - du # 直接用 du 更新 u.S = u_grad @ (S - Sv) @ u_grad.T f.mapping(self, u, link_mode=link_mode)
pythonerr_f = mse(f_val) # 方程残差均方差 self.eq_err.append(err_f) self.chis.append(self.total_penalty)
pythonerr = mse(dv) if link_mode else mse(ddu)
打印进度日志(步号 / 总卡方 / 步长 / 残差 / 用时)
收敛判定
pythonif err < eps: # 收敛, break elif counter == max_iter: # 未收敛,给出 warning
Sv
的对角抽出pythonsv = np.diag(Sv)
pythonpenalty = v2 / np.maximum(sv, sx / 10) self.penalty = self.part.full_array(penalty) # 还原到完整测点序 self.Sv = self.part.full_matrix(Sv) # 同上
verify()
、rectify()
、报告生成等后续步骤使用。evaluate()
就是在“当前 x、(可能还有)u、冗余方程 f、以及协方差”下,最小化约束残差得到一组测点修正量 v
(以及非链接模式下的 du
),循环更新 x/u
与方程,直到步长小于 eps
收敛;最后产出卡方统计量、修正后的协方差增量等,为后续 verify()
判粗大误差 与 rectify()
应用修正 做准备。
import sys
import argparse
from heat_rect import *
from excel_loader import data_template
from solver import Solver
from solver import VariableManager
from solver import EquationManager
from coolprop import *
if name == 'main':
# argparse 模块
parser = argparse.ArgumentParser(description='数据协调计算程序')
parser.add_argument('--proj', type=str, required=False, default='CO2-SS', help='项目名称')
parser.add_argument('--input', type=str, required=False, default='CO2-SS.csv', help='输入数据名称')
parser.add_argument('--desc', type=str, required=False, default='微型燃气轮机系统', help='项目描述')
parser.add_argument('--output', type=str, required=False, default=None, help='结果输出路径')
parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径')
args = parser.parse_args()
text# 转换数据文件模式 if args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv') init_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output) n2x = solver.n2x # 获取索引变量 n2u = solver.n2u # 创建辅助变量集 X = VariableManager(n2x, n2u) # 热平衡函数 def u_val(x): X.init(x) ng = NG() # gas = GAS() # # Comp X['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P']) X['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF']) X['C_O_T'] = air_t_hp(X['C_O_H'], X['C_O_P']) X['C_LOAD'] = X['C_I_M'] * (X['C_O_H'] - X['C_I_H']) # Combustion Chamber X['FUEL_H'] = ng.h_tp(X['C_I_T'], X['C_I_P']) X['CC_O_M'] = X['C_I_M'] + X['FUEL_M'] X['CC_O_H'] = (X['FUEL_M'] * X['LHV'] + X['C_O_H'] * X['C_I_M'] + X['FUEL_H'] * X['FUEL_M'])/(X['CC_O_M']) X['CC_O_P'] = X['C_O_P'] * X['PLOSS_EFF'] # Turbo X['T_I_H'] = gas('H_TP', X['T_I_T'], X['T_I_P']) X['T_O_H'] = gas_turbine_h1(X['T_I_H'], X['T_I_P'], X['T_O_P'], X['T_EFF']) X['T_O_T'] = gas('T_HP', X['T_O_H'], X['T_O_P']) X['T_LOAD'] = X['T_I_M'] * (X['T_I_H'] - X['T_O_H']) # Gen X['GEN_LOAD'] = (X['T_LOAD'] - X['C_LOAD'] - X['MECH_LOSS'])*X['GEN_EFF'] return X.val # 创建数据协调平衡方程 n2f = solver.n2f F = EquationManager() # 冗余平衡方程 def f_val(x, u): F.init(solver.f) X.init(x, u) # ====== F['C_O_T_BAL'] = X['C_O_T'] - X['C_O_T1'] F['AIR_BAL'] = X['FUEL_M'] * X['AIR_ratio'] - X['C_I_M'] F['T_I_H_BAL'] = X['T_I_H'] - X['CC_O_H'] F['T_I_M_BAL'] = X['T_I_M'] - X['CC_O_M'] F['T_I_P_BAL'] = X['T_I_P'] - X['CC_O_P'] F['T_O_T_BAL'] = X['T_O_T'] - X['T_O_T1'] F['GEN_BAL'] = X['GEN_LOAD'] - X['ELE_LOAD'] return F.val # 执行计算 solver.create_func(u_val, f_val) solver.routine()
综合前面的代码,上述代码请逐行分析
下面把你给的 hope.py 逐行/逐段解释一遍,并结合前面贴过的 solver.py
、heat_rect.py
的类与流程,让每一行“放到系统里”都讲清楚它在做啥。
python# hope.py # --------- # CO2-SS# 机组模型 # author: 西安交通大学
pythonimport sys import argparse from heat_rect import * from excel_loader import data_template from solver import Solver from solver import VariableManager from solver import EquationManager from coolprop import *
sys
:系统相关(未直接用到,但保留无妨)。argparse
:命令行参数解析(让脚本可用 --proj/--input/...
灵活配置)。from heat_rect import *
:引入前面那份热平衡/数据协调底层实现(Measured/Unknown/Equation、数值函数、统计学等工具)。data_template
:把外部 Excel/CSV 转换成此系统需要的模板格式。Solver / VariableManager / EquationManager
:你在 solver.py
里定义的“总控求解器”“变量名访问器”“方程残差管理器”。from coolprop import *
:热物性接口(如 air_h_tp
、air_comp_h1
、gas(...)
等)。这里假设你把需要的热物性封装在这个模块里。pythonif __name__ == '__main__':
python hope.py
时,下面的逻辑才会执行;被当作模块导入时不会跑。pythonparser = argparse.ArgumentParser(description='数据协调计算程序') parser.add_argument('--proj', type=str, required=False, default='CO2-SS', help='项目名称') parser.add_argument('--input', type=str, required=False, default='CO2-SS.csv', help='输入数据名称') parser.add_argument('--desc', type=str, required=False, default='微型燃气轮机系统', help='项目描述') parser.add_argument('--output',type=str, required=False, default=None, help='结果输出路径') parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径') args = parser.parse_args()
args.proj / args.input / args.desc / args.output / args.trans
。bash# 例子 python hope.py --proj CO2-SS --input data/CO2-SS.csv --desc "微型燃机系统A" --output outdir
pythonif args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv')
--trans 原始文件
,这里会调用 data_template(...)
生成一份 CO2-SS_data_template.csv
,帮助你把外部格式适配到系统需要的列结构/单位。pythoninit_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output)
solver.py
:
data/
下尝试),并创建输出目录(如 result_CO2-SS
)。<output>/<proj>.log
。create_var()
会:
self.x
(Measured,已知测点),从 args.input
载入值/不确定度/单位;self.u
(Unknown,未知测点),文件 <proj>_u.csv
;self.f
(Equation,冗余平衡方程),文件 <proj>_f.csv
;create_file()
创建 DataFile
与 HtmlReport
,复制 .js/.css
静态资源,准备生成报告。pythonn2x = solver.n2x # 获取索引变量 n2u = solver.n2u
Measured/Unknown
),方便后面用名字访问/赋值。pythonX = VariableManager(n2x, n2u)
VariableManager
:
X['NAME']
读取变量值(优先从 x,其次 u);X['NAME'] = val
只允许给未知量赋值(给已知量赋值会抛错);X.init(x, u)
绑定底层数组(在函数里会调用)。pythondef u_val(x): X.init(x) ng = NG() # gas = GAS() # # Comp X['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P']) X['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF']) X['C_O_T'] = air_t_hp(X['C_O_H'], X['C_O_P']) X['C_LOAD'] = X['C_I_M'] * (X['C_O_H'] - X['C_I_H']) # Combustion Chamber X['FUEL_H'] = ng.h_tp(X['C_I_T'], X['C_I_P']) X['CC_O_M'] = X['C_I_M'] + X['FUEL_M'] X['CC_O_H'] = (X['FUEL_M'] * X['LHV'] + X['C_O_H'] * X['C_I_M'] + X['FUEL_H'] * X['FUEL_M'])/(X['CC_O_M']) X['CC_O_P'] = X['C_O_P'] * X['PLOSS_EFF'] # Turbo X['T_I_H'] = gas('H_TP', X['T_I_T'], X['T_I_P']) X['T_O_H'] = gas_turbine_h1(X['T_I_H'], X['T_I_P'], X['T_O_P'], X['T_EFF']) X['T_O_T'] = gas('T_HP', X['T_O_H'], X['T_O_P']) X['T_LOAD'] = X['T_I_M'] * (X['T_I_H'] - X['T_O_H']) # Gen X['GEN_LOAD'] = (X['T_LOAD'] - X['C_LOAD'] - X['MECH_LOSS'])*X['GEN_EFF'] return X.val
逐行说明(并指出这些“被赋值的名字”应当属于未知量集 u,否则会因 VariableManager
的检查报错):
X.init(x)
:把当前“已知测点数组”绑定给 X;未知量数组此处不传,意味着 本函数将“只向未知量赋值”。ng = NG()
:天然气工质对象(提供 h_tp
等性质函数)。
coolprop
一侧应定义了NG()
与gas(...)
等接口。
C_I_H = air_h_tp(C_I_T, C_I_P)
:入口(温度 T、压力 P)→ 空气焓 H。C_O_H = air_comp_h1(C_I_H, C_I_P, C_O_P, C_EFF)
:给定入口焓/压力、出口压力、等熵效率 → 出口焓。C_O_T = air_t_hp(C_O_H, C_O_P)
:由出口焓、压力 → 出口温度。C_LOAD = C_I_M * (C_O_H - C_I_H)
:压气机吸功(质量流 × 焓增)。FUEL_H = ng.h_tp(C_I_T, C_I_P)
:按入口状态估计燃料的“有效焓”(建模近似)。CC_O_M = C_I_M + FUEL_M
:质量守恒。CC_O_H = (...) / CC_O_M
:能量守恒(把燃料 LHV、入口空气焓、燃料焓汇总到出口混合焓)。CC_O_P = C_O_P * PLOSS_EFF
:考虑燃烧室压降/保压效率。T_I_H = gas('H_TP', T_I_T, T_I_P)
:把涡轮进口 T、P 转成焓(对于燃气)。T_O_H = gas_turbine_h1(T_I_H, T_I_P, T_O_P, T_EFF)
:已知进口焓/压、出口压、效率 → 出口焓。T_O_T = gas('T_HP', T_O_H, T_O_P)
:由焓/压反算温度。T_LOAD = T_I_M * (T_I_H - T_O_H)
:涡轮出功(质量流 × 焓降)。GEN_LOAD = (T_LOAD - C_LOAD - MECH_LOSS)*GEN_EFF
:轴系净功减损耗、乘发电机效率 → 电功输出。
return X.val
:把本轮求得的一整组未知量的值返回(Unknown.solve()
会接收这组值;这就是链接模式里“u = bal_func(x)
”的核心)。
关键理解:在 链接模式 下,
u_val
把你所有“能从 x 推导的未知量”一次性算出来,并通过VariableManager
写入各个u
项(如C_I_H/C_O_H/C_O_T/...
等)。
pythonn2f = solver.n2f F = EquationManager()
Equation
对象是在 f_val()
里 F.init(solver.f)
那一步完成的。pythondef f_val(x, u): F.init(solver.f) X.init(x, u) # ====== F['C_O_T_BAL'] = X['C_O_T'] - X['C_O_T1'] F['AIR_BAL'] = X['FUEL_M'] * X['AIR_ratio'] - X['C_I_M'] F['T_I_H_BAL'] = X['T_I_H'] - X['CC_O_H'] F['T_I_M_BAL'] = X['T_I_M'] - X['CC_O_M'] F['T_I_P_BAL'] = X['T_I_P'] - X['CC_O_P'] F['T_O_T_BAL'] = X['T_O_T'] - X['T_O_T1'] F['GEN_BAL'] = X['GEN_LOAD'] - X['ELE_LOAD'] return F.val
F.init(solver.f)
:告诉 EquationManager
这一次要管理的方程集合是谁(solver.f
),内部会抓取 n2f
并创建一个残差向量。X.init(x, u)
:这次既有 x
也有 u
(因为装配方程时要对两者的当前值做运算)。每一条 F[...] = ...
都是“残差 = 0”形式的约束(理想为 0,求解时让它尽量接近 0):
C_O_T_BAL
:压气机计算得到的出口温度 C_O_T
应等于测量的 C_O_T1
。
AIR_BAL
:理论需氧量平衡:FUEL_M * AIR_ratio == C_I_M
(燃料流量×需气比 = 压气机入口空气流)。
T_I_H_BAL
:涡轮进口焓 = 燃烧室出口焓(稳态能量守恒)。
T_I_M_BAL
:涡轮进口质量流 = 燃烧室出口质量流(稳态质量守恒)。
T_I_P_BAL
:涡轮进口压力 = 燃烧室出口压力(忽略管道压降或已计入 PLOSS_EFF
)。
T_O_T_BAL
:涡轮计算得到的出口温度 T_O_T
应等于测量的 T_O_T1
。
GEN_BAL
:计算得到的发电机输出功 GEN_LOAD
应等于电气测得的 ELE_LOAD
。
return F.val
:返回“所有残差”的数组给上层(Equation.mapping()
会用到)。
你也可以把这些平衡写成“质量守恒/能量守恒/设备性能”的标准原语(如
VariableManager.mas_bal/en_bal
),但这里直接按变量名写清楚也很好读。
pythonsolver.create_func(u_val, f_val) solver.routine()
create_func(u_val, f_val)
:把热平衡函数和冗余方程函数塞进 Solver
。
Solver
的 link=True
(默认),这一步使它进入链接模式:
u
不再是自由变量,而是由 u_val(x)
直接算出来(并自动带雅可比);f_val(x, u)
负责把“由 x/u 构造的物理残差”交给 Equation
做统计/线性化/权重处理。solver.routine()
:一键跑完整流程:
init()
:构造 u/f
的函数/梯度映射,补冗余测点方程;solve()
:执行 Measured.evaluate(...)
的迭代(见你之前问过的流程),更新 x
的修正向量、在链接模式下调用 u.solve(self)
根据 x
更新 u
;verify()
:卡方/F 判据做总体粗大误差检验与定位;若无粗大误差则做修正/更新残差;save_file()
:输出 JSON/CSV、生成 HTML 报告、保存日志。数据流:
x
(CSV)→ 绑定未知 u
与方程 f
定义;u = u_val(x)
(热平衡一把算出未知);f_val(x,u)
生成冗余方程残差与雅可比 ∂f/∂x
(链式传导 ∂u/∂x
);evaluate()
解“带权最小化残差”的问题,得到测点修正 v
,循环更新 x/u
直到收敛;verify()
做统计检验(c2.ppf / fd.ppf
求临界值),定位粗大误差;X['C_O_T']
这类名字必须存在于 Unknown
的清单里(<proj>_u.csv
),否则 VariableManager
会因“试图给已知量赋值”而报错。coolprop
/空气性质函数返回的单位要与 u
表定义一致(比如焓 kJ/kg
、温度 K
、压力 MPa
),否则方程平衡会虚假不一致。C_O_T
vs C_O_T1
:一个是计算量(未知),一个是测量量(已知);你在方程里对它们做“差为零”的校核,这是典型的数据协调写法。PLOSS_EFF
:乘数小于 1 代表压降,记得在数据文件里给它合适的值或不确定度。gas
的使用:你注释掉了 gas = GAS()
,却用 gas('H_TP', ...)
/gas('T_HP', ...)
,说明这里期望 gas
是一个函数(来自 coolprop
),不要再遮蔽这个名字。如果你愿意把 <proj>_u.csv
和 <proj>_f.csv
的头几行也贴出来(变量名与单位),我可以帮你逐项对照 u_val/f_val
,检查是否一一对应并给出更精确的单位检查清单。
在上述代码中,数据协调方法主要的流程是输入,中间的数据处理以及输出协调后的结果,针对输入,涉及到输入测点基本信息(测点名称、仪表的测量值、仪表的不确定度等),上述输入是在哪里体现的?计算完成后又输出到了哪里?
非常好的问题 👍
我们把整个 数据协调方法 的“输入 → 处理中间步骤 → 输出” 串起来看,就能明确 输入测点信息在哪里体现,以及 协调后的结果最后去了哪里。
在 Solver.create_var()
中:
python# 已知测点 md_x = MeasureData(self.input_name) # 读取输入 CSV md_x.load() self.x = Measured(md_x) # 构造已知测点对象 self.x.init_md()
self.input_name
就是 --input
传入的 CSV 文件(如 CO2-SS.csv
)。C_O_T1
、ELE_LOAD
)value
列)intv
列)unit, lb, ub, accuracy
等)这些信息通过 MeasureData.load()
→ Measured.init_md()
被加载,内部会:
convert()
)。self.val
,不确定度数组 self.intv
。self.S = diag((intv/1.96)^2)
。所以,输入测点基本信息就在 CSV 文件里,并在初始化时进入 Measured
类。
核心流程发生在 Measured.evaluate()
→ Solver.solve()
:
f_val(x,u)
,计算残差 f.val
。x
,让残差尽量接近 0,同时保持调整量不超过测点的不确定度。x
(和可能的未知量 u
),直到收敛。x.total_penalty
并和临界值 x.K_crt
比较(用来判断有无粗大误差)。locate()
和 rectify()
,定位并修正有问题的测点。在 Solver.save_file()
中:
python# 保存数据文件 self.data_file.convert_unit() self.data_file.save() # 保存 JSON self.data_file.save_csv() # 保存 CSV # 保存报告 self.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse()
<proj>.log
,记录每一步计算与误差检验过程。输出路径是 --output
参数指定的文件夹;若未指定,会在输入文件目录下生成 result_<proj>
文件夹。
--input
指定的 CSV 文件(如 CO2-SS.csv
),里面写了每个测点的名称、测量值、不确定度、单位等 → 在 MeasureData/Measured
中加载。evaluate()
/ solve()
中构造残差方程,迭代调整测点值,最小化卡方残差。result_xxx
文件夹里,包含 JSON 数据文件、CSV 表格、HTML 报告和日志。要不要我帮你画一张“数据流示意图”,把 CSV → Measured → evaluate → verify → rectified x/u → JSON/CSV/HTML 的流程用图形表示出来?这样会更直观。
import os
import re
import numpy as np
import pandas as pd
import logging
def num_format(num):
"""
根据变量的值(0、极小极大数--科学计算数法、普通数--两位小数),将其转化为指定样式的数据格式。
"""
if 1e-2 <= np.abs(num) < 1e3:
return '{:.2f}'.format(num)
elif num != 0:
return '{:.1E}'.format(num)
else:
return '0'
def unit_convert(val, vtype, vunit=None, intv=None, acc=None, atmos=0.101325, reverse=False, name=None):
"""
转换或逆转换单个物理量的单位。
text:param val: 要转换的值。 :param vtype: 要转换的值所属于的物理量类型。 :param vunit: 要转换的值的单位。默认为 None,即使用默认单位(不需要转换的那个)。 :param intv: 要转换的值的不确定度,以便按 vunit 一同转换单位,默认为 None。 :param acc: 原始数据中的 acc 信息,仅用来判断测点是否关闭。 :param atmos: 大气压值。默认为标准大气压(MPa)。 :param reverse: 是否逆转换(将标准单位换为 vunit 对应的单位)。默认为 False。 :param name: 变量的名称,默认为 None。 """ new_val, new_intv = None, None vname = '' vflag, undef_flag = False, False # 单位处理 factor = 1 # 单位乘数因子 adder = 0 # 单位加减因子 if val == 'OFF' or acc == 'OFF': # OFF 测点处理 new_val = val if intv is not None: new_intv = intv # 压力转换 elif vtype in ('p', 'dp', 'atm'): # 标准单位:MPa vname = '压力' if vtype == 'p' else '压差' if vunit in (None, 'MPa', 'MPa.a'): pass elif vunit in ('kPa', 'kPa.g', 'kPa0'): factor = 1e-3 elif vunit == 'hPa': factor = 1e-4 elif vunit == 'daPa': # 十帕 factor = 1e-5 elif vunit in ('Pa', 'pa'): factor = 1e-6 elif vunit in ('bar', 'Bar'): factor = 0.1 else: vflag = True if vtype == 'p' and vunit in ('kPa.g', 'MPa'): # 表压,不是压差或绝对压力 adder = atmos # 温度转换 elif vtype == 't': # 标准单位:K vname = '温度' if vunit in (None, 'K'): pass elif vunit == 'C': adder = 273.15 else: vflag = True # 温度转换 elif vtype == 'L': # 标准单位:m vname = '高度' if vunit == 'mm': factor = -0.101325 / 760 adder = 0.101325 else: vflag = True # 温差转换 elif vtype == 'dt': # 温差,K 与 C 都一样,不需要转换 vname = '温差' if vunit in (None, 'K', 'C'): pass else: vflag = True # 流量转换 elif vtype == 'qm': # 标准单位:kg/s vname = '质量流量' if vunit in (None, 'kg/s'): pass elif vunit == 't/h': factor = 1 / 3.6 else: vflag = True # 功率转换 elif vtype in ('pw', 'load'): # 标准单位:kW vname = '功率' if vtype == 'pw' else '负荷' if vunit in (None, 'kW', 'KW', 'kw'): pass elif vunit in ('MW', 'mw'): factor = 1e3 elif vunit in ('W', 'w'): factor = 1e-3 else: vflag = True # 焓值转换 elif vtype == 'h': # 标准单位:kJ/kg vname = '比焓' if vunit in (None, 'kJ/kg'): pass elif vunit == 'kJ/kmol': factor = 18.01528 else: vflag = True # 面积转换 elif vtype == 'a': # 标准单位:m2 vname = '面积' if vunit in (None, 'm2', 'm^2'): pass else: vflag = True # 换热系数转换 elif vtype == 'k': # 标准单位:kW/m2/K vname = '换热系数' if vunit in (None, 'kW/m2/K', 'kW/K/m2'): pass else: vflag = True elif vtype in ('num', 'x'): # 常规量纲 1 数值,不处理 vname = '无量纲数' if vtype == 'num' else '干度' if vunit == '%': # 百分比单位 factor = 1e-2 else: undef_flag = True # 实施逆转换,对已转换成标准单位的已知变量使用,以还原标准值 if reverse: new_val = (float(val) - adder) / factor if intv is not None: new_intv = float(intv) / factor # 实施正转换,对原始输入的已知变量值使用 else: if name is not None: # 在提供名称的情况下添加日志信息 if vflag: # 未能识别单位 logging.warning("{}测点 '{}' 的单位 {} 未能识别,将采用默认单位进行转换!".format(vname, name, vunit)) elif undef_flag: logging.warning("测点 '{}' 类型 {} 未能识别,将以标准单位读入其数值!".format(name, vtype)) # 使用因子实施转换 new_val = float(val) * factor + adder # 在提供不确定度的情况下,对不确定度进行转换 if intv is not None: new_intv = float(intv) * factor return new_val, new_intv
def batch_unit_convert(vals, vtypes, vunits=None, intvs=None, acc=None, atmos=0.101325, reverse=False):
"""
批量(逆)转换物理量的单位。
"""
new_vals = []
new_intvs = []
for i in range(len(vals)):
vtype = vtypes[i]
val = vals[i]
if vunits is not None:
vunit = vunits[i]
else:
vunit = None
if intvs is not None:
intv = intvs[i]
else:
intv = None
if acc is not None:
accuracy = acc[i]
else:
accuracy = None
new_val, new_intv = unit_convert(val, vtype, vunit, intv, accuracy, atmos, reverse)
new_vals.append(new_val)
new_intvs.append(new_intv)
if intvs is None:
new_intvs = None
return new_vals, new_intvs
class TagsData:
"""
由外方软件 TAGS_DEF 部分转换而来的数据读取模块。
"""
def init(self, path):
self.path = os.path.join('data', path)
self.data: pd.DataFrame = pd.read_csv(self.path, header=0)
self.target = None
text@classmethod def vtype(cls, name): if name[-2:] == '_P': return 'p' elif name[-2:] == '_T': return 't' elif name[-3:] == '_DP' or name[-4:] in ('_DP1', '_DP2'): return 'dp' elif name[-3:] == '_HW': return 'h' elif name[-4:] in ('_TTD', '_DCA'): return 'dt' elif name[-5:] == '_VAPF': return 'x' elif name[-5:] == '_AREA': return 'a' elif name[-5:] == '_LOAD': return 'load' elif name[-5:] == '_VOLF' or name[-7:] == '_VOLF_V': return 'qv' elif name[-5:] == '_LOSS' or name[-6:] == '_POWER': return 'pw' elif name[-6:] == '_UGLOB': return 'k' elif name[-6:] == '_MASSF' or name[-8:] in ('_MASSF_L', '_MASSF_V'): return 'qm' elif name[-8:] == '_DENSITY': return 'rho' else: return 'num' def trans(self): names, tokens = [], [] vals, vunits = [], [] vtypes = [] accs, aunits = [], [] for index, row in self.data.iterrows(): name = row['TAG NAME'] token = None if re.fullmatch(r'\d[A-Z]{3}\d{3}[A-Z]{2,3}', name): token = name name = row['OWNER VAR.NAME'].replace(' ', '_') name = name.replace('-', '_') # 统一转换为当前写法 names.append(name) tokens.append(token) acc = row['MEA.ACC.'] aunit = None vunit = row['P.U.'] vtype = TagsData.vtype(name) # 不确定度处理 if acc[0] == '-': # 负数表示相对不确定度 aunit = '%' acc = acc[1:] elif vunit == 'C': # 温度不确定度单位为摄氏度 aunit = 'C' accs.append(acc) aunits.append(aunit) vunits.append(vunit) vals.append(row['MEA.VAL.']) vtypes.append(vtype) self.target = pd.DataFrame({ 'name': names, 'type': vtypes, 'description': None, 'value': vals, 'vunit': vunits, 'accuracy': accs, 'aunit': aunits, 'token': tokens, 'lb': None, 'ub': None }) def save(self, path=None): if path is None: path = self.path.replace('.csv', '_new.csv') else: path = os.path.join('data', path) self.target.to_csv(path, index=False)
class Elements:
"""
设备描述文件类。
"""
def init(self, name):
"""
初始化路径信息,并读取设备描述数据。
text:param name: 系统名称,如 hn2、fq3 """ self.path = os.path.join('bundle', name + '_elem.csv') self.data = pd.read_csv(self.path, header=0, comment='#') self.var_dict = { 'P': '压力', 'T': '温度', 'M': '质量流量', 'V': '比容', 'QV': '体积流量', 'H': '比焓', 'X': '干度', 'S': '比熵', 'DP': '压降', 'DT': '温差', 'TTD': '上端差', 'DCA': '下端差', 'EFF': '效率', 'PR': '压比', 'K': '换热系数', 'A': '换热面积', 'VAR': '变量', 'FRAC1': '分流比1', 'FRAC2': '分流比2', 'M0': '设计质量流量', 'DP0': '设计压降', 'CFDP': '压降系数', 'DP1': '压降1', 'DP2': '压降2', 'Q': '比例流量', 'W': '功率', 'POWER': '功率', 'LOAD': '负荷', } def __len__(self): return len(self.data) def desc(self, name): """ 获取指定设备的描述信息。 :param name: 设备名称。 :return: 设备描述信息。 """ query = self.data[self.data['name'] == name]['description'] if len(query) > 0: return query.values[0] return name # 未查询到时原样返回 def param_desc(self, param_name): """ 获取指定参数的名称。 :param param_name: 参数名称。 """ if param_name in self.var_dict.keys(): return self.var_dict[param_name] else: # 未查询到描述的情况下原样返回,方便检查 return param_name def port_desc(self, port_name, elem_type): """ 获取指定端口的名称。 :param port_name: 端口名称。 :param elem_type: 设备类型,用来对特定字母调整端口描述。 """ if port_name == 'I': if elem_type == 'T': return '进汽' elif elem_type == 'HX': return '管侧给水' return '进口' elif port_name == 'O': if elem_type == 'T': return '排汽' elif elem_type == 'HX': return '管侧排水' return '出口' elif port_name == 'EX': if elem_type == 'HX': return '壳侧给水' return '抽汽' elif port_name == 'SH': return '壳侧给水' elif port_name == 'D': return '疏水' else: # 默认情况 return port_name + '侧' def elem_type(self, name): """ 获取指定设备的类型。 :param name: 设备名称。 :return: 设备类型。 """ query = self.data[self.data['name'] == name]['type'] if len(query) > 0: return query.values[0] return 'BX' # 未查询到时返回默认变量类型 BX(BOX) def parse_desc(self, name): """ 根据设备描述信息,解析变量名称的含义。 :param name: 测点名称。 """ tokens = name.split('_') if len(tokens) == 2: # 两段式,设备名 + 参数名 elem_name, param_name = tokens return self.desc(elem_name) + self.param_desc(param_name) elif len(tokens) >= 3: # 三段式,设备名 + 端口名 + 参数名 或 含_设备名 + 参数名 if len(tokens) == 3: elem_name, port_name, param_name = tokens else: elem_name = '_'.join(tokens[:-2]) port_name = tokens[-2] param_name = tokens[-1] complex_elem_name = elem_name + '_' + port_name if complex_elem_name in self.names: # 先查询是否存在对应的复杂设备名称(含 _) return self.desc(complex_elem_name) + self.param_desc(param_name) else: # 端口名 return self.desc(elem_name) + \ self.port_desc(port_name, self.elem_type(elem_name)) + \ self.param_desc(param_name) @property def names(self): return list(self.data['name'])
class Data:
"""
常规数据集类。
"""
def init(self, path):
self.data = None
self.path = path
self.elems = None # 设备描述文件
textdef __len__(self): return len(self.name) def load(self): self.data = pd.read_csv(self.path, header=0, comment='#') @property def name(self): return list(self.data['name']) @property def vartype(self): return list(self.data['type']) @property def description(self): return list(self.data['description']) def add_item(self, entry: dict): """ 增添新的数据条目。 :param entry: Dict 类型的条目内容,每次一条。 """ self.data = pd.concat([self.data, pd.DataFrame.from_dict({str(len(self.data)): entry}, orient='index')]) def load_elems(self, elems: Elements): self.elems = elems def drop(self, name): """ 删除指定名称的条目。 :param name: 待删除条目的名称。 """ self.data = self.data.drop(self.data[self.data['name'] == name].index)
class VariableData(Data):
def init(self, path):
super().init(path)
textdef load(self): super().load() # 自动生成变量单位 type_dict = { 'P': 'p', 'T': 't', 'H': 'h', 'V': 'v', 'X': 'num', 'M': 'qm', 'W': 'load', 'L': 'load', 'DP': 'dp', 'DT': 'dt', 'EFF': 'eff', 'FRAC': 'num', 'FRAC1': 'num', 'FRAC2': 'num', 'LOAD': 'load' } for i in range(len(self)): if pd.isna(self.vartype[i]): for sub in type_dict.keys(): if self.name[i][-len(sub)-1:] == '_' + sub: self.data.loc[i, 'type'] = type_dict[sub] if self.elems is not None and pd.isna(self.description[i]): self.data.loc[i, 'description'] = self.elems.parse_desc(self.name[i]) @property def lb(self): """ 变量下界,默认为 -inf. """ return list(self.data['lb']) @property def ub(self): """ 变量上界,默认为 inf. """ return list(self.data['ub']) def convert(self): """ 对于一般数据对象,仅处理上下界数值。 """ pass
class MeasureData(VariableData):
"""
测点数据集类。
"""
def init(self, path):
super().init(path)
self.val, self.intv = None, None
self.off = []
self.atmos = 0.101325
textdef load(self): super().load() # 数据列类型转换 self.data = self.data.astype({ 'name': str, 'type': str, 'description': str, 'vunit': str, 'aunit': str, 'token': str }) # 不确定度生成 for i in range(len(self.data)): if pd.isna(self.accuracy[i]): self.data.loc[i, 'accuracy'], self.data.loc[i, 'aunit'] = MeasureData._autoacc(self.name[i], self.vartype[i]) @property def value(self): return list(self.data['value'].astype(np.float64)) @property def vunit(self): return list(self.data['vunit']) @property def accuracy(self): return list(self.data['accuracy']) @property def aunit(self): return list(self.data['aunit']) @property def token(self): return list(self.data['token']) @property def dvalue(self): if 'dvalue' in self.data.keys(): return list(self.data['dvalue']) else: return [None] * len(self) def convert(self): """ 将输入数据转换成标准单位。 """ super().convert() # 执行上下界处理 n = len(self) val, intv = np.zeros(n, dtype=np.float64), np.zeros(n, dtype=np.float64) # 单位处理 for i in range(n): # 上下界处理 # TODO: 完善上下界生成机制,利用上设计值 lb = self.lb[i] ub = self.ub[i] # 仅对未赋上下界变量设置值 if pd.isna(lb) or pd.isna(ub): lb0, ub0 = MeasureData._generate_lb_ub(self.vartype[i]) if pd.isna(lb): lb, _ = unit_convert(lb0, self.vartype[i], self.vunit[i], atmos=self.atmos, reverse=True) if pd.isna(ub): ub, _ = unit_convert(ub0, self.vartype[i], self.vunit[i], atmos=self.atmos, reverse=True) self.data.loc[i, 'lb'] = lb self.data.loc[i, 'ub'] = ub # OFF 测点处理 if self.value[i] == 'OFF' or self.accuracy[i] == 'OFF': val[i] = 0.0 intv[i] = -1.0 self.off.append(i) # 记录 OFF 测点下标 continue # 非 OFF 测点进入值分析阶段 else: # 上下界检查 if self.value[i] < self.lb[i] or self.value[i] > self.ub[i]: # 值越界,进行处理 logging.warning('测点 {} 测量值 {} 越界 [{}, {}],请检查测量值正误!'.format( self.name[i], self.value[i], self.lb[i], self.ub[i])) # 若未提供设计值,则以上下界值进行约化 if pd.isna(self.dvalue[i]): value = self.lb[i] if self.value[i] < self.lb[i] else self.ub[i] logging.warning('测点 {} 无设计值,将使用上/下界值 {} 对测点测量值进行约化。'.format(self.name[i], value)) self.data.loc[i, 'value'] = value # 赋值时不能使用 property 定义 # 若提供了设计值,则直接约化为设计值 else: logging.warning('将使用设计值 {} 对测点 {} 测量值进行约化。'.format(self.dvalue[i], self.name[i])) self.data.loc[i, 'value'] = self.dvalue[i] # 单位转换 val[i], _ = unit_convert(self.value[i], self.vartype[i], self.vunit[i], None, self.accuracy[i], self.atmos, name=self.name[i]) # 更新数据集大气压 if self.vartype[i] == 'atm': self.atmos = float(val[i]) logging.info('读取到数据集中存在的大气压测点,当前大气压重置为 {:.3f}kPa。'.format(self.atmos * 1e3)) # 不确定度生成 # TODO: 完善不确定度单位判断机制,目前仅对不确度单位为 % 的情况进行区分 if self.aunit[i] == '%': # 百分比,相对不确定度 if val[i] == 0.0: # 使用相对不确定度时需规避 0 值出现,否则会使得协方差项为 0 intv[i] = 1e-6 # 使用极小值替代 0 else: intv[i] = val[i] * float(self.accuracy[i]) / 100.0 elif self.accuracy[i] == 'CST': # CST 测点,常数,不确定度设置为极小数 intv[i] = 1e-5 # 不能直接取为 0,影响后续计算正确性 else: intv[i] = float(self.accuracy[i]) # 生成转换格式后的输入数据 self.val = val self.intv = intv # self._redundant() def revert(self): """ 将标准单位的计算数据还原为输入时的单位形式。 """ raise NotImplementedError def _redundant(self): """ 处理冗余测点信息。 """ # 搜索重复项 repeated = [] for name in self.name: if self.name.count(name) > 1 and name not in repeated: repeated.append(name) # 逐一循环重复项 rm_index = [] for name in repeated: # 合并重复测点 index = np.arange(len(self))[np.array(self.name) == name] new_val = np.mean(self.val[index]) # 平均化测量值 new_intv = np.sqrt(np.mean(self.intv[index] ** 2)) / len(index) # 合成不确定度 self.val[index[0]] = new_val # 存储平均值 self.intv[index[0]] = new_intv # 存储不确定度 self.data.loc[index[0], 'token'] = ','.join(np.array(self.token)[index].tolist()) # 合并 token 信息 rm_index.extend(index[1:].tolist()) # 移除重复元素 rm = lambda array: np.delete(array, rm_index) self.val = rm(self.val) self.intv = rm(self.intv) self.data = self.data.drop(rm_index) @classmethod def _autoacc(cls, name, vartype): """ 自动生成未在数据表中声明的测点不确定度信息。 :param name: 测点名称。 :param vartype: 测点变量类型。 """ # switch if vartype == 'qm': acc, aunit = 5.0, '%' elif vartype == 'p': acc, aunit = 2.0, '%' elif vartype == 't': acc, aunit = 2.0, 'C' else: # 默认情况 logging.warning('未检测到测点 \'{}\' 变量类型可用的不确定度参数,将以默认变量类型设置其不确定度。'.format(name)) acc, aunit = 10.0, '%' logging.info('测点 \'{}\' 的不确定度未给定,根据变量类型 \'{}\' 设置为默认值 {}{}。'.format( name, vartype, num_format(acc), aunit)) return acc, aunit @classmethod def _generate_lb_ub(cls, vartype): lb = -np.inf # 下界 ub = np.inf # 上界 if vartype in ('p', 't', 'qm', 'x', 'pw'): lb = 0.0 if vartype == 'x': ub = 1.0 return lb, ub
class UnmeasuredData(VariableData):
def init(self, path):
super().init(path)
text@property def init_val(self): """ 返回迭代所用的初始值。 """ if 'ivalue' in self.data.columns: return list(self.data['ivalue']) else: return None
if name == 'main':
tags_data = TagsData('hndata_4.csv')
tags_data.trans()
tags_data.save('hn2_x_4.csv')
补充上述load_data.py代码,请重新解释上述问题
太好了,你把 load_data.py
也贴出来了。结合前面的 solver.py / heat_rect.py / hope.py
,我把“输入在哪里体现、中间如何处理、输出到哪里”按代码落点重新梳理一遍(指到具体类/字段与关键逻辑),这样你能顺藤摸瓜定位每一步的数据。
hope.py
/ solver.parse_args()
里,--input
指向一个 CSV(例如 CO2-SS.csv
)。Solver.__init__
→ create_var()
:
也就是:已知测点从这份 CSV 读入,落在pythonmd_x = MeasureData(self.input_name) md_x.load() self.x = Measured(md_x) self.x.init_md()
MeasureData.data
里(一个 pandas DataFrame)。在 MeasureData.load()
/ VariableData.load()
里能看到用到的列:
name
(测点名称)type
(变量类型,如 p/t/qm/h/…)description
(文字描述,可由 Elements.parse_desc()
自动补全)value
(仪表测量值,字符串或数值)vunit
(值的单位)accuracy
(不确定度值;若缺失会由 _autoacc()
自动生成)aunit
(不确定度单位:%
表示相对;C
表示温度的 ±℃;CST
表示常数)token
(测点位号,可用于冗余映射)lb
/ ub
(上下界,可为空,由 _generate_lb_ub()
补默认值)如果你的外部系统导出的是另一套 TAGS 文件格式(例如
hndata_4.csv
),TagsData.trans()
会把它转换成上述“目标列结构”(self.target
),然后save()
成你要的输入 CSV(见文件尾部__main__
演示)。
Unknown
:create_var()
里尝试读取 <proj>_u.csv
(或 bundle/<proj>_u.csv
),形成 UnmeasuredData.data
,其中可含列 ivalue
作为迭代初值(UnmeasuredData.init_val
)。Equation
:create_var()
里读取 <proj>_f.csv
,形成方程清单(方程名、类型、描述)。真正的残差计算由你在 hope.py
里写的 f_val(x,u)
负责。MeasureData.convert()
对每一行测点,做这些事情:
生成/补上下界
lb/ub
缺失,就按变量类型用 _generate_lb_ub()
给默认范围(例如压强/温度/流量/功率的下界设 0;干度上界 1)。unit_convert(reverse=True)
把范围值从标准单位换回原始单位再写回 DataFrame(保证上下界与 value
同单位体系)。OFF 测点
value == 'OFF'
或 accuracy == 'OFF'
,则将该点标记到 self.off
,并把 val[i]=0.0
、intv[i]=-1.0
(后续会裁剪掉)。越界检查与约化
value
超过 lb/ub
,先告警。dvalue
(设计值)缺失,则把 value
夹到边界;否则直接替换为设计值。单位转换(核心)
unit_convert(value, vartype, vunit, ...)
把原始单位的测量值统一转换到标准单位:
atmos
。kJ/kmol
的转化)。type == 'atm'
,读到它后会更新数据集的大气压 self.atmos
,供压力表压转换使用。不确定度生成(转成 1.96 置信区间的半宽)
aunit == '%'
,按相对不确定度计算:intv = value * accuracy%
,并在 value==0 时给一个极小值防止协方差为 0。accuracy == 'CST'
,设一个极小不确定度 1e-5
。intv = float(accuracy)
。accuracy
缺失,_autoacc()
会根据类型给默认值并记录日志(如流量 5%、压力 2%、温度 ±2℃ …)。最后:
self.val
、self.intv
这两个 numpy 数组就准备好了(全是标准单位)。Measured.init_md()
会把它们送进 Measured
:
self.S = diag((intv/1.96)^2)
作为协方差;Partialer
记录 OFF
/CST
等不可调整的测点索引;n2i
索引(冗余测点同名处理也在这里)。顺带:
Elements
可被MeasureData.load_elems()
传入用来自动补全description
,并解析测点名里的设备/端口/参数,提升可读性,但不影响数值本身。
hope.py
就是这种):
Unknown.construct(bal_func)
绑定 u = bal_func(x)
(你的 u_val(x)
)。Measured.evaluate()
里更新 x
后,u.solve(self)
就会用 u_val(x)
重新“把未知量都算出来”,并用自动微分拿到 ∂u/∂x
与 S_u = J_u S_x J_u^T
。Equation.mapping(x,u)
用 f_val(x,u)
计算残差 f.val
、雅可比 ∂f/∂x (及 ∂f/∂u)
与 S_f = J_f S_x J_f^T
(链接模式里 ∂f/∂x
会自动链式包含 ∂u/∂x
)。Measured.evaluate(f, u, link_mode=True)
中进行迭代:
v
使残差加权最小(VDI-2048/卡方意义);x = x_old - v
;链接模式下再计算新的 u = u(x)
;映射新方程;eps
或到达 max_iter
;eq_err
(残差均方差)、total_penalty
(卡方统计量)等。Equation.verify(x, u, chi2=...)
计算临界值 x.K_crt
:
c2.ppf(0.95, m-p)
;无未知量 → fd.ppf
或 c2.ppf
。quality = total_penalty / K_crt
;quality < 1
认为整体无粗大误差。Measured.locate()
/ Equation.locate()
定位疑似粗大误差点;rectify()
应用修正;并再次 mapping()
、写入新旧对比。Solver.save_file()
:
pythonself.data_file.convert_unit() # 把标准单位结果按需求(逆)转换为报表/原始单位 self.data_file.save() # 写 JSON(完整的 x/u/f、统计量、曲线等) self.data_file.save_csv() # 写 CSV(表格友好)
result_<输入文件名去后缀>
;或者使用 --output
指定目录。x
(已知测点修正后值、最终不确定度、卡方分量等)u
(未知量求解结果及不确定度)f
(方程残差、关闭项)注:虽然
DataFile
的实现你没贴,但从调用看,它会把数值统一回报告所需的单位(用到unit_convert(..., reverse=True)
这种逻辑),再保存。
pythonself.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse()
bundle
里的 .js/.css
静态资源生成交互式 HTML 报告(图表、表格、误差曲线、可疑点标注等)。Solver.__init__
里配置了 self.log_path = <output>/<proj>.log
,所有 INFO/WARNING/ERROR 都会记录,便于复现与排错。Unknown.solve()
有这一段:
也就是说,u 的结果也会回写到pythonif isinstance(self.var, Data): if self.new: self.var.data = self.var.data.assign(value=self.val, accuracy=self.intv) self.new = False else: self.var.data['value'] = self.val self.var.data['accuracy'] = self.intv
UnmeasuredData.data
,供后续导出/报告用。输入测点基本信息在哪里体现?
在 --input
指定的 CSV 文件(或由 TagsData.trans()
生成的目标 CSV)里。MeasureData.load()/convert()
把它们变成标准单位的 self.val
/ self.intv
与协方差 self.S
,并记录 OFF/CST/上下界等属性;Elements
仅用于文字描述解析。
计算完成后输出到哪里?
Solver.save_file()
将协调后的 x/u/f
及统计结果保存到 result 目录(JSON + CSV),同时生成 HTML 报告,并在 <proj>.log
中记录详细过程。链接模式下 u
的最终值也会回写到 UnmeasuredData.data
以供导出。
如果你给我一小段真实的 CO2-SS.csv
(头 5–10 行即可),我可以把一次完整的单位转换与不确定度生成过程跑一遍,直观看到“原始列 → 标准单位的 val/intv/S → 输出回报告单位”的样子。
太好了~我把你刚上传的两份输入表看了(我在上面给你各展示了前几行预览)。下面先把“表头的英文单词”逐个解释,然后对比 CO2-SS_x.csv 和 CO2-SS_xE1.csv 两份输入的区别。
(这两份文件的列完全一致)
C_O_T1
, T_I_P
)。名称里常见后缀的含义:
_P
压力(Pressure)_T
温度(Temperature)_M
质量流量(Mass flow)_H
比焓(Enthalpy)_LOAD
负荷/功率(Load/Power)_DP
压差(Delta Pressure)_DT
温差(Delta Temperature)_TTD
上端差、_DCA
下端差(换热器常用)Elements.var_dict
/ TagsData.vtype()
里都有映射。p
压力、t
温度、qm
质量流量、h
比焓、load/pw
负荷/功率、dp
压差、dt
温差、x
干度、a
面积、num
无量纲等。MeasureData.convert()
/ _autoacc()
)。Elements
,会自动补更易读的中文说明)。MPa
, kPa
, K
, C
, kg/s
, kW
, …)。aunit='%'
表示相对不确定度;CST
表示常数;OFF
表示该测点关闭)。%
表示相对不确定度(按 value×百分比计算)。C
表示温度的 ±℃。我做了逐格对比(同一行同一列对比),总结如下:
value
列):例如
C_O_T1
(压气机出口温度测点)在一份里是 229.2/229.2,另一份是 229.4/229.0;C_I_M
(压气机入口质量流)0.4042 改为 0.4062 或 0.4022;FUEL_M
(燃料流)0.0086 改为 0.0084 或 0.0088;T_I_T
(涡轮进口温度)1067.0 改为 1067.5/1066.6;T_I_P
(涡轮进口压力)0.457 改为 0.459/0.455;T_O_T1
(涡轮出口温度测点)714.3 改为 714.6/714.0;T_I_M
(涡轮进口流量)0.413 改为 0.418/0.408。CO2-SS_x.csv
的某行 name
是 # C_O_P
(前面有 #
,像是人为注释),而在 CO2-SS_xE1.csv
则是 C_O_P
(去掉了 #
),且该行 value
也略有不同(0.4661→0.4681)。C_O_P
的 accuracy
在一份是 '5'
,另一份是 '2'
。以上差异我已经逐项比对,前 5 条差异如:
- 第 4 行(从 0 开始计数):
name
从# C_O_P
→C_O_P
,value
0.4661 → 0.4681- 第 5 行:同名
C_O_P
,value
0.4661 → 0.4641,accuracy
5 → 2- 第 6/7 行:
C_O_T1
的value
小幅调整
…(后续多处是 value 的细微变化)
CO2-SS_x.csv
可以看作基准数据;CO2-SS_xE1.csv
是“实验/工况 E1”的一套观测值(多数值略有改动,且至少一个测点的精度提升:accuracy 5% → 2% 或绝对值 5 → 2)。MeasureData.load()
把上面这些列读入为 DataFrame;MeasureData.convert()
会
type
与 vunit
);accuracy
+aunit
);val
和 intv
,并据此生成协方差矩阵 S
。Measured.evaluate()
的迭代里,这些 val/S
与你写的方程 f_val(x,u)
一起进入“最小化残差”的优化过程;accuracy
越小的点权重越大。如果你愿意,我可以把两份文件的差异清单导出成一个小表(哪一行、哪个字段不同、旧值→新值),放到同一目录,便于你审阅和归档。需要的话我现在就生成。
def n2x(self):
return self.x.n2i
其中,n2i的定义在哪里?
你问到的 n2i
,其实就是在 Measured
类里 定义的。来看一下流程:
n2x
的来源在 hope.py
里有:
pythonn2x = solver.n2x
而 Solver
类里定义了:
pythondef n2x(self): return self.x.n2i
这里的 self.x
是 Measured
类型的对象(在 Solver.create_var()
里构造)。
n2i
的定义位置在 Measured
类(定义在 solver.py
里)中有个方法 init_md()
:
pythondef init_md(self): md = self.md self.n = len(md) self.val, self.intv = md.val, md.intv self.names = md.name # 这里定义了 n2i: self.n2i = {md.name[i]: i for i in range(len(md))}
也就是说:
n2i
是一个字典,键是测点名称(md.name[i]
),值是它在数组里的索引位置(i
)。val
、intv
数组里的下标。例如:
pythonn2x = solver.n2x print(n2x['C_O_T1'])
可能会输出 6
,表示 C_O_T1
这个测点在内部数组的第 6 个位置。
✅ 总结:
n2x()
方法其实就是暴露了 Measured.n2i
。n2i
在 Measured.init_md()
中定义,是 测点名 → 索引 的映射字典。val
、intv
数组中的对应下标。要不要我帮你用你上传的 CO2-SS_x.csv
文件跑一遍,把 n2i
生成出来,列出 测点名和它对应的数组索引?这样你就能直观地看到字典内容了。
textif args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv') init_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output) n2x = solver.n2x # 获取索引变量 n2u = solver.n2u # 创建辅助变量集 X = VariableManager(n2x, n2u)
辅助变量集是用来干什么的?
好问题 👍
你这里的 辅助变量集 X = VariableManager(n2x, n2u)
,其实就是一个“桥梁工具”,让后续写热平衡函数 u_val(x)
和方程函数 f_val(x,u)
更直观、更方便。
n2x['C_I_T'] = 3
表示 “压气机入口温度” 在数组 x
的第 3 个位置。x[3]
, x[7]
,非常不直观。VariableManager(n2x, n2u)
接收两个映射:
x
中的索引u
中的索引然后它提供一种更友好的写法:
pythonX.init(x, u) X['C_I_T'] # 自动返回数组中“C_I_T”位置的值 X['C_O_H'] # 自动返回压气机出口焓 X['GEN_LOAD'] # 自动返回发电机负荷
同时它也支持赋值:
pythonX['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF'])
这行的意义是:计算压气机出口焓,并直接写回到 x
数组对应位置。
X['C_I_T']
一看就是压气机入口温度),又不容易写错。如果没有 VariableManager
,你可能要这样写:
pythonC_I_H = air_h_tp(x[n2x['C_I_T']], x[n2x['C_I_P']])
用了 VariableManager
之后,只需要:
pythonX['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P'])
看起来就像公式推导一样。
✅ 总结一句话:
辅助变量集 X
就是一个变量管理器,用来把底层的数组存取操作封装成“用名字访问变量”。它让热力学平衡函数和方程函数的编写更直观,避免频繁查索引。
要不要我帮你画一个简单的示意图,展示 “x 数组 ←→ n2x ←→ VariableManager ←→ X['C_I_T']” 的关系?这样你能一眼看出这个管理器在中间扮演的角色。
需要你帮我画一个简单的示意图,展示他们的关系
pythonimport matplotlib.pyplot as plt # 画一个简单的示意图展示关系 fig, ax = plt.subplots(figsize=(8, 6)) # 定义节点 nodes = { "CSV 文件\n(CO2-SS_x.csv)": (0, 2), "MeasureData\n(读取CSV)": (3, 2), "Measured\n(对象self.x)": (6, 2), "n2i\n{name→index}": (9, 2), "Solver.n2x()": (12, 2), "VariableManager\n(X)": (6, 0), "访问示例:\nX['C_I_T'] ↔ x[n2i['C_I_T']]": (6, -2) } # 画节点 for name, (x, y) in nodes.items(): ax.text(x, y, name, ha="center", va="center", bbox=dict(boxstyle="round,pad=0.4", fc="lightblue", ec="black", lw=1)) # 画箭头 arrows = [ ("CSV 文件\n(CO2-SS_x.csv)", "MeasureData\n(读取CSV)"), ("MeasureData\n(读取CSV)", "Measured\n(对象self.x)"), ("Measured\n(对象self.x)", "n2i\n{name→index}"), ("n2i\n{name→index}", "Solver.n2x()"), ("Solver.n2x()", "VariableManager\n(X)"), ("VariableManager\n(X)", "访问示例:\nX['C_I_T'] ↔ x[n2i['C_I_T']]") ] for start, end in arrows: x1, y1 = nodes[start] x2, y2 = nodes[end] ax.annotate("", xy=(x2, y2+0.3 if y2<y1 else y2-0.3), xytext=(x1, y1-0.3 if y2<y1 else y1+0.3), arrowprops=dict(arrowstyle="->", lw=1.5)) ax.set_xlim(-1, 14) ax.set_ylim(-3, 3.5) ax.axis("off") plt.show()
这里给你画了一张简图,展示了输入数据(CSV 文件)如何一步步变成你可以用 X['C_I_T']
直接访问的“辅助变量集”:
CO2-SS_x.csv
)也就是说:VariableManager 就是封装了 n2i 的接口,让你能用名字而不是数字下标去访问/写入数组中的值。
要不要我再帮你把 f_val 和 u_val 的计算过程拿出来,用这张图里的数据流解释它们是怎么依赖 X
的?
类似于X['C_I_T']这样的表达,返回的是C_I_T的value还是C_I_T所在的位置?如果返回的是value,是在哪部分代码中存在读取对应csv文件的value值的?
好问题 👍,我来帮你把这个链条捋清楚。
X['C_I_T']
返回什么?C_I_T
的数值 (value),而不是位置索引。位置索引
的信息在 n2x
/ n2u
字典里已经有了,但 VariableManager
的设计就是为了屏蔽索引,让你用名字直接拿到值。value
是从哪里来的?流程是这样的:
CSV 输入文件(例如 CO2-SS_x.csv
)里有:
name
(比如 C_I_T
)value
(原始数值,比如 298.0)vunit
(单位,比如 K
)accuracy
(不确定度)等等。MeasureData.load()
pandas.DataFrame
。MeasureData.convert()
value
按照 vunit
转换到标准单位(例如温度统一成 K,压力统一成 MPa)。self.val
(测点值)和 self.intv
(不确定度)。Measured.init_md()
MeasureData
里取出 val
、intv
和 name
。n2i = {name: i}
,表示每个测点对应的数组下标。VariableManager.init(x, u)
x
(已知量数组)、u
(未知量数组)。X['C_I_T']
,它会:
n2x['C_I_T']
找到下标 i
;x[i]
拿值并返回。假设 CSV 里有一行:
name | type | value | vunit | accuracy |
---|---|---|---|---|
C_I_T | t | 25.0 | C | 2.0 |
流程:
MeasureData.convert()
把 25.0 ℃ 转换成 298.15 K,放到 val
数组。val = [ ..., 298.15, ... ]
accuracy
转换成 ±2K,放到 intv
。Measured.init_md()
把 C_I_T
映射到下标 3:n2i['C_I_T'] = 3
。VariableManager.init(x)
接收这个数组。X['C_I_T']
:x[3] = 298.15
。所以结果就是 值(value,经过单位转换后的标准单位)。
✅ 总结一句话:
X['C_I_T']
返回的是“压气机入口温度的数值(已换算成标准单位)”,这些数值最初来自输入 CSV 的 value
列,在 MeasureData.convert()
里完成单位换算和校验。
要不要我帮你用你上传的 CO2-SS_x.csv
实际跑一遍,把几个关键测点(比如 C_I_T
, C_O_P
, T_I_T
)的 CSV 原始值 → 转换后内部值 → X[...]
访问结果 列个表出来?这样你会更直观地看到全过程。
Measured.init_md()
把 C_I_T
映射到下标 3:n2i['C_I_T'] = 3
。啊,这里有点容易搞混 🤓。我来仔细解释一下为什么是 3 而不是 “第 2 列”。
n2i
的意义在 Measured.init_md()
里:
pythonself.n2i = {md.name[i]: i for i in range(len(md))}
i
是 行号(索引),来自 MeasureData
里 md.name
的列表。len(md)
表示测点的总数(行数),所以 i
从 0 开始 递增。举例:如果 CSV 文件里(忽略掉表头)前三行的 name 分别是:
C_I_T, C_I_P, C_O_T
那么生成的映射就是:
pythonn2i['C_I_T'] = 0 n2i['C_I_P'] = 1 n2i['C_O_T'] = 2
如果 第四行才是 C_O_P
,那么:
pythonn2i['C_O_P'] = 3
name, type, description, value, vunit, …
。n2i
并不是对 “列” 编号,而是对 行 编号。n2i
关心的是“第几个测点”,而不是“在 CSV 的第几列”。C_I_T
的下标是 3,是因为它在 CSV 的 第 4 行(Python 从 0 数)。value
列只是这一行里的一个属性,真正的数值会被 MeasureData.convert()
取出来放进数组 val
。VariableManager
用 n2i
找到下标,再到 val
里取数。✅ 换句话说:
n2i['C_I_T'] = 3
的意思是 “C_I_T 这个测点在所有测点数组里的第 4 个位置”,不是“第 2 列”。
要不要我帮你直接用你上传的 CO2-SS_x.csv 文件跑一下,把前 10 个测点的 name
和它在 n2i
里的索引打印出来?这样你能一眼看到 “第几行 → 下标几”。