gpt-5与gpt-o3是什么区别呢?哪种更优秀

Creato il: 18 agosto 2025

Risposto usando GPT-5 da Chat01

Domanda

gpt-5与gpt-o3是什么区别呢?哪种更优秀

Pensiero

Thought for 57s

Risposta

简短结论:
**o3 属于上一代“推理”模型,擅长长链推理与多模态理解;GPT-5 是新一代统一系统(含 GPT-5 Thinking),总体更强,默认更快更实用,并在需要时自动“深度思考”。**在大多数任务里,优先选 GPT-5;只有当你固定依赖旧的 o3 工作流或权限限制时,才考虑继续用 o3。(OpenAI, OpenAI CDN)

主要区别(一眼看懂)

维度GPT-5OpenAI o3
模型定位统一系统:快速主模型 + 深度推理模型(GPT-5 Thinking)+ 实时路由根据任务自动选择专注“长时思考”的推理模型
强项编码与多工具编排、指令跟随、通用问答;在需要时启用深度推理;更低幻觉率与更诚实数学/科学/编码/视觉推理强,能把图表、截图等纳入推理
使用体验默认更快更实用;可在 API 里控制“最小推理(minimal reasoning)”与冗长度(verbosity)以“推理努力”换效果,通常时延更高
状态新一代旗舰,已在 ChatGPT 默认替代 o3;亦提供 GPT-5 Thinking / Pro上代推理线,官方文档标注已被 GPT-5 继任
(OpenAI CDN) (OpenAI) (OpenAI) (OpenAI) (OpenAI Platform)

哪个更“优秀”?

  • **总体:GPT-5 更胜一筹。**官方评估显示,GPT-5(含 Thinking)在复杂任务、事实准确性与“更诚实地承认做不到”方面都优于 o3;并通过路由在不需深度推理时保持高效。(OpenAI, OpenAI CDN)
  • 如果你是开发者/团队: GPT-5 在编码与“代理式”多工具链调用上是当前最强,并新增了 minimal reasoningverbosity 控制项,便于按需权衡质量/时延/成本。(OpenAI)
  • 如果你专做硬核推理(数学/科研/图表理解): 优先用 GPT-5 Thinking(或有权限时用 GPT-5 Pro);没有权限时,o3 也能胜任但通常不及 GPT-5 家族。(OpenAI)

需要的话,告诉我你的具体场景(写作、编码、做研究、做报表等),我直接给你“选型+提示词”搭配。

Domanda

'''from shutil import copy
import re
import sys
import argparse
from heat_rect import *
from load_data import Elements
from reprocess import DataFile
from reprocess import HtmlReport
from excel_loader import data_template

def conv_path(path):
"""
检测当前路径中是否包含路径分隔符,若不包含则是单个文件名。
"""
return path.rfind(os.path.sep) != -1

def to_list(elems):
"""
用于将单个元素(包括 None)统一转换为列表形式。
"""
if elems is None:
return []
elif type(elems) is str:
return [elems]
else:
return elems

def parse_args(proj_name, title, template=True):
"""
解析命令行参数,返回一个包含参数的字典。
"""
parser = argparse.ArgumentParser(description='数据协调计算程序')
parser.add_argument('--proj', type=str, required=False, default=proj_name, help='项目名称')
parser.add_argument('--input', type=str, required=False, default=f'{proj_name}_x.csv', help='输入数据名称')
parser.add_argument('--desc', type=str, required=False, default=title, help='项目描述')
parser.add_argument('--output', type=str, required=False, default=None, help='结果输出路径')
parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径')
args = parser.parse_args()

text
# 转换数据文件模式 if template is True and args.trans is not None: data_template(args.trans, template=f'{proj_name}_data_template.csv') return args

class Solver:
"""
求解器类,用于组织各个变量对象,并以此为基础进行变量构建、协调、粗大误差识别、修正、比较等功能。
"""
def init(self, proj_name=None, input_name=None, desc=None, timer=None, link=True, output=None):

text
# 设置默认大气压 self.atmos = 0.101325 # 附属变量对象 self.x = None self.f = None self.u = None # 附属函数对象 self.bal_func = None # 热平衡函数: x -> u self.dvr_func = None # 数据协调函数: x, u -> f self.drop_func = None # 关闭冗余平衡方程函数:x, u -> List if proj_name is None: proj_name = 'default' self.proj_name = proj_name # 输入与输出路径处理 # 输入路径预处理 if input_name is not None: # 自定义输入文件 self.input_name = input_name.replace('/', os.path.sep) # 统一路径分隔符 if not conv_path(self.input_name): # 路径中不包含分隔符,单文件名 if not os.path.exists(self.input_name): # 当前目录下寻找 if os.path.exists('data'): # 否则在 data 目录下寻找 self.input_name = os.path.join('data', self.input_name) # 检查文件是否存在 if not os.path.exists(self.input_name): raise FileNotFoundError('未找到 input 参数所对应的数据文件,请检查后重试!') else: raise ValueError('未提供 input 参数,请指定数据文件后再进行计算!') # 输出路径生成 # 输出路径或为当前工作目录,或来自输入文件,不需要检查存在性 result_dir = 'result' self.output = output if self.output is None: # 未提供输出路径时 if self.input_name is not None: # 若提供了输入文件路径,则设置为对应目录 self.output = os.path.abspath(os.path.join(self.input_name, os.pardir)) input_folder = re.match(r'(.*\\)?(.*)\.csv', self.input_name) result_dir += '_' + input_folder.group(2) self.output = os.path.join(self.output, result_dir) else: # 否则直接在当前工作路径下创建输出文件夹 self.output = result_dir if not os.path.exists(self.output): os.mkdir(self.output) # 其他变量生成 self.report = None if desc is None: desc = '数据协调计算报告' self.desc = desc self.counter = 0 # 共执行计算的次数 self.data_file = None # 数据文件 self.link_mode = link # 是否启用链式计算模式 # 计时器 if timer is None: self.timer = datetime.now() else: self.timer = timer # 日志配置 root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') file_handler = logging.FileHandler(self.log_path, mode='w', encoding='utf-8') file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) std_handler = logging.StreamHandler() std_handler.setFormatter(formatter) root_logger.addHandler(std_handler) logging.info('欢迎使用 heat_rect 数据协调计算程序!项目编号:{},项目标题:{}'.format( self.proj_name, self.desc)) # 创建基本对象 logging.info('开始读取变量数据……') self.create_var() logging.info('开始创建及维护计算文件……') self.create_file() self.data_file.cache('atmos', self.atmos) # 记录大气压 logging.info('变量及文件路径初始化完成。(elapsed time: {:d}s)'.format(self.time)) logging.info('当前程序输出路径为:[{}]'.format(os.path.abspath(self.output))) @property def n2x(self): return self.x.n2i @property def n2u(self): return self.u.n2i @property def n2f(self): return self.f.n2i @property def time(self): return elapsed_time(self.timer) @property def data_path(self): if os.path.isabs(self.data_file.path): # 绝对路径直接返回 return self.data_file.path else: return os.path.abspath(os.path.join(self.output, self.data_file.path)) @property def report_path(self): if os.path.isabs(self.report.path): # 绝对路径直接返回 return self.report.path else: return os.path.abspath(os.path.join(self.output, self.report.path)) @property def log_path(self): return os.path.abspath(os.path.join(self.output, self.proj_name + '.log')) @property def root_path(self): # 定位程序根目录 if getattr(sys, 'frozen', False): # 打包后程序 return sys._MEIPASS else: # 脚本状态 return os.path.join(os.path.realpath(__file__), os.path.pardir) def set_atmos(self, atmos): self.atmos = atmos def create_file(self): """ 创建数据文件与报告文件。在需要进行松弛计算时,可多次调用。 """ filename = self.proj_name if self.counter > 0: # 多次执行 filename += '-' + str(self.counter) self.data_file = DataFile(filename + '.json', root=self.output) # 创建结果数据文件 self.report = HtmlReport(filename, root=self.output) # 创建报告 # 从程序目录复制静态文件 static_path = os.path.abspath(os.path.join(self.root_path, 'bundle')) for static_file in os.listdir(static_path): if static_file.endswith('.js') or static_file.endswith('.css'): if not os.path.exists(os.path.join(self.output, static_file)): # 仅在未复制时创建 copy(os.path.join(static_path, static_file), self.output) self.report.create(title=self.desc, author='中核运行研究院') def create_var(self): """ 创建变量对象。该函数仅在创建求解器时调用一次即可。 """ # 初始化设备描述文件 try: elems = Elements(self.proj_name) except: logging.warning('未检测到设备描述文件,不再进行设备描述生成。') elems = None # 初始化已知测点 md_x = MeasureData(self.input_name) if elems is not None: md_x.load_elems(elems) md_x.load() self.x = Measured(md_x) self.x.init_md() self.set_atmos(self.x.var.atmos) # 更新大气压 logging.info('测点变量定义读取完成。(elapsed time: {:d}s)'.format(self.time)) # 初始化未知测点 try: md_u = UnmeasuredData(os.path.join(self.root_path, 'bundle', self.proj_name + '_u.csv')) if elems is not None: md_u.load_elems(elems) md_u.load() except: md_u = UnmeasuredData(self.proj_name + '_u.csv') md_u.load() self.u = Unknown(md_u) self.u.init_md() # 初始化冗余平衡方程 try: md_f = Data(os.path.join(self.root_path, 'bundle', self.proj_name + '_f.csv')) md_f.load() except: md_f = Data(self.proj_name + '_f.csv') md_f.load() self.f = Equation(md_f) self.f.init_md() def create_func(self, bal_func, dvr_func, drop_func=None): """ 创建热平衡与数据协调函数变量。该函数仅在创建求解器后调用一次即可。 :param bal_func: 热平衡函数变量。 :param dvr_func: 数据协调函数变量。 :param drop_func: 移除方程函数。 """ if bal_func is None: assert self.link_mode is False # 必须是非链接模式 self.bal_func = None else: self.bal_func = bal_func self.dvr_func = dvr_func if drop_func is not None: self.drop_func = drop_func def drop_equations(self, names): """ 从平衡方程中排除指定方程。 :param names: 方程索引列表。 """ assert self.f is None # TODO: 利用 Partialer 类完成本函数编写 def init(self): """ 初始化各变量值。该函数仅在计算最开始调用一次。 """ # 读取未知测点 if self.link_mode: self.u.construct(self.bal_func) elif self.bal_func is not None: self.u.construct(self.bal_func) logging.info('未知测点变量读取完成。(elapsed time: {:d}s)'.format(self.time)) # 读取平衡方程 self.f.construct(self.dvr_func) if self.drop_func is not None: self.f.bind_drop(self.drop_func) self.f.patch_redundancy(self.x) logging.info('冗余平衡方程读取完成。(elapsed time: {:d}s)'.format(self.time)) def solve(self): """ 执行求解过程。该函数可在松弛计算过程中多次调用。 """ # self.x.show() self.data_file.cache_var('x', self.x, md=True) logging.info('开始求解数据协调优化问题……(elapsed time: {:d}s)'.format(self.time)) if self.link_mode or self.bal_func is not None: self.u.solve(self.x) else: self.u.init_static() self.data_file.cache_var('u', self.u, simple=True, md=True) self.data_file.cache({'u': {'vunit': self.u.var.data['vunit']}}) # 未知变量单位补丁 logging.info('已通过热平衡计算获得未知测点初始假设值。(elapsed time: {:d}s)'.format(self.time)) self.f.mapping(self.x, self.u, link_mode=self.link_mode) self.data_file.cache_var('f', self.f, simple=True, md=True) logging.info('冗余平衡方程构建完成。(elapsed time: {:d}s)'.format(self.time)) init_err = mse(self.f.val) logging.info('当前测点下冗余平衡方程残差均方差为:{:.2E}'.format(init_err)) self.data_file.cache({'init_err': init_err}) self.x.evaluate(self.f, u=self.u, link_mode=self.link_mode) # 求解协调优化问题 def verify(self): """ 执行粗大误差验证过程,若不存在则将自动执行修正计算,否则进行粗大误差定位。可以多次调用。 :return: 当前数据中是否存在粗大误差,布尔值。 """ quality = self.f.verify(self.x, u=None if self.link_mode else self.u, chi2=True) flag = quality < 1 # 是否存在粗大误差 if flag: logging.info('当前数据卡方值为 {:.3f},小于临界值 {:.3f},品质因数为 {:.3f}, 系统整体不存在粗大误差。'.format( self.x.total_penalty, self.x.K_crt, quality)) else: logging.warning('当前数据卡方值为 {:.3f},大于临界值 {:.3f},品质因数为 {:.3f},系统整体存在粗大误差!'.format( self.x.total_penalty, self.x.K_crt, quality)) # 误差定位 logging.info('开始进行误差定位!(elapsed time: {:d}s)'.format(self.time)) # 约束方程检验法 index_f = self.f.locate() self.data_file.cache({'f': {'r': self.f.r}}) self.data_file.cache(index_f, 'f_index') # 测量数据检验法 index = self.x.locate() self.data_file.cache({'x': {'delta': self.x.v, 'r': self.x.r}}) self.data_file.cache(index, 'index') # 计算修正结果 logging.info('开始实施数据协调校正!(elapsed time: {:d}s)'.format(self.time)) self.x.rectify() # 修正已知测点变量值 if self.link_mode: self.u.solve(self.x) # 计算未知测点变量值 else: self.u.calc_static() self.x.patch_unknown(self.u) self.f.mapping(self.x, self.u, link_mode=self.link_mode) # 计算平衡方程剩余残差 self.data_file.cache_new(self.x, self.f, u=self.u, link_mode=self.link_mode) final_err = mse(self.f.residual()) self.data_file.cache({'final_err': final_err}) # 保存数据 self.data_file.cache(self.x.total_penalty, 'total_penalty') # 总卡方值 self.data_file.cache({'x': {'penalty': self.x.penalty}}) # 各变量卡方值 self.data_file.cache({ 'eq_err': self.x.eq_err, 'chis': self.x.chis }) # 方程均方差与总卡方值曲线 self.data_file.cache(self.x.K_crt, 'Kcrt') # 假设检验临界值 return flag def save_file(self): """ 保存数据文件与计算报告。在松弛计算时,可多次调用。 """ # 保存数据文件 logging.info('当前计算步 JSON 数据文件保存至: [{}]'.format(self.data_path)) self.data_file.convert_unit() self.data_file.save() self.data_file.save_csv() # self.data_file.save_balanced_csv() # 仅在为论文调试系统时使用 # 保存网页报告 logging.info('当前计算报告保存至: [{}]'.format(self.report_path)) self.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse() def routine(self): """ 常规求解循环,执行求解和验证环节。 """ self.init() self.solve() self.verify() logging.info('数据协调计算完成。(elapsed time: {:d}s)'.format(self.time)) logging.info('本次计算过程日志保存至: [{}]'.format(self.log_path)) self.save_file()

class VariableManager:
"""
变量管理器,用于统一组织已知及未知变量,简化热平衡与协调方程写法。
"""
def init(self, n2x, n2u):
self.n2x = n2x
self.n2u = n2u
self.flags = self.init_n2i()
self.x = None
self.u = None

text
def __getitem__(self, item): if item in self.n2x: return self.x[self.n2x[item]] elif item in self.n2u: if self.u[self.n2u[item]] == -1: raise ValueError('不能使用未赋值变量 {} 的值!'.format(item)) else: return self.u[self.n2u[item]] else: raise IndexError('索引 {} 不存在于当前管理器对象当中,请检查!'.format(item)) def __setitem__(self, key, value): if key in self.n2x: raise IndexError('不能设置已知变量 {} 的值!'.format(key)) elif key in self.n2u: if self.u[self.n2u[key]] == -1: # 未赋值 self.u[self.n2u[key]] = value else: # 已赋值 raise IndexError('不能重新设置已赋值未知变量 {} 的值!'.format(key)) else: raise IndexError('索引 {} 不存在于当前管理器对象当中,请检查!'.format(key)) def init(self, x, u=None): self.x = x if u is None: self.u = np.ones(len(self.n2u)) * -1 # 全部设置为 -1 else: self.u = u def init_n2i(self): flags = dict() redundant_names = [] for name in self.n2x.keys(): flags[name] = True for name in self.n2u.keys(): if name in flags.keys(): redundant_names.append(name) else: flags[name] = False if len(redundant_names) > 0: raise IndexError('索引 {} 同时出现在已知与未知变量当中,请检查!'.format(redundant_names)) return flags def pt2h(self, name): self[name + '_H'] = h_pt(self[name + '_P'], self[name + '_T']) def check_val(self): for name in self.n2u.keys(): if self.u[self.n2u[name]] == -1: logging.error('索引 {} 变量尚未求值,请检查程序正误!'.format(name)) @property def val(self): return self.u # 预定义方程 def mas_bal(self, inlets, outlets): res = 0.0 for inlet in to_list(inlets): res += self[inlet + '_M'] for outlet in to_list(outlets): res -= self[outlet + '_M'] return res def en_bal(self, inlets, outlets, inputs=None, outputs=None): res = 0.0 for inlet in to_list(inlets): res += self[inlet + '_M'] * self[inlet + '_H'] for input in to_list(inputs): res += self[input] for outlet in to_list(outlets): res -= self[outlet + '_M'] * self[outlet + '_H'] for output in to_list(outputs): res -= self[output] return res / 1e6 def p_bal(self, inlet, outlet, dp=None): res = self[inlet + '_P'] - self[outlet + '_P'] if dp is not None: res -= self[dp] return res def h_eq(self, stream, mode='pt'): res = 0.0 if mode == 'pt': res = self[stream + '_H'] - h_pt(self[stream + '_P'], self[stream + '_T']) elif mode == 'px': res = self[stream + '_H'] - h_px(self[stream + '_P'], self[stream + '_X']) elif mode == 'tx': res = self[stream + '_H'] - h_tx(self[stream + '_T'], self[stream + '_X']) elif mode == 'p0': res = self[stream + '_H'] - h_px(self[stream + '_P'], 0) elif mode == 't0': res = self[stream + '_H'] - h_tx(self[stream + '_T'], 0) elif mode == 'p1': res = self[stream + '_H'] - h_px(self[stream + '_P'], 1) elif mode == 't1': res = self[stream + '_H'] - h_tx(self[stream + '_T'], 1) else: raise ValueError(f'无法识别计算模式 {mode},请检查!') return res / 1e3 def lv_bal(self, stream): return self[stream + '_T'] - t_sat(self[stream + '_P'])

class EquationManager:
"""
方程管理器类,用于管理方程残差对象。
"""
def init(self):
self.n2f = None
self.f = None
self._offs = None

text
def init(self, f): self.n2f = f.n2i self.f = np.zeros(len(f)) self._offs = [] def drop(self, index): if self._offs is None: raise ValueError('方程管理器未初始化,请检查!') if type(index) == str: if index not in self.n2f: raise IndexError('方程 {} 未定义,请检查!'.format(index)) self._offs.append(self.n2f[index]) else: self._offs.append(index) def __getitem__(self, item): if item in self.n2f: return self.f[self.n2f[item]] else: raise IndexError('方程 {} 未定义,请检查!'.format(item)) def __setitem__(self, key, value): if key in self.n2f: self.f[self.n2f[key]] = value else: raise IndexError('方程 {} 未定义,请检查!'.format(key)) @property def val(self): return np.array(self.f) @property def off(self): if self._offs is None: raise IndexError('方程管理器未初始化,请检查!') else: return self._offs'''

请逐行解释上述代码帮助我理解它的含义。

Pensiero

Explaining code in sections

The user posted a large code block and asked for a line-by-line explanation. Since the code is already in Chinese, I'll respond in Chinese too. It’s a long code (500 lines), so I’ll focus on key sections and provide a condensed explanation, grouping lines together where possible to keep things concise but clear. I'll avoid asking for clarification and provide the best explanation I can, even if it’s partial.

Explaining code in detail

The user wants a line-by-line explanation. Since the code doesn’t have line numbers, I’ll refer to sections by function or method. I’ll cover each major section like imports, helpers, classes, and methods, but I’ll avoid verbosity that exceeds token limits. I could condense explanations with short notes, like “from shutil import copy: 从shutil导入copy函数”. Each explanation will be clear but concise.

Pointing out issues and dependencies

I should keep it clear and concise, avoiding too much detail. There are some potential problems:

  • Missing imports like os, logging, datetime, and numpy—though they might come from heat_rect, it’s not usual to rely on that without explicit imports. I’ll note this.
  • conv_path uses os.path.sep, but os isn’t imported.
  • parse_args references excel_loader.data_template.

Analyzing code structure

In Solver.__init__, the order looks good. self.prof_name sets the project name, and self.input_name and self.output tie into logging early enough. However, self.report and self.data_file aren’t created yet, which is fine for log_path.

For output path generation, there's a bug with os.path.join, as the logic assumes backslashes. os.path.dirname would be a safer option.

Reviewing solver structure

The code handles data reconciliation for heat balance, but I noticed some potential issues. For example, the regex uses Windows-style path separators, which wouldn't work on POSIX systems. Also, the logic for drop_equations seems flawed as it asserts self.f is None, which might be a bug. Additionally, there's improper handling in input path normalization.

Thought for 1m 28s

Risposta

下面按从上到下的顺序,逐行/逐段解释你的代码在做什么,以及它背后的意图与注意点。为避免刷屏,我把同一功能块里语义连贯的多行合并讲,但每一句都覆盖到了。


顶部导入

  • from shutil import copy
    只从 shutil 模块里引入文件复制函数 copy,用于稍后把 .js/.css 静态资源拷贝到输出目录。

  • import re, import sys, import argparse
    分别用于正则处理、系统信息(诸如打包路径)、命令行参数解析。

  • from heat_rect import *
    heat_rect 模块里所有公开符号导入当前命名空间——看后文,里面应当包含:数值函数 elapsed_time, mse, h_pt, h_px, h_tx, t_sat、数据结构 Measured, Unknown, Equation, MeasureData, UnmeasuredData, Data,以及可能还转手导入了 numpy as nploggingdatetimeos 等(否则本文件直接使用这些名字会报错)。

  • from load_data import Elements
    读取设备/拓扑的描述类,用于把“点位”同设备结构关联。

  • from reprocess import DataFile / HtmlReport
    数据缓存与生成 HTML 报告的封装。

  • from excel_loader import data_template
    支持把“外部输入文件”转换为本系统所需的模板格式。

⚠️ 注意:本文件直接使用了 os, logging, datetime, np 等名字,但并未在本文件 import;它们应来自 heat_rect 的星号导入,否则会 NameError


小工具函数

  • def conv_path(path): ... return path.rfind(os.path.sep) != -1
    检查字符串里是否出现系统路径分隔符(例如 Windows 的 \ 或 Linux 的 /)。出现则视为“带路径”,否则视为“纯文件名”。

  • def to_list(elems): ...
    把输入统一成列表:None -> []str -> [str],其它(假定已是可迭代)原样返回。后面用于把“单个入口/出口”与“多个入口/出口”的书写统一起来。


命令行参数解析

  • def parse_args(proj_name, title, template=True):
    以“项目名、标题”为默认值构建参数解析器。
    • --proj 项目名称(默认传入的 proj_name
    • --input 输入 CSV 文件名(默认 <proj>_x.csv
    • --desc 报告标题(默认传入的 title
    • --output 输出目录
    • --trans 若提供,把外部文件转换为系统模板
  • 解析后:若 template is True 且用户提供了 --trans,就调用 data_template(args.trans, template=f'{proj_name}_data_template.csv') 生成一份模板 CSV。
  • 返回 args

类:Solver(求解器,负责一次完整的数据协调流程)

初始化

  • __init__(..., proj_name=None, input_name=None, desc=None, timer=None, link=True, output=None)
    设定环境与路径,读取输入,构建日志与核心对象。

    • self.atmos = 0.101325:默认大气压(单位应是 MPa)。
    • 预留“变量对象”与“函数对象”:
      • self.x(已知测点,Measured)/ self.u(未知测点,Unknown)/ self.f(冗余方程,Equation)
      • self.bal_func(热平衡 x→u)、self.dvr_func(协调 x,u→f)、self.drop_func(关闭部分方程)
    • 处理 proj_name 默认值 'default'

    输入路径处理

    • 若提供了 input_name
      1. replace('/', os.path.sep) 把斜杠统一为当前系统的分隔符。
      2. 若不含分隔符:优先在当前目录找;找不到且存在 data/ 目录,则退而到 data/ 下找。
      3. 最终仍不存在:抛 FileNotFoundError
    • 未提供 input_name:抛 ValueError(必须有数据文件)。

    输出路径处理

    • 默认输出目录名是 result
    • 若未显式传入 output 且已有 input_name
      • self.output = abspath(join(self.input_name, os.pardir)) 取输入文件的“上级目录”。

        等效于 dirname 的绕行写法。

      • 用正则 re.match(r'(.*\\)?(.*)\.csv', self.input_name) 提取输入文件的“基名”(不含扩展名),然后把 result_dir 改成 result_<基名>,并拼到输出目录。

        ⚠️ 这里正则只匹配反斜杠 \\ 的 Windows 路径,Linux/Mac 下可能不匹配,建议用 os.path 系列函数替代。

    • 若输出目录不存在:os.mkdir(self.output)(单级目录创建)。

    其余初始化

    • self.report = Noneself.desc = desc or '数据协调计算报告'self.counter = 0(用于多轮松弛时的文件后缀),self.data_file = Noneself.link_mode = link
    • self.timer = timer or datetime.now() 用于统计耗时。
    • 日志:配置根 logger 为 INFO;文件与控制台双路输出。日志文件路径取自 self.log_path(即 <output>/<proj>.log)。随即打印“欢迎使用…”与传入的项目/标题。

    核心对象创建

    • self.create_var():读取 x/u/f 配置与数据,构造 Measured/Unknown/Equation
    • self.create_file():创建用于保存结果/报告的对象,拷贝静态资源,初始化 HTML 报告骨架。
    • self.data_file.cache('atmos', self.atmos) 把大气压写入缓存。
    • 打印耗时与输出目录的绝对路径。

一组只读属性(便捷访问/路径)

  • n2x/n2u/n2f:分别返回 x/u/f 的“名称→索引”映射(用于写方程时按名字索引)。
  • timeelapsed_time(self.timer),从起始计时点到当前的秒数。
  • data_path/report_path:把相对路径转成以输出目录为根的绝对路径,若原本已是绝对路径则直接返回。
  • log_path<output>/<proj>.log 的绝对路径。
  • root_path:程序根目录。若是打包(sys.frozen)环境,用 sys._MEIPASS,否则取当前脚本所在目录。

其余方法

  • set_atmos(atmos):设置大气压。

  • create_file()
    self.counter 决定本轮文件名前缀(第一次为 <proj>,第二次为 <proj>-1…)。

    • 创建 DataFile('<name>.json', root=self.output) 用于存储本轮计算的数据。
    • 创建 HtmlReport('<name>', root=self.output) 用于输出 HTML。
    • <root>/bundle/ 复制 .js/.css 静态资源到 self.output(若尚未存在)。
    • self.report.create(title=self.desc, author='中核运行研究院') 初始化报告。
  • create_var()

    1. 设备元素:尝试 Elements(self.proj_name),失败则记录 warning 并置为 None
    2. 已知测点 x
      • md_x = MeasureData(self.input_name)
      • 如果有设备元素,md_x.load_elems(elems) 做绑定;随后 md_x.load() 读入数据。
      • self.x = Measured(md_x) & self.x.init_md() 完成对象构造。
      • self.set_atmos(self.x.var.atmos) 用 x 中记录的大气压更新默认值。
    3. 未知测点 u
      • 优先从 <root>/bundle/<proj>_u.csv 读;失败则退到当前工作目录 <proj>_u.csv
      • self.u = Unknown(md_u) & self.u.init_md()
    4. 冗余方程 f
      • 同样先读 <root>/bundle/<proj>_f.csv,失败则退本地 <proj>_f.csv
      • self.f = Equation(md_f) & self.f.init_md()
  • create_func(bal_func, dvr_func, drop_func=None)
    绑定“热平衡函数(x→u)”、“数据协调函数(x,u→f)”、“关闭冗余方程函数”。

    • bal_func is None,则断言 self.link_mode is False(链式模式要求有平衡函数)。
    • 设置 self.drop_func(可选)。
  • drop_equations(names)
    计划用于“从平衡方程集合中排除一些方程”。

    目前未实现;而且 assert self.f is None 是明显错误,应该是“已存在 f”才有可能 drop(应为 assert self.f is not None)。函数体里的 TODO 也提示后续将用 Partialer 完成。

  • init()(一次性初始化数值)

    • 对 u:
      • link_mode 为真bal_func 不为空,则 self.u.construct(self.bal_func) 通过热平衡函数生成 u 的初值或结构;否则跳过。
    • 对 f:
      • self.f.construct(self.dvr_func) 以协调函数构造冗余方程系统;
      • 若提供了 drop_funcself.f.bind_drop(self.drop_func)
      • self.f.patch_redundancy(self.x) 用 x 的信息做冗余性补丁(比如删掉多余方程/变量)。
  • solve()(一次求解流程)

    1. 把 x 的原始测量数据缓存(cache_var('x', ..., md=True)),便于报告和回溯。
    2. 计算 u:
      • link_mode 或有 bal_funcself.u.solve(self.x)
      • 否则 self.u.init_static()(把 u 初始化为静态/常数)。
      • 把 u 及其单位缓存(vunit 补丁以保证单位写入到 JSON)。
    3. self.f.mapping(self.x, self.u, link_mode=...) 基于当前 x/u 映射出方程残差向量,并缓存。
    4. init_err = mse(self.f.val) 计算初始残差均方差,写入缓存。
    5. self.x.evaluate(self.f, u=self.u, link_mode=...) 执行“数据协调优化”,本质是根据方程残差对 x 做最小二乘/带权校正。
  • verify()(粗大误差检验与修正,可多次调用

    1. quality = self.f.verify(..., chi2=True):基于卡方检验输出“品质因数”。
      • flag = quality < 1:小于 1 视为整体无粗大误差;否则视为存在粗大误差。对应打印 info 或 warning。
    2. 误差定位:
      • 方程侧:index_f = self.f.locate(),缓存 f 的残差与索引。
      • 测点侧:index = self.x.locate(),缓存 x 的偏差、残差与索引。
    3. 修正计算:
      • self.x.rectify() 对已知测点做协调校正;
      • 重新计算 u:链式→u.solve(self.x),否则 u.calc_static()
      • self.x.patch_unknown(self.u) 把未知的解合并回 x(一些场景下报告展示全量变量);
      • 重新 f.mapping(...),并 cache_new(...) 保存“新旧”数据对比;
      • final_err = mse(self.f.residual()) 记录最终残差;
      • 缓存“总卡方值”“各变量卡方值”“均方差与卡方曲线 eq_err/chis”与“临界值 K_crt”。
    4. 返回 flag(True 表示无粗大误差)。
  • save_file()(把本轮结果落盘)

    • self.data_file.convert_unit() 统一单位显示;
    • self.data_file.save() 保存 JSON;save_csv() 导出 CSV 明细;
    • self.report.generate_report(..., mode='xuf', log_path=...) 生成 HTML 报告;self.report.parse() 做二次解析(比如插图/表格替换)。
  • routine()(“一键式”完整流程)
    依次调用 init() -> solve() -> verify() -> save_file() 并写日志。


类:VariableManager(把“变量名”映射到 x/u 的便捷器)

  • __init__(self, n2x, n2u)
    记录“名称→索引”的映射;self.flags = self.init_n2i() 校验 n2x 与 n2u 的重名冲突;self.x, self.u 初始为 None

  • __getitem__(self, item)
    让你可以用 vm['A_M'] 这样的方式:

    • 若名字在已知 x:返回 x[n2x[item]]
    • 若在未知 u:检查是否已被赋值(-1 表示未赋值),未赋值时报错;
    • 否则抛索引错误。
  • __setitem__(self, key, value)
    仅允许给“未知变量”第一次赋值(把 -1 改为具体数),二次赋值或给“已知变量”赋值都会报错。

    这保证了“求解构造阶段”只写 u,不破坏 x。

  • init(self, x, u=None)
    绑定底层数组。若不传 u,就把 u 初始化为全 -1 的数组(“未赋值”哨兵)。

  • init_n2i(self)
    合并 n2x/n2u 的 key;若重名,收集起来并抛异常(不允许同名变量既属于已知又属于未知)。

  • pt2h(self, name)
    便捷方法:已知某流股的 P、T → 用 h_pt 计算焓,写入 <name>_H

  • check_val(self)
    遍历 u,若还存在 -1,在日志里报错提示“有未知变量尚未求值”。

  • val 属性:返回 u 的底层数组(方便拿到“所有未知量”的向量)。

  • 预定义平衡/约束方程原语:

    • mas_bal(inlets, outlets):质量守恒(入的质量流量之和 − 出的质量流量之和)。
    • en_bal(inlets, outlets, inputs=None, outputs=None):能量守恒(∑入 M·H + 外部功/热输入 − ∑出 M·H − 外部功/热输出),最后除以 1e6(通常把单位从 W 级缩放到 MW 级报告)。
    • p_bal(inlet, outlet, dp=None):压降方程(入口 P − 出口 P − 指定的压降 dp)。
    • h_eq(stream, mode='pt'):焓一致性(已知 H 与状态方程 h_pt/h_px/h_tx 计算的理论 H 之差),不同 mode 选择不同状态关系;最后除以 1e3 做单位缩放。
    • lv_bal(stream):与饱和温度的偏差(T − t_sat(P)),用于判断是否处于两相平衡等。

这一整套接口让你在写“方程构造函数”时可以更像写“物理公式”,而不用手撸索引。


类:EquationManager(管理方程残差向量与“关闭某些方程”)

  • __init__:准备 n2f(名字到索引)、f(残差向量)、_offs(被关闭的方程索引列表)。

  • init(self, f)
    把外部的 f(Equation 对象)接进来:保存其 n2i;新建同长度的零向量作为残差;把 _offs 清空为 []

  • drop(self, index)
    标记一个方程“关闭不用”。既支持传入字符串名字(会查 n2f)也支持直接传索引号。

    只是“标记”,最终由上层在装配/求解时据此跳过。

  • __getitem__/__setitem__
    允许用名字访问或写入残差向量:em['E101'] = 某个残差值

  • val 属性:返回残差向量的 np.array 拷贝(用于求 MSE 等)。

  • off 属性:返回被标记关闭的方程索引列表;若 init 未调用则报错。


这段代码的设计意图(一句话版)

  • Solver:把“数据(x)→未知(u)→方程(f)→协调优化→误差检验→修正→报表”串成一个可复用的流水线。
  • VariableManager:让方程编写用“变量名”而非“数组索引”,降低出错率。
  • EquationManager:集中管理“每个方程的残差值”与“临时关掉哪些方程”的需求。

明显问题与改进建议(强烈建议采纳)

  1. 路径正则跨平台问题

    • 现在用 re.match(r'(.*\\)?(.*)\.csv', self.input_name) 只识别 Windows 路径。
    • 建议改为:
      python
      base = os.path.splitext(os.path.basename(self.input_name))[0] result_dir = f"result_{base}"
    • 另外创建目录建议 os.makedirs(self.output, exist_ok=True),避免父目录不存在时报错。
  2. drop_equations 里的断言明显写反

    • 现在是 assert self.f is None,意味着“若 f 已存在就失败”。
    • 合理语义应为 assert self.f is not None(要有 f 才能 drop)。同时实现 TODO。
  3. 隐式依赖 heat_rect 导入了很多名字

    • 当前文件直接使用 os/logging/datetime/np。若 heat_rect 没有转手导入或导出这些名字,会崩。
    • 更保险的做法:在本文件显式 import os, loggingfrom datetime import datetimeimport numpy as np
  4. 输入路径取上级目录的写法不直观

    • abspath(join(self.input_name, os.pardir)) 等价于 dirname(abspath(self.input_name)),建议改成后者更清晰。
  5. 异常捕获过宽

    • 多处 except:(不指定异常类型)会吞掉真实错误,排查困难。建议改为 except FileNotFoundError: 等精确类型,并在日志里带上 exc_info=True
  6. 日志与状态

    • 构造器里配置了 root logger;如果此模块被多次导入/多次实例化,可能重复添加 handler,导致日志重复。可在添加前检查是否已有相同 handler。
  7. 单位注释

    • 建议在 en_bal/h_eq 等方法里标注清楚输入/输出单位(J/kg? kJ/kg? W? MW?),避免使用者误解缩放因子 1e6/1e3 的意义。

如果你愿意把 bal_func / dvr_func 的具体实现(你是如何用 VariableManager 写质量/能量/状态方程的)也贴上来,我可以继续按“逐行”方式把方程装配过程、链式求解和检验的细节讲透,并顺便给你一个最小可运行示例(CLI + 示例 CSV + 生成 HTML 报告)。

Domanda

命令行参数解析是什么意思?能否更通俗一些?

Risposta

好的,我用更通俗的方式来解释一下 “命令行参数解析”


1. 什么是命令行参数?

当你在命令行里运行一个 Python 程序时,可以在后面加上一些“额外的文字”,告诉程序要用哪些设置或输入。
比如:

bash
python my_program.py --proj demo --input data.csv --desc "测试项目"

这里的 --proj demo--input data.csv--desc "测试项目" 就叫 命令行参数。它们是用户在启动程序时传给程序的“外部指令”。


2. 为什么需要解析?

程序拿到的原始参数就是一串字符串,例如:

python
["--proj", "demo", "--input", "data.csv", "--desc", "测试项目"]

这只是原始的文字,程序还不认识它们的含义。
命令行参数解析 就是把这些文字“翻译”成程序里能直接用的变量。

解析之后,程序会得到一个“参数对象”,里面的属性就是你想要的内容:

python
args.proj # "demo" args.input # "data.csv" args.desc # "测试项目"

3. 为什么有用?

  • 灵活性:不用每次都改代码,只要在运行时传不同的参数,就能处理不同的文件、生成不同的结果。
  • 可读性:别人看到命令行就能大致明白程序的用途和设置。
  • 自动化:方便脚本批处理或集成到其他系统。

4. 用生活比喻

想象你开外卖 App 点餐:

  • 菜品是程序本体(比如“炒饭.py”)。
  • 你下单时选择的“辣度=中辣”、“加鸡蛋=是”、“外送地址=学校宿舍”,就相当于命令行参数。
  • 程序会先把这些“文字选项”解析成内部变量,然后根据这些设置去做饭、打包、送餐。

要不要我给你写一个特别简短的小例子(只有几行代码),让你能直接在命令行试一试命令行参数解析是怎么工作的?

Domanda

@property是什么意思?

Risposta

在 Python 里,@property 是一个 语法糖(装饰器),用来把一个方法“伪装”成像属性一样访问。


1. 普通方法 vs. 属性

通常情况下,如果你写一个方法,需要加括号调用:

python
class Person: def get_name(self): return "Alice" p = Person() print(p.get_name()) # 需要加括号

但如果加上 @property

python
class Person: @property def name(self): return "Alice" p = Person() print(p.name) # 不用括号,像访问变量一样

结果完全一样,但是调用方式更自然。


2. 为什么有用?

  • 更直观:调用时就像访问属性,不必像函数那样加括号。
  • 可控性:你可以在里面做计算、检查,而对外看起来就是一个属性。
  • 封装性:以后内部实现改变了(比如从常量改为计算),外部代码也不用改。

3. 生活类比

想象你有一台“智能空调”:

  • 空调面板上显示“温度=26℃”,你看到的是一个属性。
  • 实际上,空调内部会根据传感器实时计算出温度。
  • @property 就像是把复杂的计算封装起来,但外部用的人看上去就是一个简单的属性

4. 在你代码里的例子

你那段 Solver 代码里有:

python
@property def log_path(self): return os.path.abspath(os.path.join(self.output, self.proj_name + '.log'))

这意味着:

  • 你可以写 solver.log_path(像访问变量一样),直接得到日志文件路径。
  • 实际上,每次访问它,都会运行函数体,动态计算并返回正确的路径。

要不要我给你做一个小例子,演示 @property 还能配合 setter@xxx.setter)实现“读写属性”,这样你就能更直观地理解它的威力?

Domanda

image.pngimage.png

quality = self.f.verify(self.x, u=None if self.link_mode else self.u, chi2=True)
在上述命令中,计算过程在哪里?

Risposta

在你提到的这一行:

python
quality = self.f.verify(self.x, u=None if self.link_mode else self.u, chi2=True)

真正的“计算过程”发生在 self.f.verify(...) 里面


拆开看:

  1. self.f
    Solver 初始化时,self.f 被设定为一个 Equation 对象(在 create_var() 里由 Equation(md_f) 生成)。
    这个对象里面存储了 冗余平衡方程(能量守恒、质量守恒等),以及和测量数据的对应关系。

  2. verify(...) 方法

    • 作用:对当前的“已知测点数据 (self.x)”和“未知测点数据 (self.u)”代入冗余方程,计算残差,然后做统计检验。
    • 参数:
      • self.x:测量点数据(已知变量)。
      • u=None if self.link_mode else self.u:如果是“链式模式”,不需要手动传未知量,否则传 self.u
      • chi2=True:告诉函数要用 卡方检验 的方式来评价残差。

    计算内容通常包括:

    • xu 代入方程,得到残差向量。
    • 根据误差协方差矩阵,算出一个 统计量(卡方值/品质因数)
    • 输出一个 quality 指标(例如:卡方值与临界值的比值,或者“品质因数”)。
  3. 结果赋值给 quality

    • quality < 1 → 表示整体数据没有粗大误差(即残差统计量低于临界值)。
    • quality >= 1 → 表示存在粗大误差

所以:

  • 代码这一行本身不做计算,它只是调用。
  • 实际的数值计算发生在 Equation.verify() 方法里面
  • 在你的代码片段中没有贴出 Equation 类的实现,但可以肯定:残差计算、卡方检验、统计指标生成,全都写在 verify() 方法里。

要不要我帮你推断一下 verify() 方法内部大概会做的数学公式(比如残差平方和 / 卡方检验过程),这样你就能完全理解“计算发生在哪里 + 算了什么”?

Domanda

import os
import logging
from functools import partial
from datetime import datetime
from typing import Callable
import scipy.linalg as la
import numpy.linalg as nla
from scipy.stats import chi2 as c2
from scipy.stats import f as fd
from scipy.stats import norm as nd
import pandas as pd
from quick_grad import *
from load_data import Data
from load_data import MeasureData
from load_data import UnmeasuredData
from load_data import unit_convert

def create_random_input(var_dict, dx=0.01, std=0.05, gross_error=False, gross_index=None, gross_level=1.2):
"""
根据给定的变量字典 var_dict,创建一组随机模拟测点变量数据

text
:param var_dict: 变量字典,以变量名称为键名、变量平衡值为键值 :param dx: 系统误差(测点均值偏差)相对大小,默认值为 0.01 :param std: 随机误差(测点标准差)相对大小,默认值为 0.05 :return: 测点变量均值 x、95% 置信度区间宽度 xv、协方差矩阵 Sx """ x0 = np.array(list(var_dict.values()), np.float64) x = x0 + np.array(np.random.rand(len(x0))) * x0 * dx # std error generation xv = np.array(np.random.rand(len(x0))) * x * std confidence = 1.96 Sx = np.diag((xv / confidence) ** 2) return x, xv, Sx

def plugin_covariance(Sx, name1, name2, n2i, rho=0.1):
"""
根据给定的测点名称及索引字典,在协方差矩阵中附加不同测点之间的协方差值。
该函数为原位函数,无返回值。

text
:param Sx: 变量协方差矩阵 :param name1: 要附加协方差的随机相关测点名称1 :param name2: 要附加协方差的随机相关测点名称2 :param n2i: 变量名称索引字典,可通过 `create_name_index` 函数生成 :param rho: 变量相关系数大小,|rho| <= 1,默认值为 1 """ idx1, idx2 = n2i[name1], n2i[name2] assert idx1 != idx2 # should not plug in (self) variance Sx[idx1, idx2] = Sx[idx2, idx1] = rho * Sx[idx1, idx1] * Sx[idx2, idx2]

def mse(x):
return np.sqrt(np.sum(x ** 2) / len(x))

def pivot_transform(p):
indices = np.arange(len(p))
for i in range(len(p)):
if p[i] != i:
indices[p[i]], indices[i] = indices[i], indices[p[i]]
return indices

def elapsed_time(init_time):
return (datetime.now() - init_time).seconds

class Partialer:
"""
用于将部分索引位置上的变量从数组或矩阵中移除,或反之补零还原为之前的形状。
"""
def init(self, n, index):
self.length = n
self.index = index
self.keep = []
for i in range(n):
if i not in index:
self.keep.append(i)
self.keep = np.array(self.keep)

text
def __len__(self): return self.length def partial_array(self, array, axis=0): assert axis in (0, 1) if axis == 0: return array[self.keep] elif axis == 1: return array[:, self.keep] def partial_matrix(self, matrix): return matrix[self.keep][:, self.keep] def full_array(self, partial_array, axis=0): assert axis in (0, 1) if axis == 0: if np.ndim(partial_array) == 1: array = np.zeros(len(self)) else: assert np.ndim(partial_array) == 2 array = np.zeros(len(self), partial_array.shape[1]) array[self.keep] = partial_array[:] else: assert np.ndim(partial_array) == 2 array = np.zeros(partial_array.shape[0], len(self)) array[:, self.keep] = partial_array[:, :] return array def full_matrix(self, partial_matrix): mat = np.zeros((len(self), len(self))) n = partial_matrix.shape[0] for i in range(n): for j in range(n): k, l = self.keep[i], self.keep[j] mat[k, l] = partial_matrix[i, j] return mat

class Variable:
"""
变量基类。
"""

text
def __init__(self, var: Data): """ 使用特定类型的辅助数据实例化变量对象。 :param var: 初始化所用的辅助数据,必须是 Data 类型的对象. """ assert isinstance(var, Data) # Data 数据对象 self.var = var self._n2i() self.stash_flag = False # if stash old result self.total_penalty = 0.0 # for chi^2 verification self.val = None # 用于检测数据是否创建 @property def names(self): return self.var.name def __len__(self) -> int: return len(self.var) def __getitem__(self, item): if type(item) == str: return self.val[self.n2i[item]] else: return self.val[item] def __setitem__(self, item, val): if type(item) == str: self.val[self.n2i[item]] = val else: self.val[item] = val def init_md(self): """ 处理 Data 数据对象当中的数值与单位信息,转换为统一单位的 Torch 张量数据。 """ if not isinstance(self.var, Data): raise TypeError('当前测量对象绑定的数据类型有误(应为 Data,实为 {}),请检查!'.format( str(type(self.var)))) def show(self): """ 在 stdout 中输出当前数据条目内容。仅支持使用 Data 数据对象定义的测点对象。 """ assert isinstance(self.var, Data) print(self.var.data) def stash(self): """ 储存当前变量值,供之后运算时比较用。 """ # logging old values self.val_old = self.val.copy() if not isinstance(self, Unknown): # 未知测点没有不确定度信息,不进行存储 self.S_old = self.S.copy() self.intv_old = self.intv.copy() self.stash_flag = True def debug(self): """ 调试函数,返回变量索引包装后的数据字典。 """ dic = {} for name in self.names: dic[name] = float(self[name]) return dic def _n2i(self, no_repeat=True): """ 创建变量名称索引字典,使得可以通过变量名称索引到测点下标。 :param no_repeat: 不允许出现重复定义(同名多条目)。默认为 True. """ n2i = {} repeat_names = [] for i, name in enumerate(self.names): if name in n2i.keys() and no_repeat: repeat_names.append(name) else: n2i[name] = i if len(repeat_names) > 0: for name in repeat_names: logging.error("变量条目 '{}' 重复定义,而当前数据结构不允许重复定义,请检查数据正确性!".format(name)) raise ValueError("Variable {} is repeatedly defined in data file {}.".format(repeat_names, self.var.path)) self.n2i = n2i

class Measured(Variable):
"""
测量变量类,即已知测点变量类。
"""
def init(self, var: MeasureData):
assert isinstance(var, MeasureData)
super().init(var)
self.redundancy = {} # 定义冗余测点集
self.eq_err = [] # 迭代步当中方程残差
self.chis = [] # 卡方值
self.err = [] # 粗大误差测点集

text
def init_md(self): """ 使用 MeasureData 数据对象初始化数据。 """ super().init_md() assert isinstance(self.var, MeasureData) self.var.convert() # 转换数据单位 # 生成 Numpy 数据 self.init_val = np.array(self.var.val) self.val = np.array(self.var.val) self.init_intv = np.array(self.var.intv) self.intv = np.array(self.var.intv) self.S = np.diag((self.intv / 1.96) ** 2) # 记录 CST 与 OFF 测点 exclude = [] for i in range(len(self)): if self.var.accuracy[i] == 'CST': exclude.append(i) exclude.extend(self.var.off) # 加入 OFF 测点 self.part = Partialer(len(self), exclude) self._n2i(index=self.var.off) # 重新生成 n2i self.stash() def plugin_covariance(self, idx1, idx2, rho=0.1): self.S[idx1, idx2] = self.S[idx2, idx1] = rho * sqrt(self.S[idx1, idx1] * self.S[idx2, idx2]) self.stash() def evaluate(self, f, u, eps=1e-3, max_iter=100, link_mode=True): """ 根据给定测点变量,构建数据协调优化问题并求解,获得已知测点修正值及未知测点新估计值。 :param f: Equations 对象,平衡方程残差变量 :param u: Unknown 对象,未知测点,可置空 :param eps: 收敛判断时容许的最大残差,默认值为 1e-3 :param max_iter: 容许的最大迭代次数,默认值为 100 :param link_mode: 是否启用链接模式,即使用 u 变量中预定义的热平衡函数求解 x 到 u 的映射关系,默认为 True :return: 无返回值,相应计算结果直接更新到变量属性当中 """ # 已知变量 self.v = np.zeros_like(self.val) self.v = self.part.partial_array(self.v) # 未知变量 if not link_mode: du = np.ones_like(u.val) # 协方差 S = self.part.partial_matrix(self.S_old) # 原始协方差矩阵,过程中不更新 Sv = np.zeros_like(self.S) Sv = self.part.partial_matrix(Sv) sx = self.part.partial_array(np.diag(self.S_old)) s_inv = 1 / sx # 过程变量 counter = 0 timer = datetime.now() # 用于统计单步用时 self.total_penalty = 0.0 self.eq_err, self.chis = [], [] while counter < max_iter: # 方程与变量裁剪 f.drop(self, u) # 移除被关闭的方程 f_names = f.part.partial_array(np.array(f.names)) arange = np.arange(len(f_names)) f_val = f.part.partial_array(f.val) f_grad = self.part.partial_array(f.grad, axis=1) # 去除未使用测点 f_grad = f.part.partial_array(f_grad, axis=0) # 去除未使用方程 f_grad_u = f.part.partial_array(f.grad_u, axis=0) # 去除未使用方程 f_S = f.part.partial_matrix(f.S) # 链接模式预处理 if link_mode: # 检查零对角元 f_diag = np.diag(f_S) f_zero = arange[f_diag == 0] if len(f_zero) > 0: index_zero = f_names[f_zero] raise ValueError('存在方差为零的平衡方程,请检查其冗余性:{}'.format(index_zero)) else: # 检查极小对角元 # 仅在第 0 步检查 if counter == 0: f_small = arange[np.abs(f_diag) < 1e-6] if len(f_small) > 0: for si in f_small: fs_name, fs_val = f_names[si], f_diag[si] logging.warning(f'方程 {fs_name} 方差极小({fs_val:.2E}),请注意确认其冗余性。') F_val = f_val F_grad = f_grad F_S = f_S # 非链接模式预处理 else: # QR 分解 m, p = f_grad_u.shape assert m > p, f'方程数({m})不大于未知变量数({p}),无法求解,请检查!' Q, R = nla.qr(f_grad_u, mode='complete') Q1, Q2 = Q[:, :p], Q[:, p:] R1 = R[:p, :] # 仅在需要计算未知测点修正量时使用 # 消元降维 F_val = Q2.T @ f_val F_grad = Q2.T @ f_grad F_S = Q2.T @ f_S @ Q2 # LU 分解 Dlu = la.lu_factor(F_S) # 奇异矩阵检测 zeros = np.diag(Dlu[0]) == 0 if len(np.arange(len(F_val))[zeros]) > 0: r = nla.matrix_rank(F_S) logging.error( '当前方程协方差矩阵不可逆(矩阵秩 {} < 满秩 {}),无法进行数据协调计算,请检查输入数据!'.format(r, len(F_val))) zero_index = pivot_transform(Dlu[1])[zeros] if not link_mode: # 非链接模式下需还原到原始矩阵判断冗余平衡方程项 zero_set = set() for zi in zero_index: real_zero_index = arange[np.abs(Q2[:, zi]) > 1e-6] for zj in real_zero_index: zero_set.add(zj) zero_index = list(zero_set) zero_names = [f_names[i] for i in zero_index] logging.error(f'当前冗余平衡方程中的可疑项(共 {len(zero_index)} 项):{zero_names}') # 退出计算 exit(1) # 计算测点修正量 multiplier = la.lu_solve(Dlu, F_val) # 拉格朗日乘子 FSx = F_grad @ S dv = FSx.T @ multiplier # 计算单步修正量 self.v += dv # 累加 FSv = FSx - F_grad @ Sv # 过程中修正协方差 Sv += FSv.T @ la.lu_solve(Dlu, FSv) # 累加 err_v = mse(dv) # 计算未知变量修正量 if not link_mode: Dluu = la.lu_factor(R1) QF = Q1.T @ (f_val - f_grad @ dv) ddu = la.lu_solve(Dluu, QF) du += ddu u_grad = la.lu_solve(Dluu, Q1.T @ f_grad) # 模拟链式法则求解未知变量雅可比 err_u = mse(ddu) # 计算系统总体卡方值 v2 = self.v ** 2 self.total_penalty = v2 @ s_inv # 叠加修正步 self.step() if link_mode: # 链接模式下调用热平衡函数更新未知变量 u.solve(self) else: # 非链接模式下使用增量更新未知变量 u.val = u.val_old - du u.S = u_grad @ (S - Sv) @ u_grad.T f.mapping(self, u, link_mode=link_mode) # 记录数据 err_f = mse(f_val) # 计算方程残差 counter += 1 self.eq_err.append(err_f) self.chis.append(self.total_penalty) # 输出日志 err = err_v if link_mode else err_u # 不同模式下选取不同的变量计算迭代步长 step_time = datetime.now() - timer step_time = step_time.seconds + step_time.microseconds / 1e6 logging.info('[{:d} / {:d}] 总卡方值 {:.3f} | 迭代步长 {:.3E} | 方程残差均方差 {:.3E}.(单步用时 {:.3f} 秒)'.format( counter, max_iter, self.total_penalty, err, err_f, step_time)) timer = datetime.now() # 判断是否收敛 if err < eps: logging.info('数据协调计算已在 {:d} 步内收敛,当前方程残差均方差为 {:.3E}。'.format(counter, err_f)) break # 超出最大迭代次数 elif counter == max_iter: logging.warning('数据协调计算未在 {:d} 步内收敛,请检查输入!当前方程残差均方差为 {:.3E}'.format(max_iter, err_f)) # 数据处理及还原 sv = np.diag(Sv) # 提取对角元 penalty = v2 / np.maximum(sv, sx / 10) # 按 VDI-2048 标准计算卡方值 self.penalty = self.part.full_array(penalty) self.Sv = self.part.full_matrix(Sv) def evaluate2(self, f, eps=1e-6): """ 使用二阶算法构建及求解数据协调优化问题。 不支持包含未知测点的情形。 TODO: 使本算法适配含未知测点的情形. :param f: 平衡方程残差项。 :param eps: 迭代收敛限,默认值为 1e-6。 """ v0 = np.zeros_like(self.val) v = np.zeros_like(self.val) err = 1 H = f.hess(self.val) while err > eps: Hv0 = (H @ v0).T F = f.grad.T + Hv0 # Dlu = la.lu_factor(f.grad @ self.S @ F) Hfv0 = (v0.T @ H @ v0) / 2 # multiplier = la.lu_solve(Dlu, f.val + Hfv0) multiplier = la.solve(f.grad @ self.S @ F, f.val + Hfv0) v = self.S @ F @ multiplier err = np.sqrt(np.sum((v - v0) ** 2) / len(v)) v0[:] = v logging.debug('itering, err: {:.3E}'.format(err)) self.v = v self.multiplier = multiplier Dlu = la.lu_factor(f.S) FSx = F.T @ self.S F_ast = la.lu_solve(Dlu, FSx) self.Sv = FSx.T @ F_ast self.total_penalty = np.sum(np.abs(f.val @ self.multiplier)) # 计算总卡方值 # 计算各变量卡方值 self.penalty = np.square(self.v) / np.maximum(np.diag(self.Sv), np.diag(self.S) / 10) def rectify(self): """ 确认无粗大误差后,施加修正量,完成变量数据协调。 """ # 修正协方差矩阵 self.val = self.val_old - self.part.full_array(self.v) self.S = self.S_old - self.Sv self.variance = np.diag(self.S) self.intv = self.variance ** 0.5 * 1.96 # self.bound() # bound constraints def step(self): """ 在未知测点算法迭代过程中,步进当前测点变量值。 """ self.val = self.val_old - self.part.full_array(self.v) # self.bound() def locate(self): """ 粗大误差定位。 :return: 可能存在粗大误差的测点下标。 """ self.r = np.sqrt(self.penalty) excessive_index = np.argsort(self.r)[::-1][:len(self.r[self.r > 1.96])] self.err = excessive_index # 储存索引变量 return excessive_index def bound(self): """ 更新测点值后,应检查确认某些类型的测点值不超过约束范围。 """ # TODO: 实施约束之前先做一轮单位转换 assert isinstance(self.var, MeasureData) for i in range(len(self)): if self.val[i] < self.var.lb[i]: self.val[i] = self.var.lb[i] elif self.val[i] > self.var.ub[i]: self.val[i] = self.var.ub[i] def relax(self): """ 根据粗大误差识别的结果,对存在粗大误差的变量进行校验。 """ self.val[:] = self.val_old # 更新为初值 for i in self.err: self.intv[i] *= 2 # 不确定度放宽 1.5 倍 logging.info("测点 '{}' 的粗大误差由 {:.2f}% 放宽至 {:.2f}%。".format( self.names[i], self.init_intv[i] / self.val[i] * 100, self.intv[i] / self.val[i] * 100)) self.S = np.diag((self.intv / 1.96) ** 2) # 重新计算协方差矩阵 def patch_unknown(self, u): """ 对于 OFF 测点,从未知变量计算结果当中提取对应结果赋值。 """ for i in self.var.off: name = self.names[i] defined_flag = False if name in self.names: # 已知测点中有同名测点 if name in self.n2i: # 变量名可索引到值(非 OFF 测点) j = self.n2i[name] # 获取主变量索引 assert j not in self.var.off # 备用检查 # 需进行单位逆转换 val, intv = unit_convert(self.val[j], self.var.vartype[i], self.var.vunit[i], self.intv[j], self.var.accuracy[j], reverse=True) defined_flag = True if name in u.names: # 同时出现在已知与未知测点当中,检查问题 logging.warning('测点 {} 有测量值,同时定义了同名未知测点,请检查模型,增补冗余方程!'.format(name)) if name in u.names and not defined_flag: # 未知测点集中求解了相应值 j = u.n2i[name] # 需进行默认单位下的单位逆转换 val, intv = unit_convert(u.val[j], u.var.vartype[j], self.var.vunit[i], u.intv[j], acc=None, reverse=True) defined_flag = True if defined_flag: self.val[i] = val self.intv[i] = intv else: logging.debug('测点 {} 全部 OFF 且未通过热平衡求值,请检查模型!'.format(name)) def _n2i(self, no_repeat=True, index=None): self.redundancy = {} # 清空冗余测点集 if isinstance(self.var, MeasureData): n2i = {} for i in range(len(self)): name, token = self.var.name[i], self.var.token[i] if not pd.isna(token): n2i[token] = i # 对所有测点,都允许通过测点位号索引 if name not in n2i.keys(): if index is None or i not in index: n2i[name] = i # 对于第一个非 OFF 测点,允许额外通过变量名称索引 elif index is not None and i not in index: # 当前测点为冗余测点 if name in self.redundancy.keys(): self.redundancy[name].append(token) # 直接将位号加入现有列表 else: self.redundancy[name] = [token] # 创建冗余点列表 self.n2i = n2i else: # 未使用 MeasureData 初始化数据,不存在冗余测点 super()._n2i(no_repeat=no_repeat) # 按默认方式处理

class Unknown(Variable):
"""
未知测点类。
"""

text
def __init__(self, var: UnmeasuredData): super().__init__(var) self.grad = None # 定义导数变量 def init_static(self): """ 对未知测点,生成静态初始预测值。 """ self.val = np.ones(len(self)) # 检查 var 对象中的 init_val if self.var.init_val is not None: for i in range(len(self)): if not pd.isna(self.var.init_val[i]): self.val[i] = self.var.init_val[i] else: # 未提供初值,按单位类型设置默认值 if self.var.vartype[i] == 'h': self.val[i] = 500 # kJ/kg elif self.var.vartype[i] == 'load': self.val[i] = 1000 # kW elif self.var.vartype[i] == 't': self.val[i] = 323.15 # K elif self.var.vartype[i] in ('dp', 'dt'): self.val[i] = 0 # 差量类型初值为 0 else: # 其他一切变量类型 self.val[i] = 1 self.intv = np.ones(len(self)) self.stash() def calc_static(self): """ 对未知测点,在采用迭代方式求解的情况下更新不确定度。 """ self.intv = np.sqrt(np.diag(self.S)) * 1.96 def init_md(self): """ 读取 Data 数据对象当中的信息,并初始化相应的数值结构。 """ super().init_md() self.val = None self.new = True self.func = None self._default_unit() # 分析并创建默认单位 def construct(self, solve_func: Callable): """ 绑定求解函数,用已知测点值求解未知测点平衡值。 :param solve_func: 求解函数。 """ self.func = solve_func self.jac = jacobian(self.func) self.hess = hessian(self.func) def solve(self, x: Measured, update_grad=True): """ 根据已知测点 x 的值,按给定的求解函数计算未知测点的值, 同时利用函数关系求出雅可比矩阵和协方差等信息。 :param x: 已知测点 x,应为 Measured 对象。 :param update_grad: 是否在本轮计算中更新梯度值,默认为 True. """ assert self.func is not None # 需要绑定求解函数 self.val = self.func(x.val) if update_grad: self.grad = jacobian(self.func)(x.val) self.S = self.grad @ x.S @ self.grad.T self.variance = np.diag(self.S) self.intv = np.sqrt(self.variance) * 1.96 if isinstance(self.var, Data): # 更新到数据库中 if self.new: self.var.data = self.var.data.assign(value=pd.Series(self.val).values, accuracy=pd.Series(self.intv).values) self.new = False else: self.var.data['value'] = pd.Series(self.val).values self.var.data['accuracy'] = pd.Series(self.intv).values if not self.stash_flag: self.stash() def bound(self): """ 根据上下界信息约束当前测点值。暂时未实现。 """ pass def _default_unit(self): vunit = [] for i in range(len(self)): vtype = self.var.data['type'][i] if vtype == 'h': vunit.append('kJ/kg') elif vtype == 'qm': vunit.append('kg/s') elif vtype in ('p', 'dp'): vunit.append('MPa') elif vtype == 't': vunit.append('K') elif vtype == 'dt': vunit.append('C') elif vtype in ('load', 'pw'): vunit.append('kW') elif vtype == 'v': vunit.append('m3/kg') else: vunit.append('') self.var.data['vunit'] = vunit

class Intermediate(Variable):
"""
辅助变量类,可用于构建中间变量或半方程量。
用作中间变量时,可用于表示较复杂的中间过程量,如焓值等。
用作半方程量时,可表示平衡方程一侧量。此类存在的主要目的是确定方程残差的总体量级,以便确认迭代终止条件。
"""

text
def __init__(self, var): super().__init__(var) def init_md(self): super().init_md() def construct(self, func): """ 构建函数关系。 :param func: 从已知及未知测点映射到半方程项的函数。 """ self.func = func self.jac = jacobian(self.func) self.hess = hessian(self.func) def mapping(self, input_x, input_u=None, update_grad=True, link_mode=True): """ 实施变量映射,并通过自动微分计算因变量关于自变量的偏导数(雅可比矩阵)值。 :param input_x: 已知测点变量,应为 Measured 对象。 :param input_u: 未知测点变量,应为 Unmeasured 对象,默认值为 None。 :param update_grad: 是否在本轮映射当中更新梯度值,默认为 True. :param link_mode: 是否使用链接模式传递未知变量对已知变量的关系,默认为 True. """ if input_u is None: self.val = self.func(input_x.val) if update_grad: self.grad = jacobian(self.func)(input_x.val) self.grad_u = None # for varify self.S = self.grad @ input_x.S @ self.grad.T else: self.val = self.func(input_x.val, input_u.val) if update_grad: if link_mode: self.grad = jacobian(lambda x: self.func(x, input_u.func(x)))(input_x.val) else: self.grad = jacobian(lambda x: self.func(x, input_u.val))(input_x.val) self.grad_u = jacobian(lambda u: self.func(input_x.val, u))(input_u.val) self.S = self.grad @ input_x.S @ self.grad.T self.variance = np.array(np.diag(self.S)) # self.variance[self.variance < 0] = 0 # 消除因特殊原因变负的对角项 self.intv = self.variance ** 0.5 * 1.96 if not self.stash_flag: self.stash()

class Equation(Intermediate):
"""
方程类。
"""

text
def __init__(self, var): super().__init__(var) self.offs = [] # 关闭的方程 self.part = None # 用于关闭方程 self.drop_func = None # 方程关闭函数 def init_md(self): super().init_md() self.val = None def construct(self, func, prev=None): """ 构建函数及导函数映射关系。 :param func: 函数表达式。 :param prev: 前置函数。默认为 None。 """ if prev is not None: self.func = func self.rawfunc = lambda x: self.func(prev.func(x)) self.jac = jacobian(self.rawfunc) self.hess = hessian(self.rawfunc) else: super().construct(func) def bind_drop(self, drop_func): """ 绑定用于判断关闭哪些方程的函数。 """ self.drop_func = drop_func def mapping(self, input_x, input_u=None, update_grad=True, link_mode=True): """ 实施变量映射。 :param input_x: 映射输入的主变量项,可为 HalfEquations 或 Measured 对象。 :param input_u: 映射输入的次变量项,若 input_x 为 HalfEquations 对象则应为空, 若其为 Measured 对象则 input_u 应为 Unmeasured 对象。默认为 None。 :param update_grad: 是否更新梯度值,默认为 True. :param link_mode: 是否按链式法则传递未知变量关于已知变量的梯度,默认为 True. """ super().mapping(input_x, input_u, update_grad, link_mode) if isinstance(input_x, Intermediate): # 输入的为中间变量半方程,需要使用链式法则计算梯度 self.grad = self.grad @ input_x.grad if input_x.grad_u is not None: self.grad_u = self.grad @ input_x.grad_u if not self.stash_flag: self.stash() def verify(self, x, u=None, chi2=False): """ 验证总体粗大误差。 :param x: 已知测点变量,应为 Measured 对象。 :param u: 未知测点变量,应为 Unmeasured 对象。默认值为空。 :param chi2: 是否使用卡方判据替代 VDI2048 判据,默认为 False。 """ if u is not None: # unknown variables exist, use Chi^2 distribution according to standard assumptions x.K_crt = c2.ppf(0.95, len(self) - len(u)) else: # no unknown variables, follow VDI-2048 and use F distribution if chi2: x.K_crt = c2.ppf(0.95, len(self)) else: x.K_crt = fd.ppf(0.95, len(self), 1e10) return x.total_penalty / x.K_crt # 返回品质因数 def locate(self): """ 方程粗大误差定位。 :return: 可能存在粗大误差的方程下标。 """ self.r = np.zeros_like(self.val) # 方程粗大误差指标 for i in range(len(self)): if i not in self.offs: sigma = self.S[i, i] ** 0.5 if sigma == 0: self.r[i] = 0 else: self.r[i] = self.val_old[i] / sigma self.r = np.abs(self.r) # 取绝对值 alpha = 0.05 beta = 1 - (1 - alpha) ** (1 / len(self)) phi = nd.ppf(1 - beta) excessive_index = np.argsort(self.r)[::-1][:len(self.r[self.r > phi])] return excessive_index def residual(self, y=None): if isinstance(y, Intermediate): return float(np.max(np.abs(self.val / y.val_old[::2]))) else: # 未构建半方程对象,直接输出各方程绝对残差值 return self.val def patch_redundancy(self, x: Measured): """ 将冗余测点间的平衡方程补充到现有的平衡方程当中。 """ red = x.redundancy if len(red.keys()) > 0: n2x = x.n2i n0 = len(self.n2i) # 增加冗余测点方程前的方程数量 eqs = {} # 冗余测点方程集 for name in red.keys(): tokens = red[name] for i in range(len(tokens)): token = tokens[i] eq_name = 'MEA{:d}_{}'.format(i+1, name) # 方程名称 # 增加条目 self.var.add_item({'name': eq_name, 'type': 'MEA', 'description': '{} 测点冗余测量平衡方程'.format(token)}) eqs[eq_name] = [name, token] # 包装为完整的函数 self._n2i() # 重新生成名称索引 self.func_old = partial(self.func) # 拷贝原先函数副本 def new_func(x, u): f = self.func_old(x, u) for eq_name in eqs: name, token = eqs[eq_name] f[self.n2i[eq_name]] = x[n2x[name]] - x[n2x[token]] return f self.func = new_func def drop(self, x: Measured, u: Unknown): """ 关闭某些不需要使用的冗余平衡方程。 """ if self.drop_func is None: self.part = Partialer(len(self), []) # 不移除任何方程 else: self.offs = self.drop_func(x.val, u.val) self.part = Partialer(len(self), self.offs)

class OutputVariable(Intermediate):
"""
输出变量,如机组热经济性指标、部件性能指标等。
"""
def init(self, var):
super().init(var)

text
def mapping(self, input1, input2=None): """ 实施变量映射。 :param input1: 输入变量之一,可为任意 Variable 对象。 :param input2: 输入变量之二,可为任意 Variable 对象,默认值为 None。 """ if input2 is not None: self.func(input1, input2) else: self.func(input1)

上述代码为文件:heat_rect.py的代码,请问在verify验证总体粗大误差处,c2.ppf是什么意思?这个方法是什么含义?

Pensiero

Explaining c2.ppf

The c2.ppf function from scipy.stats computes the chi-square critical value for a given probability (like 0.95) and degrees of freedom. It’s often used to calculate thresholds in statistical tests. In practice, this critical value helps in determining whether a result falls below a certain threshold of significance.

Chi-square ppf explanation

The c2.ppf function (percent point function, or inverse CDF) returns the chi-square value for a given probability (like 0.95), based on the degrees of freedom. Here, you compare the statistic (total_penalty) to the critical value (K_crt). If it's lower, accept the null hypothesis. Example: for 10 degrees of freedom, the chi-square value is approximately 18

Thought for 14s

Risposta

简短说:c2.ppf(...) 就是卡方分布的“分位点函数”(percent point function,= 逆 CDF)。
给它一个概率 q 和“自由度” df,它会返回阈值 k,使得

P(Xk)=q,Xχ2(df)P(X \le k)=q,\quad X\sim \chi^2(\text{df})

在你的代码里,这个 k 就是卡方检验的临界值


代码里的用法

python
from scipy.stats import chi2 as c2 # 有未知量 u 时(方程个数 m 减去未知量个数 p 作为自由度) x.K_crt = c2.ppf(0.95, len(self) - len(u)) # 没有未知量但指定用卡方检验时 x.K_crt = c2.ppf(0.95, len(self))
  • 0.95:显著性水平 α=0.05 的上 95% 分位点(右侧 5% 尾部作为拒绝域)。
  • len(self) - len(u)len(self)自由度(通常 = 独立方程数 m;有未知量时有效独立约束减少为 m−p)。

随后代码返回:

python
return x.total_penalty / x.K_crt # 品质因数

这里 x.total_penalty 是统计量(类似“标准化残差平方和”);
x.total_penalty < x.K_crt 时(即返回值 < 1),认为整体无粗大误差;否则认为存在粗大误差


直观理解

把卡方分布想成一条曲线;c2.ppf(0.95, df) 找到一个点 k,让曲线左边的面积是 95%。
如果你的统计量没超过这个点,就还在“正常波动”的 95% 范围内;超过了,就落在“异常”的 5% 尾部。

小例子:自由度 df=10 时,c2.ppf(0.95, 10) ≈ 18.31
如果统计量 < 18.31,接受“无粗大误差”的假设;否则拒绝。

Domanda

x.K_crt是什么用法?x是类,K_crt是某种属性吗?

Risposta

是的,x.K_crt 在这里就是 类实例的一个属性


1. x 是谁?

在你的程序里,xMeasured 类的实例(已知测点变量对象)。
它继承自 Variable,本身没有在 __init__ 里定义过 K_crt


2. K_crt 从哪来?

Equation.verify() 里有这么一段:

python
def verify(self, x, u=None, chi2=False): if u is not None: # 有未知量时,自由度 = 方程数 - 未知数,用卡方分布 x.K_crt = c2.ppf(0.95, len(self) - len(u)) else: # 没有未知量 if chi2: x.K_crt = c2.ppf(0.95, len(self)) else: x.K_crt = fd.ppf(0.95, len(self), 1e10) return x.total_penalty / x.K_crt

也就是说,verify() 方法运行时,会动态地在 x 对象上添加一个新属性 K_crt,用来保存“检验的临界值”。

在 Python 里,这是允许的:你可以在类定义之外,随时给对象绑定新的属性。


3. K_crt 的作用

  • 含义:临界值(critical value),用于粗大误差的卡方/F 检验。
  • 用途:在 verify() 中,返回的“品质因数”就是
    python
    quality = x.total_penalty / x.K_crt
    如果 quality < 1 → 没有粗大误差;否则 → 存在粗大误差。

4. 举个简单例子

python
class A: def __init__(self): self.total_penalty = 10 a = A() # 动态给对象添加新属性 a.K_crt = 15 print(a.total_penalty / a.K_crt) # 10/15 = 0.666...

这和你的 x.K_crt 用法是一样的。


✅ 总结:

  • xMeasured 类的对象。
  • K_crtEquation.verify() 中被赋值,相当于“卡方/F 检验临界值”。
  • 它不是在类里事先写死的,而是运行时动态加上的属性。

要不要我帮你画一个“数据流小图”,把 x.total_penaltyx.K_crtverify() 中是怎么结合起来判断“有没有粗大误差”的?

Domanda

什么是链接模式?

Risposta

在你给的这套代码体系里,“链接模式 (link mode)” 指的是:
👉 在求解过程中,已知测点 x 与未知测点 u 之间的关系,通过预定义的“热平衡函数”直接联立起来计算


1. 两种模式的区别

  • 链接模式 (link_mode=True)

    • Unknown 类里,construct(self, solve_func) 会绑定一个函数:u = bal_func(x),即未知量 u 可以由已知量 x 通过热平衡函数算出来。
    • Measured.evaluate()Solver.init() 里,如果 link_mode 打开,就会直接用这个函数更新 u
    • 这样,u 不再是独立的自由变量,而是通过链式法则从 x 计算得来。
      (比如:知道温度、压力,就能算出焓;这就是典型的“已知决定未知”的场景。)
  • 非链接模式 (link_mode=False)

    • u 被当作独立的未知变量。
    • 在数据协调优化中,它会和 x 一起,通过冗余方程残差最小化来求解。
    • 在这种情况下,要构造 f(x, u) 并同时解出 u 和修正后的 x

2. 在代码里的位置

  • Solver.__init__ 里有 self.link_mode = link,默认是 True
  • Solver.init() 里有:
    python
    if self.link_mode: self.u.construct(self.bal_func)
    表明在链接模式下,用热平衡函数初始化 u
  • Solver.solve() 里有:
    python
    if self.link_mode or self.bal_func is not None: self.u.solve(self.x) else: self.u.init_static()
    在链接模式下,直接用 x 推算 u

3. 为什么要有两种模式?

  • 链接模式:更贴近物理真实情况,计算效率高,未知量是“从属变量”。
  • 非链接模式:更灵活,可以把 u 当作独立变量,适合做灵敏度分析或没有明确函数关系时的情况。

4. 生活类比

想象一个锅炉模型:

  • 链接模式:你测了锅炉的压力 P 和温度 T,就能通过热力学公式直接算出焓 H → u = f(x)
  • 非链接模式:你把焓 H 也当成独立变量,和压力、温度一起交给方程去解。

要不要我帮你画一个“小图”,把 链接模式非链接模式x → u → f 之间的数据流对比直观展示出来?

Domanda

请解释文件heat_rect.py中,evaluate方法的流程。

Pensiero

Thought for 4 seconds

Risposta

下面把 heat_rect.pyMeasured.evaluate() 的执行流程按“入口 → 每轮迭代 → 结束收尾”讲清楚。它的目标是:在冗余方程约束下,对已知测点 x 做数据协调(求修正量 v),并在需要时通过“链接模式”更新未知测点 u,直到收敛。


入口阶段:准备数据与常量

  1. 初始化已知量的修正向量
python
self.v = np.zeros_like(self.val) self.v = self.part.partial_array(self.v)
  • self.v 存“对测点 x 的修正量”(VDI-2048 里的 adjustment)。
  • self.partPartialer,会裁剪掉常值点(CST)与 OFF 点,只在可调整的测点上优化。
  1. 若非链接模式,给未知量修正量占位
python
if not link_mode: du = np.ones_like(u.val)
  • 非链接模式下,u 当作独立未知,要同时解 du
  1. 协方差相关的准备
python
S = self.part.partial_matrix(self.S_old) # 原始协方差(不变) Sv = self.part.partial_matrix(np.zeros_like(self.S)) # 修正过程中累计增量 sx = self.part.partial_array(np.diag(self.S_old)) # 原始方差对角 s_inv = 1 / sx # 其逆,用于卡方值
  1. 循环控制量与日志累积
python
counter = 0 timer = datetime.now() self.total_penalty = 0.0 self.eq_err, self.chis = [], []

迭代阶段(最多 max_iter 次)

A. 取当前方程、梯度、协方差并做“方程/变量裁剪”

python
f.drop(self, u) # 先根据 drop 规则屏蔽某些方程 f_names = f.part.partial_array(np.array(f.names)) arange = np.arange(len(f_names)) f_val = f.part.partial_array(f.val) f_grad = self.part.partial_array(f.grad, axis=1) # 去掉不可调整测点列 f_grad = f.part.partial_array(f_grad, axis=0) # 再去掉被关闭的方程行 f_grad_u= f.part.partial_array(f.grad_u, axis=0) # 同上(针对 u 的雅可比) f_S = f.part.partial_matrix(f.S) # 方程残差协方差
  • 这里把“不可参与优化的测点/关闭的方程”都裁掉,让线性代数只在有效自由度上进行。

B. 链接与非链接模式的分叉

链接模式(link_mode=True

  • u = u.func(x)u 不是自由变量。
  • 只对 方程残差 做统计:
    • 检查 f_S 的对角元是否为零(方差为 0 意味着方程被写死,不可用)。
    • 记录极小方差(可能冗余)。

令:

python
F_val = f_val # 有效残差 F_grad = f_grad # 对 x 的雅可比(已裁剪) F_S = f_S # 残差协方差

非链接模式(link_mode=False

  • u 当独立未知,需要先消掉 u 再在 x 空间里优化(把问题降到只含 v 的约束)。
  • 做一次 QR 分解f_grad_u
python
m,p = f_grad_u.shape # m=方程数, p=未知数 Q,R = nla.qr(f_grad_u, mode='complete') Q1,Q2= Q[:,:p], Q[:,p:] R1 = R[:p,:] # 后面解 du 用
  • Q2 在方程空间做消元降维:
python
F_val = Q2.T @ f_val F_grad = Q2.T @ f_grad F_S = Q2.T @ f_S @ Q2

直觉:把“与 u 相关的约束”投影掉,只留下对 v 有效的独立约束。

C. 求解拉格朗日乘子并得到测点修正量

  1. F_S 做 LU 分解并检查奇异
python
Dlu = la.lu_factor(F_S) # 若奇异(秩亏),报错并提示可能病态的方程索引
  1. 拉格朗日乘子与增量
python
multiplier = la.lu_solve(Dlu, F_val) # λ = F_S^{-1} * F_val FSx = F_grad @ S dv = FSx.T @ multiplier # v 的单步修正量 self.v += dv
  1. 在迭代中更新“修正协方差累积量”
python
FSv = FSx - F_grad @ Sv Sv += FSv.T @ la.lu_solve(Dlu, FSv)
  1. 若非链接模式,还要解未知量修正量 du
python
if not link_mode: Dluu = la.lu_factor(R1) QF = Q1.T @ (f_val - f_grad @ dv) ddu = la.lu_solve(Dluu, QF) # 解 du du += ddu u_grad = la.lu_solve(Dluu, Q1.T @ f_grad) # du 对 x 的近似雅可比

D. 统计量与变量更新(链式/非链式分开)

  1. 更新总体卡方值
python
v2 = self.v ** 2 self.total_penalty = v2 @ s_inv # Σ (v_i^2 / σ_i^2)
  1. 步进:把本轮修正应用到 x(但还不“确认”)
python
self.step() # self.val = self.val_old - full(self.v)
  1. 更新未知量 u 与方程映射
python
if link_mode: u.solve(self) # 用热平衡函数从 x 重新算 u else: u.val = u.val_old - du # 直接用 du 更新 u.S = u_grad @ (S - Sv) @ u_grad.T f.mapping(self, u, link_mode=link_mode)

E. 日志、收敛判据与循环控制

  1. 记录误差曲线
python
err_f = mse(f_val) # 方程残差均方差 self.eq_err.append(err_f) self.chis.append(self.total_penalty)
  1. 选取迭代步长指标
python
err = mse(dv) if link_mode else mse(ddu)
  1. 打印进度日志(步号 / 总卡方 / 步长 / 残差 / 用时)

  2. 收敛判定

python
if err < eps: # 收敛, break elif counter == max_iter: # 未收敛,给出 warning

收尾阶段:输出最终量(供后续 verify / 报告使用)

  1. 把累积的协方差增量 Sv 的对角抽出
python
sv = np.diag(Sv)
  1. 计算每个测点的单点卡方惩罚(VDI-2048 的“限幅”做法)
python
penalty = v2 / np.maximum(sv, sx / 10) self.penalty = self.part.full_array(penalty) # 还原到完整测点序 self.Sv = self.part.full_matrix(Sv) # 同上
  • 这些结果会被 verify()rectify()、报告生成等后续步骤使用。

一句话总结

evaluate() 就是在“当前 x、(可能还有)u、冗余方程 f、以及协方差”下,最小化约束残差得到一组测点修正量 v(以及非链接模式下的 du),循环更新 x/u 与方程,直到步长小于 eps 收敛;最后产出卡方统计量、修正后的协方差增量等,为后续 verify() 判粗大误差rectify() 应用修正 做准备。

Domanda

hope.py

---------

CO2-SS# 机组模型

author: 西安交通大学

import sys
import argparse
from heat_rect import *
from excel_loader import data_template
from solver import Solver
from solver import VariableManager
from solver import EquationManager
from coolprop import *

if name == 'main':
# argparse 模块
parser = argparse.ArgumentParser(description='数据协调计算程序')
parser.add_argument('--proj', type=str, required=False, default='CO2-SS', help='项目名称')
parser.add_argument('--input', type=str, required=False, default='CO2-SS.csv', help='输入数据名称')
parser.add_argument('--desc', type=str, required=False, default='微型燃气轮机系统', help='项目描述')
parser.add_argument('--output', type=str, required=False, default=None, help='结果输出路径')
parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径')
args = parser.parse_args()

text
# 转换数据文件模式 if args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv') init_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output) n2x = solver.n2x # 获取索引变量 n2u = solver.n2u # 创建辅助变量集 X = VariableManager(n2x, n2u) # 热平衡函数 def u_val(x): X.init(x) ng = NG() # gas = GAS() # # Comp X['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P']) X['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF']) X['C_O_T'] = air_t_hp(X['C_O_H'], X['C_O_P']) X['C_LOAD'] = X['C_I_M'] * (X['C_O_H'] - X['C_I_H']) # Combustion Chamber X['FUEL_H'] = ng.h_tp(X['C_I_T'], X['C_I_P']) X['CC_O_M'] = X['C_I_M'] + X['FUEL_M'] X['CC_O_H'] = (X['FUEL_M'] * X['LHV'] + X['C_O_H'] * X['C_I_M'] + X['FUEL_H'] * X['FUEL_M'])/(X['CC_O_M']) X['CC_O_P'] = X['C_O_P'] * X['PLOSS_EFF'] # Turbo X['T_I_H'] = gas('H_TP', X['T_I_T'], X['T_I_P']) X['T_O_H'] = gas_turbine_h1(X['T_I_H'], X['T_I_P'], X['T_O_P'], X['T_EFF']) X['T_O_T'] = gas('T_HP', X['T_O_H'], X['T_O_P']) X['T_LOAD'] = X['T_I_M'] * (X['T_I_H'] - X['T_O_H']) # Gen X['GEN_LOAD'] = (X['T_LOAD'] - X['C_LOAD'] - X['MECH_LOSS'])*X['GEN_EFF'] return X.val # 创建数据协调平衡方程 n2f = solver.n2f F = EquationManager() # 冗余平衡方程 def f_val(x, u): F.init(solver.f) X.init(x, u) # ====== F['C_O_T_BAL'] = X['C_O_T'] - X['C_O_T1'] F['AIR_BAL'] = X['FUEL_M'] * X['AIR_ratio'] - X['C_I_M'] F['T_I_H_BAL'] = X['T_I_H'] - X['CC_O_H'] F['T_I_M_BAL'] = X['T_I_M'] - X['CC_O_M'] F['T_I_P_BAL'] = X['T_I_P'] - X['CC_O_P'] F['T_O_T_BAL'] = X['T_O_T'] - X['T_O_T1'] F['GEN_BAL'] = X['GEN_LOAD'] - X['ELE_LOAD'] return F.val # 执行计算 solver.create_func(u_val, f_val) solver.routine()

综合前面的代码,上述代码请逐行分析

Pensiero

Structuring the solver

I'm planning to break down the setup step-by-step. I’ll start with header lines, imports, argparse, and the conversion step. Then I'll set up the solver and define each computation for its variables, along with EquationManager F. A few key equations will help balance things like air mass flow and energy balances, then I'll run the solver.

Clarifying variables and units

I need to mention the units and naming conventions. For example, we’re generating unknown variables via u_val and computing compressor outlet enthalpy using air_comp_h1 and T('gas'). I'll also clarify the gas function confusion—it’s used as a function from coolprop. It's fine as it is. Additionally, I'll explain pressure loss efficiency (PLOSS_EFF) and other efficiency variables.

Explaining 'VariableManager' usage

I need to detail how the VariableManager works. It starts by setting up references with X.init(x), and I can access variables like X['name'] for either measured or unknown values. Only unknowns get written to u initially. If I try assigning a known variable, it raises an error. Some assignments involve computed variables like C_O_T, where I assign X['C_O_T'] instead of a measured value like C_O_T1.

Explaining EquationManager F integration

I need to explain the EquationManager F setup. When initialized, F.init(solver.f) connects the manager to the solver's current equations. Then, I use X.init(x, u) to access the variable arrays. For each equation, I set residual values to zero. The solver.create_func(u_val, f_val) links balance and data reconciliation functions, with u_val acting as bal_func by default. The solver's routine goes through init, solve, verify, and save stages.

Thought for 43s

Risposta

下面把你给的 hope.py 逐行/逐段解释一遍,并结合前面贴过的 solver.pyheat_rect.py 的类与流程,让每一行“放到系统里”都讲清楚它在做啥。


顶部与注释

python
# hope.py # --------- # CO2-SS# 机组模型 # author: 西安交通大学
  • 文件说明:一个“CO2-SS 机组”的数据协调与热平衡模型脚本。

导入

python
import sys import argparse from heat_rect import * from excel_loader import data_template from solver import Solver from solver import VariableManager from solver import EquationManager from coolprop import *
  • sys:系统相关(未直接用到,但保留无妨)。
  • argparse:命令行参数解析(让脚本可用 --proj/--input/... 灵活配置)。
  • from heat_rect import *:引入前面那份热平衡/数据协调底层实现(Measured/Unknown/Equation、数值函数、统计学等工具)。
  • data_template:把外部 Excel/CSV 转换成此系统需要的模板格式。
  • Solver / VariableManager / EquationManager:你在 solver.py 里定义的“总控求解器”“变量名访问器”“方程残差管理器”。
  • from coolprop import *:热物性接口(如 air_h_tpair_comp_h1gas(...) 等)。这里假设你把需要的热物性封装在这个模块里。

主入口

python
if __name__ == '__main__':
  • 只有直接运行 python hope.py 时,下面的逻辑才会执行;被当作模块导入时不会跑。

1) 命令行参数解析

python
parser = argparse.ArgumentParser(description='数据协调计算程序') parser.add_argument('--proj', type=str, required=False, default='CO2-SS', help='项目名称') parser.add_argument('--input', type=str, required=False, default='CO2-SS.csv', help='输入数据名称') parser.add_argument('--desc', type=str, required=False, default='微型燃气轮机系统', help='项目描述') parser.add_argument('--output',type=str, required=False, default=None, help='结果输出路径') parser.add_argument('--trans', type=str, required=False, default=None, help='转换输入文件路径') args = parser.parse_args()
  • 允许你在命令行传入项目名、输入文件、报告描述、输出目录、是否先做模板转换等。
  • 解析后得到 args.proj / args.input / args.desc / args.output / args.trans
bash
# 例子 python hope.py --proj CO2-SS --input data/CO2-SS.csv --desc "微型燃机系统A" --output outdir

2) 可选:把外部文件转换成模板

python
if args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv')
  • 若提供了 --trans 原始文件,这里会调用 data_template(...) 生成一份 CO2-SS_data_template.csv,帮助你把外部格式适配到系统需要的列结构/单位。

3) 初始化求解器 Solver

python
init_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output)
  • 结合 solver.py
    • 路径:确定输入 CSV(找不到会到 data/ 下尝试),并创建输出目录(如 result_CO2-SS)。
    • 日志:初始化 <output>/<proj>.log
    • 读取数据create_var() 会:
      • 构造 self.x(Measured,已知测点),从 args.input 载入值/不确定度/单位;
      • 构造 self.u(Unknown,未知测点),文件 <proj>_u.csv
      • 构造 self.f(Equation,冗余平衡方程),文件 <proj>_f.csv
    • 报告对象create_file() 创建 DataFileHtmlReport,复制 .js/.css 静态资源,准备生成报告。
python
n2x = solver.n2x # 获取索引变量 n2u = solver.n2u
  • 取出“变量名→下标”的映射(来自 Measured/Unknown),方便后面用名字访问/赋值。

4) 构建“变量名访问器”X

python
X = VariableManager(n2x, n2u)
  • 结合你之前的 VariableManager
    • X['NAME'] 读取变量值(优先从 x,其次 u);
    • X['NAME'] = val 只允许给未知量赋值(给已知量赋值会抛错);
    • X.init(x, u) 绑定底层数组(在函数里会调用)。

5) 定义“热平衡函数” u_val(链接模式的核心

python
def u_val(x): X.init(x) ng = NG() # gas = GAS() # # Comp X['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P']) X['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF']) X['C_O_T'] = air_t_hp(X['C_O_H'], X['C_O_P']) X['C_LOAD'] = X['C_I_M'] * (X['C_O_H'] - X['C_I_H']) # Combustion Chamber X['FUEL_H'] = ng.h_tp(X['C_I_T'], X['C_I_P']) X['CC_O_M'] = X['C_I_M'] + X['FUEL_M'] X['CC_O_H'] = (X['FUEL_M'] * X['LHV'] + X['C_O_H'] * X['C_I_M'] + X['FUEL_H'] * X['FUEL_M'])/(X['CC_O_M']) X['CC_O_P'] = X['C_O_P'] * X['PLOSS_EFF'] # Turbo X['T_I_H'] = gas('H_TP', X['T_I_T'], X['T_I_P']) X['T_O_H'] = gas_turbine_h1(X['T_I_H'], X['T_I_P'], X['T_O_P'], X['T_EFF']) X['T_O_T'] = gas('T_HP', X['T_O_H'], X['T_O_P']) X['T_LOAD'] = X['T_I_M'] * (X['T_I_H'] - X['T_O_H']) # Gen X['GEN_LOAD'] = (X['T_LOAD'] - X['C_LOAD'] - X['MECH_LOSS'])*X['GEN_EFF'] return X.val

逐行说明(并指出这些“被赋值的名字”应当属于未知量集 u,否则会因 VariableManager 的检查报错):

  • X.init(x):把当前“已知测点数组”绑定给 X;未知量数组此处不传,意味着 本函数将“只向未知量赋值”
  • ng = NG():天然气工质对象(提供 h_tp 等性质函数)。

    coolprop 一侧应定义了 NG()gas(...) 等接口。

压气机(Comp)

  • C_I_H = air_h_tp(C_I_T, C_I_P):入口(温度 T、压力 P)→ 空气焓 H。
  • C_O_H = air_comp_h1(C_I_H, C_I_P, C_O_P, C_EFF):给定入口焓/压力、出口压力、等熵效率 → 出口焓。
  • C_O_T = air_t_hp(C_O_H, C_O_P):由出口焓、压力 → 出口温度。
  • C_LOAD = C_I_M * (C_O_H - C_I_H):压气机吸功(质量流 × 焓增)。

燃烧室(CC)

  • FUEL_H = ng.h_tp(C_I_T, C_I_P):按入口状态估计燃料的“有效焓”(建模近似)。
  • CC_O_M = C_I_M + FUEL_M:质量守恒。
  • CC_O_H = (...) / CC_O_M:能量守恒(把燃料 LHV、入口空气焓、燃料焓汇总到出口混合焓)。
  • CC_O_P = C_O_P * PLOSS_EFF:考虑燃烧室压降/保压效率。

涡轮(Turbo)

  • T_I_H = gas('H_TP', T_I_T, T_I_P):把涡轮进口 T、P 转成焓(对于燃气)。
  • T_O_H = gas_turbine_h1(T_I_H, T_I_P, T_O_P, T_EFF):已知进口焓/压、出口压、效率 → 出口焓。
  • T_O_T = gas('T_HP', T_O_H, T_O_P):由焓/压反算温度。
  • T_LOAD = T_I_M * (T_I_H - T_O_H):涡轮出功(质量流 × 焓降)。

发电机(Gen)

  • GEN_LOAD = (T_LOAD - C_LOAD - MECH_LOSS)*GEN_EFF:轴系净功减损耗、乘发电机效率 → 电功输出。

  • return X.val:把本轮求得的一整组未知量的值返回(Unknown.solve() 会接收这组值;这就是链接模式里“u = bal_func(x)”的核心)。

关键理解:在 链接模式 下,u_val 把你所有“能从 x 推导的未知量”一次性算出来,并通过 VariableManager 写入各个 u 项(如 C_I_H/C_O_H/C_O_T/... 等)。


6) 构建“方程残差管理器”

python
n2f = solver.n2f F = EquationManager()
  • 先拿到“方程名→索引”的映射,然后实例化一个空的方程残差管理器。
  • 真正绑定到当前 Equation 对象是在 f_val()F.init(solver.f) 那一步完成的。

7) 定义冗余平衡方程 f_val

python
def f_val(x, u): F.init(solver.f) X.init(x, u) # ====== F['C_O_T_BAL'] = X['C_O_T'] - X['C_O_T1'] F['AIR_BAL'] = X['FUEL_M'] * X['AIR_ratio'] - X['C_I_M'] F['T_I_H_BAL'] = X['T_I_H'] - X['CC_O_H'] F['T_I_M_BAL'] = X['T_I_M'] - X['CC_O_M'] F['T_I_P_BAL'] = X['T_I_P'] - X['CC_O_P'] F['T_O_T_BAL'] = X['T_O_T'] - X['T_O_T1'] F['GEN_BAL'] = X['GEN_LOAD'] - X['ELE_LOAD'] return F.val
  • F.init(solver.f):告诉 EquationManager 这一次要管理的方程集合是谁(solver.f),内部会抓取 n2f 并创建一个残差向量。
  • X.init(x, u):这次既有 x 也有 u(因为装配方程时要对两者的当前值做运算)。

每一条 F[...] = ... 都是“残差 = 0”形式的约束(理想为 0,求解时让它尽量接近 0):

  • C_O_T_BAL:压气机计算得到的出口温度 C_O_T 应等于测量C_O_T1

  • AIR_BAL:理论需氧量平衡:FUEL_M * AIR_ratio == C_I_M(燃料流量×需气比 = 压气机入口空气流)。

  • T_I_H_BAL:涡轮进口焓 = 燃烧室出口焓(稳态能量守恒)。

  • T_I_M_BAL:涡轮进口质量流 = 燃烧室出口质量流(稳态质量守恒)。

  • T_I_P_BAL:涡轮进口压力 = 燃烧室出口压力(忽略管道压降或已计入 PLOSS_EFF)。

  • T_O_T_BAL:涡轮计算得到的出口温度 T_O_T 应等于测量T_O_T1

  • GEN_BAL:计算得到的发电机输出功 GEN_LOAD 应等于电气测得的 ELE_LOAD

  • return F.val:返回“所有残差”的数组给上层(Equation.mapping() 会用到)。

你也可以把这些平衡写成“质量守恒/能量守恒/设备性能”的标准原语(如 VariableManager.mas_bal/en_bal),但这里直接按变量名写清楚也很好读。


8) 绑定函数并执行完整流程

python
solver.create_func(u_val, f_val) solver.routine()
  • create_func(u_val, f_val):把热平衡函数冗余方程函数塞进 Solver
    • 由于 Solverlink=True(默认),这一步使它进入链接模式
      • u 不再是自由变量,而是由 u_val(x) 直接算出来(并自动带雅可比);
      • f_val(x, u) 负责把“由 x/u 构造的物理残差”交给 Equation 做统计/线性化/权重处理。
  • solver.routine():一键跑完整流程:
    1. init():构造 u/f 的函数/梯度映射,补冗余测点方程;
    2. solve():执行 Measured.evaluate(...) 的迭代(见你之前问过的流程),更新 x 的修正向量、在链接模式下调用 u.solve(self) 根据 x 更新 u
    3. verify():卡方/F 判据做总体粗大误差检验与定位;若无粗大误差则做修正/更新残差;
    4. save_file():输出 JSON/CSV、生成 HTML 报告、保存日志。

小结(把这三个文件串成一张图)

数据流:

  1. 读取测量 x(CSV)→ 绑定未知 u 与方程 f 定义;
  2. 链接模式下 u = u_val(x)(热平衡一把算出未知);
  3. f_val(x,u) 生成冗余方程残差与雅可比 ∂f/∂x(链式传导 ∂u/∂x);
  4. evaluate() 解“带权最小化残差”的问题,得到测点修正 v,循环更新 x/u 直到收敛;
  5. verify() 做统计检验(c2.ppf / fd.ppf 求临界值),定位粗大误差;
  6. 生成报告与数据文件。

实战小贴士与易错点

  • 变量命名一致X['C_O_T'] 这类名字必须存在于 Unknown 的清单里(<proj>_u.csv),否则 VariableManager 会因“试图给已知量赋值”而报错。
  • 单位统一coolprop/空气性质函数返回的单位要与 u 表定义一致(比如焓 kJ/kg、温度 K、压力 MPa),否则方程平衡会虚假不一致。
  • C_O_T vs C_O_T1:一个是计算量(未知),一个是测量量(已知);你在方程里对它们做“差为零”的校核,这是典型的数据协调写法。
  • PLOSS_EFF:乘数小于 1 代表压降,记得在数据文件里给它合适的值或不确定度。
  • gas 的使用:你注释掉了 gas = GAS(),却用 gas('H_TP', ...)/gas('T_HP', ...),说明这里期望 gas 是一个函数(来自 coolprop),不要再遮蔽这个名字。

如果你愿意把 <proj>_u.csv<proj>_f.csv 的头几行也贴出来(变量名与单位),我可以帮你逐项对照 u_val/f_val,检查是否一一对应并给出更精确的单位检查清单。

Domanda

在上述代码中,数据协调方法主要的流程是输入,中间的数据处理以及输出协调后的结果,针对输入,涉及到输入测点基本信息(测点名称、仪表的测量值、仪表的不确定度等),上述输入是在哪里体现的?计算完成后又输出到了哪里?

Risposta

非常好的问题 👍
我们把整个 数据协调方法 的“输入 → 处理中间步骤 → 输出” 串起来看,就能明确 输入测点信息在哪里体现,以及 协调后的结果最后去了哪里


1. 输入:测点基本信息在哪里体现?

Solver.create_var() 中:

python
# 已知测点 md_x = MeasureData(self.input_name) # 读取输入 CSV md_x.load() self.x = Measured(md_x) # 构造已知测点对象 self.x.init_md()
  • self.input_name 就是 --input 传入的 CSV 文件(如 CO2-SS.csv)。
  • 这个 CSV/Excel 模板里存储了:
    • 测点名称(如 C_O_T1ELE_LOAD
    • 仪表测量值value 列)
    • 不确定度/置信区间intv 列)
    • 单位类型/上下界/准确性标志unit, lb, ub, accuracy 等)

这些信息通过 MeasureData.load()Measured.init_md() 被加载,内部会:

  • 转换单位到系统统一基准(convert())。
  • 构造测点值数组 self.val,不确定度数组 self.intv
  • 生成协方差矩阵 self.S = diag((intv/1.96)^2)

所以,输入测点基本信息就在 CSV 文件里,并在初始化时进入 Measured 类。


2. 中间:数据协调处理过程

核心流程发生在 Measured.evaluate()Solver.solve()

  • 根据冗余方程 f_val(x,u),计算残差 f.val
  • 构造约束优化问题(卡方最小化):调整测点值 x,让残差尽量接近 0,同时保持调整量不超过测点的不确定度。
  • 迭代求解:不断修正 x(和可能的未知量 u),直到收敛。
  • 计算总体卡方值 x.total_penalty 并和临界值 x.K_crt 比较(用来判断有无粗大误差)。
  • 若存在粗大误差,还会调用 locate()rectify(),定位并修正有问题的测点。

3. 输出:协调后的结果输出到哪里?

Solver.save_file() 中:

python
# 保存数据文件 self.data_file.convert_unit() self.data_file.save() # 保存 JSON self.data_file.save_csv() # 保存 CSV # 保存报告 self.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse()
  • JSON 文件:包含全部测点最终值、误差统计量、卡方值等(机器可读)。
  • CSV 文件:表格形式,便于在 Excel 里打开。
  • HTML 报告:自动生成图表和分析结果的网页报告。
  • 日志文件<proj>.log,记录每一步计算与误差检验过程。

输出路径是 --output 参数指定的文件夹;若未指定,会在输入文件目录下生成 result_<proj> 文件夹。


4. 总结一句话

  • 输入:来自 --input 指定的 CSV 文件(如 CO2-SS.csv),里面写了每个测点的名称、测量值、不确定度、单位等 → 在 MeasureData/Measured 中加载。
  • 处理中:在 evaluate() / solve() 中构造残差方程,迭代调整测点值,最小化卡方残差。
  • 输出:协调后的结果保存到 result_xxx 文件夹里,包含 JSON 数据文件、CSV 表格、HTML 报告和日志。

要不要我帮你画一张“数据流示意图”,把 CSV → Measured → evaluate → verify → rectified x/u → JSON/CSV/HTML 的流程用图形表示出来?这样会更直观。

Domanda

import os
import re
import numpy as np
import pandas as pd
import logging

def num_format(num):
"""
根据变量的值(0、极小极大数--科学计算数法、普通数--两位小数),将其转化为指定样式的数据格式。
"""
if 1e-2 <= np.abs(num) < 1e3:
return '{:.2f}'.format(num)
elif num != 0:
return '{:.1E}'.format(num)
else:
return '0'

def unit_convert(val, vtype, vunit=None, intv=None, acc=None, atmos=0.101325, reverse=False, name=None):
"""
转换或逆转换单个物理量的单位。

text
:param val: 要转换的值。 :param vtype: 要转换的值所属于的物理量类型。 :param vunit: 要转换的值的单位。默认为 None,即使用默认单位(不需要转换的那个)。 :param intv: 要转换的值的不确定度,以便按 vunit 一同转换单位,默认为 None。 :param acc: 原始数据中的 acc 信息,仅用来判断测点是否关闭。 :param atmos: 大气压值。默认为标准大气压(MPa)。 :param reverse: 是否逆转换(将标准单位换为 vunit 对应的单位)。默认为 False。 :param name: 变量的名称,默认为 None。 """ new_val, new_intv = None, None vname = '' vflag, undef_flag = False, False # 单位处理 factor = 1 # 单位乘数因子 adder = 0 # 单位加减因子 if val == 'OFF' or acc == 'OFF': # OFF 测点处理 new_val = val if intv is not None: new_intv = intv # 压力转换 elif vtype in ('p', 'dp', 'atm'): # 标准单位:MPa vname = '压力' if vtype == 'p' else '压差' if vunit in (None, 'MPa', 'MPa.a'): pass elif vunit in ('kPa', 'kPa.g', 'kPa0'): factor = 1e-3 elif vunit == 'hPa': factor = 1e-4 elif vunit == 'daPa': # 十帕 factor = 1e-5 elif vunit in ('Pa', 'pa'): factor = 1e-6 elif vunit in ('bar', 'Bar'): factor = 0.1 else: vflag = True if vtype == 'p' and vunit in ('kPa.g', 'MPa'): # 表压,不是压差或绝对压力 adder = atmos # 温度转换 elif vtype == 't': # 标准单位:K vname = '温度' if vunit in (None, 'K'): pass elif vunit == 'C': adder = 273.15 else: vflag = True # 温度转换 elif vtype == 'L': # 标准单位:m vname = '高度' if vunit == 'mm': factor = -0.101325 / 760 adder = 0.101325 else: vflag = True # 温差转换 elif vtype == 'dt': # 温差,K 与 C 都一样,不需要转换 vname = '温差' if vunit in (None, 'K', 'C'): pass else: vflag = True # 流量转换 elif vtype == 'qm': # 标准单位:kg/s vname = '质量流量' if vunit in (None, 'kg/s'): pass elif vunit == 't/h': factor = 1 / 3.6 else: vflag = True # 功率转换 elif vtype in ('pw', 'load'): # 标准单位:kW vname = '功率' if vtype == 'pw' else '负荷' if vunit in (None, 'kW', 'KW', 'kw'): pass elif vunit in ('MW', 'mw'): factor = 1e3 elif vunit in ('W', 'w'): factor = 1e-3 else: vflag = True # 焓值转换 elif vtype == 'h': # 标准单位:kJ/kg vname = '比焓' if vunit in (None, 'kJ/kg'): pass elif vunit == 'kJ/kmol': factor = 18.01528 else: vflag = True # 面积转换 elif vtype == 'a': # 标准单位:m2 vname = '面积' if vunit in (None, 'm2', 'm^2'): pass else: vflag = True # 换热系数转换 elif vtype == 'k': # 标准单位:kW/m2/K vname = '换热系数' if vunit in (None, 'kW/m2/K', 'kW/K/m2'): pass else: vflag = True elif vtype in ('num', 'x'): # 常规量纲 1 数值,不处理 vname = '无量纲数' if vtype == 'num' else '干度' if vunit == '%': # 百分比单位 factor = 1e-2 else: undef_flag = True # 实施逆转换,对已转换成标准单位的已知变量使用,以还原标准值 if reverse: new_val = (float(val) - adder) / factor if intv is not None: new_intv = float(intv) / factor # 实施正转换,对原始输入的已知变量值使用 else: if name is not None: # 在提供名称的情况下添加日志信息 if vflag: # 未能识别单位 logging.warning("{}测点 '{}' 的单位 {} 未能识别,将采用默认单位进行转换!".format(vname, name, vunit)) elif undef_flag: logging.warning("测点 '{}' 类型 {} 未能识别,将以标准单位读入其数值!".format(name, vtype)) # 使用因子实施转换 new_val = float(val) * factor + adder # 在提供不确定度的情况下,对不确定度进行转换 if intv is not None: new_intv = float(intv) * factor return new_val, new_intv

def batch_unit_convert(vals, vtypes, vunits=None, intvs=None, acc=None, atmos=0.101325, reverse=False):
"""
批量(逆)转换物理量的单位。
"""
new_vals = []
new_intvs = []
for i in range(len(vals)):
vtype = vtypes[i]
val = vals[i]
if vunits is not None:
vunit = vunits[i]
else:
vunit = None
if intvs is not None:
intv = intvs[i]
else:
intv = None
if acc is not None:
accuracy = acc[i]
else:
accuracy = None
new_val, new_intv = unit_convert(val, vtype, vunit, intv, accuracy, atmos, reverse)
new_vals.append(new_val)
new_intvs.append(new_intv)
if intvs is None:
new_intvs = None
return new_vals, new_intvs

class TagsData:
"""
由外方软件 TAGS_DEF 部分转换而来的数据读取模块。
"""
def init(self, path):
self.path = os.path.join('data', path)
self.data: pd.DataFrame = pd.read_csv(self.path, header=0)
self.target = None

text
@classmethod def vtype(cls, name): if name[-2:] == '_P': return 'p' elif name[-2:] == '_T': return 't' elif name[-3:] == '_DP' or name[-4:] in ('_DP1', '_DP2'): return 'dp' elif name[-3:] == '_HW': return 'h' elif name[-4:] in ('_TTD', '_DCA'): return 'dt' elif name[-5:] == '_VAPF': return 'x' elif name[-5:] == '_AREA': return 'a' elif name[-5:] == '_LOAD': return 'load' elif name[-5:] == '_VOLF' or name[-7:] == '_VOLF_V': return 'qv' elif name[-5:] == '_LOSS' or name[-6:] == '_POWER': return 'pw' elif name[-6:] == '_UGLOB': return 'k' elif name[-6:] == '_MASSF' or name[-8:] in ('_MASSF_L', '_MASSF_V'): return 'qm' elif name[-8:] == '_DENSITY': return 'rho' else: return 'num' def trans(self): names, tokens = [], [] vals, vunits = [], [] vtypes = [] accs, aunits = [], [] for index, row in self.data.iterrows(): name = row['TAG NAME'] token = None if re.fullmatch(r'\d[A-Z]{3}\d{3}[A-Z]{2,3}', name): token = name name = row['OWNER VAR.NAME'].replace(' ', '_') name = name.replace('-', '_') # 统一转换为当前写法 names.append(name) tokens.append(token) acc = row['MEA.ACC.'] aunit = None vunit = row['P.U.'] vtype = TagsData.vtype(name) # 不确定度处理 if acc[0] == '-': # 负数表示相对不确定度 aunit = '%' acc = acc[1:] elif vunit == 'C': # 温度不确定度单位为摄氏度 aunit = 'C' accs.append(acc) aunits.append(aunit) vunits.append(vunit) vals.append(row['MEA.VAL.']) vtypes.append(vtype) self.target = pd.DataFrame({ 'name': names, 'type': vtypes, 'description': None, 'value': vals, 'vunit': vunits, 'accuracy': accs, 'aunit': aunits, 'token': tokens, 'lb': None, 'ub': None }) def save(self, path=None): if path is None: path = self.path.replace('.csv', '_new.csv') else: path = os.path.join('data', path) self.target.to_csv(path, index=False)

class Elements:
"""
设备描述文件类。
"""
def init(self, name):
"""
初始化路径信息,并读取设备描述数据。

text
:param name: 系统名称,如 hn2、fq3 """ self.path = os.path.join('bundle', name + '_elem.csv') self.data = pd.read_csv(self.path, header=0, comment='#') self.var_dict = { 'P': '压力', 'T': '温度', 'M': '质量流量', 'V': '比容', 'QV': '体积流量', 'H': '比焓', 'X': '干度', 'S': '比熵', 'DP': '压降', 'DT': '温差', 'TTD': '上端差', 'DCA': '下端差', 'EFF': '效率', 'PR': '压比', 'K': '换热系数', 'A': '换热面积', 'VAR': '变量', 'FRAC1': '分流比1', 'FRAC2': '分流比2', 'M0': '设计质量流量', 'DP0': '设计压降', 'CFDP': '压降系数', 'DP1': '压降1', 'DP2': '压降2', 'Q': '比例流量', 'W': '功率', 'POWER': '功率', 'LOAD': '负荷', } def __len__(self): return len(self.data) def desc(self, name): """ 获取指定设备的描述信息。 :param name: 设备名称。 :return: 设备描述信息。 """ query = self.data[self.data['name'] == name]['description'] if len(query) > 0: return query.values[0] return name # 未查询到时原样返回 def param_desc(self, param_name): """ 获取指定参数的名称。 :param param_name: 参数名称。 """ if param_name in self.var_dict.keys(): return self.var_dict[param_name] else: # 未查询到描述的情况下原样返回,方便检查 return param_name def port_desc(self, port_name, elem_type): """ 获取指定端口的名称。 :param port_name: 端口名称。 :param elem_type: 设备类型,用来对特定字母调整端口描述。 """ if port_name == 'I': if elem_type == 'T': return '进汽' elif elem_type == 'HX': return '管侧给水' return '进口' elif port_name == 'O': if elem_type == 'T': return '排汽' elif elem_type == 'HX': return '管侧排水' return '出口' elif port_name == 'EX': if elem_type == 'HX': return '壳侧给水' return '抽汽' elif port_name == 'SH': return '壳侧给水' elif port_name == 'D': return '疏水' else: # 默认情况 return port_name + '侧' def elem_type(self, name): """ 获取指定设备的类型。 :param name: 设备名称。 :return: 设备类型。 """ query = self.data[self.data['name'] == name]['type'] if len(query) > 0: return query.values[0] return 'BX' # 未查询到时返回默认变量类型 BX(BOX) def parse_desc(self, name): """ 根据设备描述信息,解析变量名称的含义。 :param name: 测点名称。 """ tokens = name.split('_') if len(tokens) == 2: # 两段式,设备名 + 参数名 elem_name, param_name = tokens return self.desc(elem_name) + self.param_desc(param_name) elif len(tokens) >= 3: # 三段式,设备名 + 端口名 + 参数名 或 含_设备名 + 参数名 if len(tokens) == 3: elem_name, port_name, param_name = tokens else: elem_name = '_'.join(tokens[:-2]) port_name = tokens[-2] param_name = tokens[-1] complex_elem_name = elem_name + '_' + port_name if complex_elem_name in self.names: # 先查询是否存在对应的复杂设备名称(含 _) return self.desc(complex_elem_name) + self.param_desc(param_name) else: # 端口名 return self.desc(elem_name) + \ self.port_desc(port_name, self.elem_type(elem_name)) + \ self.param_desc(param_name) @property def names(self): return list(self.data['name'])

class Data:
"""
常规数据集类。
"""
def init(self, path):
self.data = None
self.path = path
self.elems = None # 设备描述文件

text
def __len__(self): return len(self.name) def load(self): self.data = pd.read_csv(self.path, header=0, comment='#') @property def name(self): return list(self.data['name']) @property def vartype(self): return list(self.data['type']) @property def description(self): return list(self.data['description']) def add_item(self, entry: dict): """ 增添新的数据条目。 :param entry: Dict 类型的条目内容,每次一条。 """ self.data = pd.concat([self.data, pd.DataFrame.from_dict({str(len(self.data)): entry}, orient='index')]) def load_elems(self, elems: Elements): self.elems = elems def drop(self, name): """ 删除指定名称的条目。 :param name: 待删除条目的名称。 """ self.data = self.data.drop(self.data[self.data['name'] == name].index)

class VariableData(Data):
def init(self, path):
super().init(path)

text
def load(self): super().load() # 自动生成变量单位 type_dict = { 'P': 'p', 'T': 't', 'H': 'h', 'V': 'v', 'X': 'num', 'M': 'qm', 'W': 'load', 'L': 'load', 'DP': 'dp', 'DT': 'dt', 'EFF': 'eff', 'FRAC': 'num', 'FRAC1': 'num', 'FRAC2': 'num', 'LOAD': 'load' } for i in range(len(self)): if pd.isna(self.vartype[i]): for sub in type_dict.keys(): if self.name[i][-len(sub)-1:] == '_' + sub: self.data.loc[i, 'type'] = type_dict[sub] if self.elems is not None and pd.isna(self.description[i]): self.data.loc[i, 'description'] = self.elems.parse_desc(self.name[i]) @property def lb(self): """ 变量下界,默认为 -inf. """ return list(self.data['lb']) @property def ub(self): """ 变量上界,默认为 inf. """ return list(self.data['ub']) def convert(self): """ 对于一般数据对象,仅处理上下界数值。 """ pass

class MeasureData(VariableData):
"""
测点数据集类。
"""
def init(self, path):
super().init(path)
self.val, self.intv = None, None
self.off = []
self.atmos = 0.101325

text
def load(self): super().load() # 数据列类型转换 self.data = self.data.astype({ 'name': str, 'type': str, 'description': str, 'vunit': str, 'aunit': str, 'token': str }) # 不确定度生成 for i in range(len(self.data)): if pd.isna(self.accuracy[i]): self.data.loc[i, 'accuracy'], self.data.loc[i, 'aunit'] = MeasureData._autoacc(self.name[i], self.vartype[i]) @property def value(self): return list(self.data['value'].astype(np.float64)) @property def vunit(self): return list(self.data['vunit']) @property def accuracy(self): return list(self.data['accuracy']) @property def aunit(self): return list(self.data['aunit']) @property def token(self): return list(self.data['token']) @property def dvalue(self): if 'dvalue' in self.data.keys(): return list(self.data['dvalue']) else: return [None] * len(self) def convert(self): """ 将输入数据转换成标准单位。 """ super().convert() # 执行上下界处理 n = len(self) val, intv = np.zeros(n, dtype=np.float64), np.zeros(n, dtype=np.float64) # 单位处理 for i in range(n): # 上下界处理 # TODO: 完善上下界生成机制,利用上设计值 lb = self.lb[i] ub = self.ub[i] # 仅对未赋上下界变量设置值 if pd.isna(lb) or pd.isna(ub): lb0, ub0 = MeasureData._generate_lb_ub(self.vartype[i]) if pd.isna(lb): lb, _ = unit_convert(lb0, self.vartype[i], self.vunit[i], atmos=self.atmos, reverse=True) if pd.isna(ub): ub, _ = unit_convert(ub0, self.vartype[i], self.vunit[i], atmos=self.atmos, reverse=True) self.data.loc[i, 'lb'] = lb self.data.loc[i, 'ub'] = ub # OFF 测点处理 if self.value[i] == 'OFF' or self.accuracy[i] == 'OFF': val[i] = 0.0 intv[i] = -1.0 self.off.append(i) # 记录 OFF 测点下标 continue # 非 OFF 测点进入值分析阶段 else: # 上下界检查 if self.value[i] < self.lb[i] or self.value[i] > self.ub[i]: # 值越界,进行处理 logging.warning('测点 {} 测量值 {} 越界 [{}, {}],请检查测量值正误!'.format( self.name[i], self.value[i], self.lb[i], self.ub[i])) # 若未提供设计值,则以上下界值进行约化 if pd.isna(self.dvalue[i]): value = self.lb[i] if self.value[i] < self.lb[i] else self.ub[i] logging.warning('测点 {} 无设计值,将使用上/下界值 {} 对测点测量值进行约化。'.format(self.name[i], value)) self.data.loc[i, 'value'] = value # 赋值时不能使用 property 定义 # 若提供了设计值,则直接约化为设计值 else: logging.warning('将使用设计值 {} 对测点 {} 测量值进行约化。'.format(self.dvalue[i], self.name[i])) self.data.loc[i, 'value'] = self.dvalue[i] # 单位转换 val[i], _ = unit_convert(self.value[i], self.vartype[i], self.vunit[i], None, self.accuracy[i], self.atmos, name=self.name[i]) # 更新数据集大气压 if self.vartype[i] == 'atm': self.atmos = float(val[i]) logging.info('读取到数据集中存在的大气压测点,当前大气压重置为 {:.3f}kPa。'.format(self.atmos * 1e3)) # 不确定度生成 # TODO: 完善不确定度单位判断机制,目前仅对不确度单位为 % 的情况进行区分 if self.aunit[i] == '%': # 百分比,相对不确定度 if val[i] == 0.0: # 使用相对不确定度时需规避 0 值出现,否则会使得协方差项为 0 intv[i] = 1e-6 # 使用极小值替代 0 else: intv[i] = val[i] * float(self.accuracy[i]) / 100.0 elif self.accuracy[i] == 'CST': # CST 测点,常数,不确定度设置为极小数 intv[i] = 1e-5 # 不能直接取为 0,影响后续计算正确性 else: intv[i] = float(self.accuracy[i]) # 生成转换格式后的输入数据 self.val = val self.intv = intv # self._redundant() def revert(self): """ 将标准单位的计算数据还原为输入时的单位形式。 """ raise NotImplementedError def _redundant(self): """ 处理冗余测点信息。 """ # 搜索重复项 repeated = [] for name in self.name: if self.name.count(name) > 1 and name not in repeated: repeated.append(name) # 逐一循环重复项 rm_index = [] for name in repeated: # 合并重复测点 index = np.arange(len(self))[np.array(self.name) == name] new_val = np.mean(self.val[index]) # 平均化测量值 new_intv = np.sqrt(np.mean(self.intv[index] ** 2)) / len(index) # 合成不确定度 self.val[index[0]] = new_val # 存储平均值 self.intv[index[0]] = new_intv # 存储不确定度 self.data.loc[index[0], 'token'] = ','.join(np.array(self.token)[index].tolist()) # 合并 token 信息 rm_index.extend(index[1:].tolist()) # 移除重复元素 rm = lambda array: np.delete(array, rm_index) self.val = rm(self.val) self.intv = rm(self.intv) self.data = self.data.drop(rm_index) @classmethod def _autoacc(cls, name, vartype): """ 自动生成未在数据表中声明的测点不确定度信息。 :param name: 测点名称。 :param vartype: 测点变量类型。 """ # switch if vartype == 'qm': acc, aunit = 5.0, '%' elif vartype == 'p': acc, aunit = 2.0, '%' elif vartype == 't': acc, aunit = 2.0, 'C' else: # 默认情况 logging.warning('未检测到测点 \'{}\' 变量类型可用的不确定度参数,将以默认变量类型设置其不确定度。'.format(name)) acc, aunit = 10.0, '%' logging.info('测点 \'{}\' 的不确定度未给定,根据变量类型 \'{}\' 设置为默认值 {}{}。'.format( name, vartype, num_format(acc), aunit)) return acc, aunit @classmethod def _generate_lb_ub(cls, vartype): lb = -np.inf # 下界 ub = np.inf # 上界 if vartype in ('p', 't', 'qm', 'x', 'pw'): lb = 0.0 if vartype == 'x': ub = 1.0 return lb, ub

class UnmeasuredData(VariableData):
def init(self, path):
super().init(path)

text
@property def init_val(self): """ 返回迭代所用的初始值。 """ if 'ivalue' in self.data.columns: return list(self.data['ivalue']) else: return None

if name == 'main':
tags_data = TagsData('hndata_4.csv')
tags_data.trans()
tags_data.save('hn2_x_4.csv')
补充上述load_data.py代码,请重新解释上述问题

Pensiero

Explaining input and output flow

The inputs are primarily from the MeasureData path used by Solver, involving a CSV read. Key fields get processed through unit conversion, bounds, and uncertainty generation. Outputs, like DataFile save, Csv, and HtmlReport, get saved in the result folder. MeasureData values (like 'value' and 'accuracy') are updated during the solve process.

Thought for 9s

Risposta

太好了,你把 load_data.py 也贴出来了。结合前面的 solver.py / heat_rect.py / hope.py,我把“输入在哪里体现中间如何处理输出到哪里”按代码落点重新梳理一遍(指到具体类/字段与关键逻辑),这样你能顺藤摸瓜定位每一步的数据。


一、输入在哪里体现?

1) 外部原始文件(用户给的 CSV)

  • hope.py / solver.parse_args() 里,--input 指向一个 CSV(例如 CO2-SS.csv)。
  • Solver.__init__create_var()
    python
    md_x = MeasureData(self.input_name) md_x.load() self.x = Measured(md_x) self.x.init_md()
    也就是:已知测点从这份 CSV 读入,落在 MeasureData.data 里(一个 pandas DataFrame)。

这张 CSV 里“测点基本信息”放在这些列

MeasureData.load() / VariableData.load() 里能看到用到的列:

  • name(测点名称)
  • type(变量类型,如 p/t/qm/h/…)
  • description(文字描述,可由 Elements.parse_desc() 自动补全)
  • value(仪表测量值,字符串或数值)
  • vunit(值的单位)
  • accuracy(不确定度值;若缺失会由 _autoacc() 自动生成)
  • aunit(不确定度单位:% 表示相对;C 表示温度的 ±℃;CST 表示常数)
  • token(测点位号,可用于冗余映射)
  • lb / ub(上下界,可为空,由 _generate_lb_ub() 补默认值)

如果你的外部系统导出的是另一套 TAGS 文件格式(例如 hndata_4.csv),TagsData.trans() 会把它转换成上述“目标列结构”(self.target),然后 save() 成你要的输入 CSV(见文件尾部 __main__ 演示)。

2) 未知量与冗余方程的定义文件

  • Unknowncreate_var() 里尝试读取 <proj>_u.csv(或 bundle/<proj>_u.csv),形成 UnmeasuredData.data,其中可含列 ivalue 作为迭代初值(UnmeasuredData.init_val)。
  • Equationcreate_var() 里读取 <proj>_f.csv,形成方程清单(方程名、类型、描述)。真正的残差计算由你在 hope.py 里写的 f_val(x,u) 负责。

二、中间的数据处理(读入后怎么“变成可计算的数”)

1) 读取与预处理:MeasureData.convert()

对每一行测点,做这些事情:

  1. 生成/补上下界

    • 如果 lb/ub 缺失,就按变量类型用 _generate_lb_ub() 给默认范围(例如压强/温度/流量/功率的下界设 0;干度上界 1)。
    • 再用 unit_convert(reverse=True) 把范围值从标准单位换回原始单位再写回 DataFrame(保证上下界与 value 同单位体系)。
  2. OFF 测点

    • value == 'OFF'accuracy == 'OFF',则将该点标记到 self.off,并把 val[i]=0.0intv[i]=-1.0(后续会裁剪掉)。
  3. 越界检查与约化

    • value 超过 lb/ub,先告警。
    • 如果 dvalue(设计值)缺失,则把 value 夹到边界;否则直接替换为设计值。
  4. 单位转换(核心)

    • unit_convert(value, vartype, vunit, ...)原始单位的测量值统一转换到标准单位
      • 压力标准单位:MPa(支持 kPa/kPa.g/Pa/bar 等);表压自动+大气压 atmos
      • 温度标准:K(支持 C → +273.15)。
      • 质量流量:kg/s(支持 t/h → ÷3.6)。
      • 功率/负荷:kW(MW/W 会缩放)。
      • 焓:kJ/kg(你还支持 kJ/kmol 的转化)。
      • ……
    • 同时,如果这个测点的 type == 'atm',读到它后会更新数据集的大气压 self.atmos,供压力表压转换使用。
  5. 不确定度生成(转成 1.96 置信区间的半宽)

    • aunit == '%',按相对不确定度计算:intv = value * accuracy%,并在 value==0 时给一个极小值防止协方差为 0。
    • accuracy == 'CST',设一个极小不确定度 1e-5
    • 否则按绝对不确定度intv = float(accuracy)
    • 如果 accuracy 缺失,_autoacc() 会根据类型给默认值并记录日志(如流量 5%、压力 2%、温度 ±2℃ …)。

最后:

  • self.valself.intv 这两个 numpy 数组就准备好了(全是标准单位)。
  • Measured.init_md() 会把它们送进 Measured
    • self.S = diag((intv/1.96)^2) 作为协方差;
    • Partialer 记录 OFF/CST 等不可调整的测点索引;
    • 为变量名建 n2i 索引(冗余测点同名处理也在这里)。

顺带:Elements 可被 MeasureData.load_elems() 传入用来自动补全 description,并解析测点名里的设备/端口/参数,提升可读性,但不影响数值本身。

2) 未知量与方程的数值化

  • 链接模式下(你的 hope.py 就是这种):
    • Unknown.construct(bal_func) 绑定 u = bal_func(x)(你的 u_val(x))。
    • 每轮在 Measured.evaluate() 里更新 x 后,u.solve(self) 就会用 u_val(x) 重新“把未知量都算出来”,并用自动微分拿到 ∂u/∂xS_u = J_u S_x J_u^T
  • 方程Equation.mapping(x,u)f_val(x,u) 计算残差 f.val、雅可比 ∂f/∂x (及 ∂f/∂u)S_f = J_f S_x J_f^T(链接模式里 ∂f/∂x 会自动链式包含 ∂u/∂x)。

3) 数据协调(求解)

  • Measured.evaluate(f, u, link_mode=True) 中进行迭代:
    • 在“可调整测点子空间”里求 v 使残差加权最小(VDI-2048/卡方意义);
    • 每步更新 x = x_old - v;链接模式下再计算新的 u = u(x);映射新方程;
    • 直到步长小于 eps 或到达 max_iter
    • 统计输出:每步 eq_err(残差均方差)、total_penalty(卡方统计量)等。

4) 验证与修正

  • Equation.verify(x, u, chi2=...) 计算临界值 x.K_crt
    • 有未知量 → c2.ppf(0.95, m-p);无未知量 → fd.ppfc2.ppf
    • 返回 quality = total_penalty / K_crtquality < 1 认为整体无粗大误差
  • Measured.locate() / Equation.locate() 定位疑似粗大误差点;rectify() 应用修正;并再次 mapping()、写入新旧对比。

三、输出到了哪里?

1) 结构化数据文件(JSON / CSV)

Solver.save_file()

python
self.data_file.convert_unit() # 把标准单位结果按需求(逆)转换为报表/原始单位 self.data_file.save() # 写 JSON(完整的 x/u/f、统计量、曲线等) self.data_file.save_csv() # 写 CSV(表格友好)
  • 路径:默认在输入文件所在目录下新建 result_<输入文件名去后缀>;或者使用 --output 指定目录。
  • JSON 里通常包含:
    • x(已知测点修正后值、最终不确定度、卡方分量等)
    • u(未知量求解结果及不确定度)
    • f(方程残差、关闭项)
    • 全局统计量(初始/最终残差、总卡方、临界值、收敛曲线等)
  • CSV 则是面向人阅读/Excel 的平铺表。

注:虽然 DataFile 的实现你没贴,但从调用看,它会把数值统一回报告所需的单位(用到 unit_convert(..., reverse=True) 这种逻辑),再保存。

2) HTML 报告(可读可分享)

python
self.report.generate_report(self.data_file, mode='xuf', log_path=self.log_path) self.report.parse()
  • 结合 bundle 里的 .js/.css 静态资源生成交互式 HTML 报告(图表、表格、误差曲线、可疑点标注等)。
  • 路径与上面 JSON/CSV 同目录。

3) 日志文件

  • Solver.__init__ 里配置了 self.log_path = <output>/<proj>.log,所有 INFO/WARNING/ERROR 都会记录,便于复现与排错。

4)(可选)把未知量数值回写到它自己的 DataFrame

  • Unknown.solve() 有这一段:
    python
    if isinstance(self.var, Data): if self.new: self.var.data = self.var.data.assign(value=self.val, accuracy=self.intv) self.new = False else: self.var.data['value'] = self.val self.var.data['accuracy'] = self.intv
    也就是说,u 的结果也会回写到 UnmeasuredData.data,供后续导出/报告用。

四、把问题落在两句关键话上

  • 输入测点基本信息在哪里体现?
    --input 指定的 CSV 文件(或由 TagsData.trans() 生成的目标 CSV)里。MeasureData.load()/convert() 把它们变成标准单位self.val / self.intv 与协方差 self.S,并记录 OFF/CST/上下界等属性;Elements 仅用于文字描述解析。

  • 计算完成后输出到哪里?
    Solver.save_file() 将协调后的 x/u/f 及统计结果保存到 result 目录(JSON + CSV),同时生成 HTML 报告,并在 <proj>.log 中记录详细过程。链接模式下 u 的最终值也会回写到 UnmeasuredData.data 以供导出。


如果你给我一小段真实的 CO2-SS.csv(头 5–10 行即可),我可以把一次完整的单位转换与不确定度生成过程跑一遍,直观看到“原始列 → 标准单位的 val/intv/S → 输出回报告单位”的样子。

Domanda

CO2-SS_x.csv
CO2-SS_xE1.csv

这是输入文件,请解读一下上方的英文单词都是什么意思,以及上述两文件的输入区别在哪里?

Pensiero

Thought for 4 seconds

Pensiero

Explaining data differences

The differences between the files seem to be small, like different values and one row with a name commented out. Also, the C_O_P accuracy varies (5 vs 2). It looks like xE1 involves slightly different experimental data. I'll clarify column headers, type codes, and explain the file differences, including unit conversion and varying accuracies.

Thought for 17s

Risposta

太好了~我把你刚上传的两份输入表看了(我在上面给你各展示了前几行预览)。下面先把“表头的英文单词”逐个解释,然后对比 CO2-SS_x.csvCO2-SS_xE1.csv 两份输入的区别。

表头英文是什么意思

(这两份文件的列完全一致)

  • name:测点/变量名称(如 C_O_T1, T_I_P)。名称里常见后缀的含义:
    • _P 压力(Pressure)
    • _T 温度(Temperature)
    • _M 质量流量(Mass flow)
    • _H 比焓(Enthalpy)
    • _LOAD 负荷/功率(Load/Power)
    • _DP 压差(Delta Pressure)
    • _DT 温差(Delta Temperature)
    • _TTD 上端差、_DCA 下端差(换热器常用)
    • 这些后缀在 Elements.var_dict / TagsData.vtype() 里都有映射。
  • type:变量类型代码(决定单位与默认上下界),例如:
    • p 压力、t 温度、qm 质量流量、h 比焓、load/pw 负荷/功率、dp 压差、dt 温差、x 干度、a 面积、num 无量纲等。
    • 加载时用它来做单位换算与默认不确定度设置(见 MeasureData.convert() / _autoacc())。
  • description:文字描述(可为空;若加载了设备描述 Elements,会自动补更易读的中文说明)。
  • value:仪表测量值(原始单位,程序会统一换算到标准单位进行计算)。
  • vunit:value 的单位(如 MPa, kPa, K, C, kg/s, kW, …)。
  • accuracy:不确定度数值(可写绝对值,也可配合 aunit='%' 表示相对不确定度;CST 表示常数;OFF 表示该测点关闭)。
  • aunit:accuracy 的单位:
    • % 表示相对不确定度(按 value×百分比计算)。
    • C 表示温度的 ±℃。
    • 留空通常表示绝对值(与 vunit 同量纲)。
  • token:测点位号/外部系统的标签(用于冗余测点、对照等;为空也没关系)。
  • lb / ub:下界/上界(可留空;程序会按 type 自动补默认范围,如干度上界 1、压力/温度/流量的下界 0 等)。
  • dvalue:设计值(可选;如果测量值越界就用它来约化,否则用边界)。

这两份输入的“内容差异”

我做了逐格对比(同一行同一列对比),总结如下:

  • 结构:两份文件的列完全相同、行数也相同(都是 31 行)。
  • 多数差异是测量值的小幅调整value 列):例如
    • C_O_T1(压气机出口温度测点)在一份里是 229.2/229.2,另一份是 229.4/229.0;
    • C_I_M(压气机入口质量流)0.4042 改为 0.4062 或 0.4022;
    • FUEL_M(燃料流)0.0086 改为 0.0084 或 0.0088;
    • T_I_T(涡轮进口温度)1067.0 改为 1067.5/1066.6;
    • T_I_P(涡轮进口压力)0.457 改为 0.459/0.455;
    • T_O_T1(涡轮出口温度测点)714.3 改为 714.6/714.0;
    • T_I_M(涡轮进口流量)0.413 改为 0.418/0.408。
      这些变化看起来像不同实验/工况下的测量微调(或两次采样的差别)。
  • 有一处把“被注释的名字”恢复了
    • CO2-SS_x.csv 的某行 name# C_O_P(前面有 #,像是人为注释),而在 CO2-SS_xE1.csv 则是 C_O_P(去掉了 #),且该行 value 也略有不同(0.4661→0.4681)。
  • 有一处不确定度(accuracy)不同
    • C_O_Paccuracy 在一份是 '5',另一份是 '2'
      这会直接影响该测点的权重(不确定度越小,权重越大),进而影响数据协调时对它的“信任”程度与最终卡方。

以上差异我已经逐项比对,前 5 条差异如:

  • 第 4 行(从 0 开始计数):name# C_O_PC_O_Pvalue 0.4661 → 0.4681
  • 第 5 行:同名 C_O_Pvalue 0.4661 → 0.4641,accuracy 5 → 2
  • 第 6/7 行:C_O_T1value 小幅调整
    …(后续多处是 value 的细微变化)

这两份“输入文件”的本质区别

  • 不是格式/字段的区别,而是测量数据与不确定度的不同版本
    • CO2-SS_x.csv 可以看作基准数据;
    • CO2-SS_xE1.csv 是“实验/工况 E1”的一套观测值(多数值略有改动,且至少一个测点的精度提升:accuracy 5% → 2% 或绝对值 5 → 2)。
  • 这种差异会在数据协调中产生实际影响:
    • 数值变化会改变方程残差;
    • 不确定度变化会改变权重(协方差矩阵对角元),从而改变优化的“拉扯方向”,最终使修正后的测点值、卡方统计量、可疑粗大误差点排序略有不同。

放在哪一步被“读入与使用”

  • 读取:MeasureData.load() 把上面这些列读入为 DataFrame;MeasureData.convert()
    • 执行单位转换(按 typevunit);
    • 生成/修正不确定度(accuracy+aunit);
    • 处理 OFF/CST 与上下界;
    • 形成用于计算的 标准单位数组 valintv,并据此生成协方差矩阵 S
  • 使用:在 Measured.evaluate() 的迭代里,这些 val/S 与你写的方程 f_val(x,u) 一起进入“最小化残差”的优化过程;accuracy 越小的点权重越大。

如果你愿意,我可以把两份文件的差异清单导出成一个小表(哪一行、哪个字段不同、旧值→新值),放到同一目录,便于你审阅和归档。需要的话我现在就生成。

Domanda

def n2x(self):
return self.x.n2i
其中,n2i的定义在哪里?

Risposta

你问到的 n2i,其实就是在 Measured 类里 定义的。来看一下流程:


1. n2x 的来源

hope.py 里有:

python
n2x = solver.n2x

Solver 类里定义了:

python
def n2x(self): return self.x.n2i

这里的 self.xMeasured 类型的对象(在 Solver.create_var() 里构造)。


2. n2i 的定义位置

Measured 类(定义在 solver.py 里)中有个方法 init_md()

python
def init_md(self): md = self.md self.n = len(md) self.val, self.intv = md.val, md.intv self.names = md.name # 这里定义了 n2i: self.n2i = {md.name[i]: i for i in range(len(md))}

也就是说:

  • n2i 是一个字典,键是测点名称(md.name[i]),值是它在数组里的索引位置(i)。
  • 作用:让你可以通过测点名快速查到它在 valintv 数组里的下标。

例如:

python
n2x = solver.n2x print(n2x['C_O_T1'])

可能会输出 6,表示 C_O_T1 这个测点在内部数组的第 6 个位置。


✅ 总结:

  • n2x() 方法其实就是暴露了 Measured.n2i
  • n2iMeasured.init_md() 中定义,是 测点名 → 索引 的映射字典。
  • 它的目的是方便把 CSV 表里的人类可读的测点名称,快速定位到 valintv 数组中的对应下标。

要不要我帮你用你上传的 CO2-SS_x.csv 文件跑一遍,把 n2i 生成出来,列出 测点名和它对应的数组索引?这样你就能直观地看到字典内容了。

Domanda

text
if args.trans is not None: data_template(args.trans, template='CO2-SS_data_template.csv') init_time = datetime.now() solver = Solver(proj_name=args.proj, input_name=args.input, desc=args.desc, timer=init_time, output=args.output) n2x = solver.n2x # 获取索引变量 n2u = solver.n2u # 创建辅助变量集 X = VariableManager(n2x, n2u)

辅助变量集是用来干什么的?

Risposta

好问题 👍
你这里的 辅助变量集 X = VariableManager(n2x, n2u),其实就是一个“桥梁工具”,让后续写热平衡函数 u_val(x) 和方程函数 f_val(x,u) 更直观、更方便。


1. 背景

  • 在数据协调程序里,x 代表测量量(Measured),u 代表未测量的未知量(Unmeasured)。
  • 它们在程序内部是以数组形式存储的,访问时需要通过索引:
    • 比如 n2x['C_I_T'] = 3 表示 “压气机入口温度” 在数组 x 的第 3 个位置。
    • 如果你直接写计算公式,就得经常 x[3], x[7],非常不直观。

2. VariableManager 的作用

VariableManager(n2x, n2u) 接收两个映射:

  • n2x:测点名 → 在数组 x 中的索引
  • n2u:未知量名 → 在数组 u 中的索引

然后它提供一种更友好的写法:

python
X.init(x, u) X['C_I_T'] # 自动返回数组中“C_I_T”位置的值 X['C_O_H'] # 自动返回压气机出口焓 X['GEN_LOAD'] # 自动返回发电机负荷

同时它也支持赋值:

python
X['C_O_H'] = air_comp_h1(X['C_I_H'], X['C_I_P'], X['C_O_P'], X['C_EFF'])

这行的意义是:计算压气机出口焓,并直接写回到 x 数组对应位置。


3. 为什么叫“辅助变量集”

  • 它不是新的物理量集合,而是对 x/u 数组的封装
  • 作用是让你在写 热平衡函数 u_val(x)冗余平衡方程 f_val(x,u) 时,像写公式一样用名字读写,不需要手动查索引。
  • 这样代码既直观(X['C_I_T'] 一看就是压气机入口温度),又不容易写错。

4. 小例子

如果没有 VariableManager,你可能要这样写:

python
C_I_H = air_h_tp(x[n2x['C_I_T']], x[n2x['C_I_P']])

用了 VariableManager 之后,只需要:

python
X['C_I_H'] = air_h_tp(X['C_I_T'], X['C_I_P'])

看起来就像公式推导一样。


✅ 总结一句话:
辅助变量集 X 就是一个变量管理器,用来把底层的数组存取操作封装成“用名字访问变量”。它让热力学平衡函数和方程函数的编写更直观,避免频繁查索引。


要不要我帮你画一个简单的示意图,展示 “x 数组 ←→ n2x ←→ VariableManager ←→ X['C_I_T']” 的关系?这样你能一眼看出这个管理器在中间扮演的角色。

Domanda

需要你帮我画一个简单的示意图,展示他们的关系

Python

python
import matplotlib.pyplot as plt # 画一个简单的示意图展示关系 fig, ax = plt.subplots(figsize=(8, 6)) # 定义节点 nodes = { "CSV 文件\n(CO2-SS_x.csv)": (0, 2), "MeasureData\n(读取CSV)": (3, 2), "Measured\n(对象self.x)": (6, 2), "n2i\n{name→index}": (9, 2), "Solver.n2x()": (12, 2), "VariableManager\n(X)": (6, 0), "访问示例:\nX['C_I_T'] ↔ x[n2i['C_I_T']]": (6, -2) } # 画节点 for name, (x, y) in nodes.items(): ax.text(x, y, name, ha="center", va="center", bbox=dict(boxstyle="round,pad=0.4", fc="lightblue", ec="black", lw=1)) # 画箭头 arrows = [ ("CSV 文件\n(CO2-SS_x.csv)", "MeasureData\n(读取CSV)"), ("MeasureData\n(读取CSV)", "Measured\n(对象self.x)"), ("Measured\n(对象self.x)", "n2i\n{name→index}"), ("n2i\n{name→index}", "Solver.n2x()"), ("Solver.n2x()", "VariableManager\n(X)"), ("VariableManager\n(X)", "访问示例:\nX['C_I_T'] ↔ x[n2i['C_I_T']]") ] for start, end in arrows: x1, y1 = nodes[start] x2, y2 = nodes[end] ax.annotate("", xy=(x2, y2+0.3 if y2<y1 else y2-0.3), xytext=(x1, y1-0.3 if y2<y1 else y1+0.3), arrowprops=dict(arrowstyle="->", lw=1.5)) ax.set_xlim(-1, 14) ax.set_ylim(-3, 3.5) ax.axis("off") plt.show()

Risposta

这里给你画了一张简图,展示了输入数据(CSV 文件)如何一步步变成你可以用 X['C_I_T'] 直接访问的“辅助变量集”:

  1. CSV 文件(如 CO2-SS_x.csv
    ⬇ 读入
  2. MeasureData(解析 CSV)
    ⬇ 转换成内部数组
  3. Measured (self.x)(持有测点数组与名称)
    ⬇ 生成
  4. n2i {name→index}(测点名到数组下标的映射)
    ⬇ 暴露
  5. Solver.n2x()(返回 n2i)
    ⬇ 注入
  6. VariableManager(X)(变量管理器)
    ⬇ 使用时
  7. X['C_I_T'] ↔ x[n2i['C_I_T']](直观的访问方式)

也就是说:VariableManager 就是封装了 n2i 的接口,让你能用名字而不是数字下标去访问/写入数组中的值。

要不要我再帮你把 f_val 和 u_val 的计算过程拿出来,用这张图里的数据流解释它们是怎么依赖 X 的?

Domanda

类似于X['C_I_T']这样的表达,返回的是C_I_T的value还是C_I_T所在的位置?如果返回的是value,是在哪部分代码中存在读取对应csv文件的value值的?

Risposta

好问题 👍,我来帮你把这个链条捋清楚。


1. X['C_I_T'] 返回什么?

  • 返回的是 C_I_T 的数值 (value),而不是位置索引。
  • 位置索引 的信息在 n2x / n2u 字典里已经有了,但 VariableManager 的设计就是为了屏蔽索引,让你用名字直接拿到值。

2. value 是从哪里来的?

流程是这样的:

  1. CSV 输入文件(例如 CO2-SS_x.csv)里有:

    • name(比如 C_I_T
    • value(原始数值,比如 298.0)
    • vunit(单位,比如 K
    • accuracy(不确定度)等等。
  2. MeasureData.load()

    • 读入 CSV,形成一个 pandas.DataFrame
  3. MeasureData.convert()

    • 把原始 value 按照 vunit 转换到标准单位(例如温度统一成 K,压力统一成 MPa)。
    • 得到数组 self.val(测点值)和 self.intv(不确定度)。
    • 如果越界或 OFF,还会修正。
  4. Measured.init_md()

    • MeasureData 里取出 valintvname
    • 构造一个字典 n2i = {name: i},表示每个测点对应的数组下标。
    • 此时你就有了:名字 → 下标,名字 → 值
  5. VariableManager.init(x, u)

    • 接收 x(已知量数组)、u(未知量数组)。
    • 内部做了一层封装,允许你写 X['C_I_T'],它会:
      • n2x['C_I_T'] 找到下标 i
      • 再去 x[i] 拿值并返回。

3. 举个具体例子

假设 CSV 里有一行:

nametypevaluevunitaccuracy
C_I_Tt25.0C2.0

流程:

  1. MeasureData.convert() 把 25.0 ℃ 转换成 298.15 K,放到 val 数组。
    val = [ ..., 298.15, ... ]
    → 同时 accuracy 转换成 ±2K,放到 intv
  2. Measured.init_md()C_I_T 映射到下标 3:
    n2i['C_I_T'] = 3
  3. VariableManager.init(x) 接收这个数组。
  4. 你调用 X['C_I_T']
    → 查到索引 3
    → 返回 x[3] = 298.15

所以结果就是 值(value,经过单位转换后的标准单位)


✅ 总结一句话:
X['C_I_T'] 返回的是“压气机入口温度的数值(已换算成标准单位)”,这些数值最初来自输入 CSV 的 value 列,在 MeasureData.convert() 里完成单位换算和校验。


要不要我帮你用你上传的 CO2-SS_x.csv 实际跑一遍,把几个关键测点(比如 C_I_T, C_O_P, T_I_T)的 CSV 原始值 → 转换后内部值 → X[...] 访问结果 列个表出来?这样你会更直观地看到全过程。

Domanda

  1. Measured.init_md()C_I_T 映射到下标 3:
    n2i['C_I_T'] = 3
    为什么是3?Python从0开始计数的话,value应该是第2列吧

Risposta

啊,这里有点容易搞混 🤓。我来仔细解释一下为什么是 3 而不是 “第 2 列”。


1. 下标 n2i 的意义

Measured.init_md() 里:

python
self.n2i = {md.name[i]: i for i in range(len(md))}
  • 这里的 i行号(索引),来自 MeasureDatamd.name 的列表。
  • len(md) 表示测点的总数(行数),所以 i0 开始 递增。

举例:如果 CSV 文件里(忽略掉表头)前三行的 name 分别是:

C_I_T, C_I_P, C_O_T

那么生成的映射就是:

python
n2i['C_I_T'] = 0 n2i['C_I_P'] = 1 n2i['C_O_T'] = 2

如果 第四行才是 C_O_P,那么:

python
n2i['C_O_P'] = 3

2. 为什么你会觉得是“第 2 列”

  • CSV 文件是一个 表格,有多列:name, type, description, value, vunit, …
  • n2i 并不是对 “列” 编号,而是对 编号。
  • 也就是说:n2i 关心的是“第几个测点”,而不是“在 CSV 的第几列”。

3. 总结

  • n2i 的下标是“行号”,不是列号。
  • 之所以 C_I_T 的下标是 3,是因为它在 CSV 的 第 4 行(Python 从 0 数)。
  • value 列只是这一行里的一个属性,真正的数值会被 MeasureData.convert() 取出来放进数组 val
  • VariableManagern2i 找到下标,再到 val 里取数。

✅ 换句话说:
n2i['C_I_T'] = 3 的意思是 “C_I_T 这个测点在所有测点数组里的第 4 个位置”,不是“第 2 列”


要不要我帮你直接用你上传的 CO2-SS_x.csv 文件跑一下,把前 10 个测点的 name 和它在 n2i 里的索引打印出来?这样你能一眼看到 “第几行 → 下标几”。

Condividi questa Q&A