# Q&A: GRPO lyrics-translation training script (Qwen2.5-3B + TRL)

Created on: March 26, 2025

Answered using O1 by Chat01

Question

# -*- coding: utf-8 -*-

import os
import re
import json
import gc
import langid
import wandb
import torch
import deepspeed
from typing import List
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from trl import GRPOConfig, GRPOTrainer
from nltk.translate.meteor_score import meteor_score

1. Login to wandb if desired

wandb.login(key="91223cd0edc96e39bfa8e98641d2a8d1aa3c0c4e")

2. Device assignment; let DeepSpeed handle device_map

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

3. Supported languages

SUPPORTED_LANGUAGES = {
"en_to_zh": ("英语", "中文"),
"zh_to_en": ("中文", "英语"),
}

4. System Prompt (modified as requested)

SYSTEM_PROMPT = """
You are a lyrics translation assistant.
You MUST ALWAYS output in the exact XML format:
<why1>
[First question you ask yourself to complete the task]
</why1>
<why2>
[Second question you ask yourself to complete the task]
</why2>
<why3>
[Third question you ask yourself to complete the task]
</why3>
<answer>
[The final translation only goes here]
</answer>
"""

5. Dataset Preparation

def get_lyric_datasets(path: str) -> Dataset:
    """
    Convert the JSON dataset at the given path into a HuggingFace Dataset and,
    for each record, build the prompt ([system, user]) plus its reference translation.
    """
    data = Dataset.from_json(path)

    # Filter dataset to only include 'en_to_zh' and 'zh_to_en'
    data = data.filter(lambda x: x['type'] in ["en_to_zh", "zh_to_en"])

    def map_fn(x):
        lang_src = SUPPORTED_LANGUAGES[x['type']][0]
        lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
        # Append the task instruction to the new SYSTEM_PROMPT
        system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
        return {
            'prompt': [
                {'role': 'system', 'content': system_plus},
                {'role': 'user', 'content': x['lyric']}
            ],
            'answer': x['target_lyric']
        }

    data = data.map(map_fn)
    return data
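For reference, each record in multi_lyric.json is assumed to carry a `type`, the source `lyric`, and the reference `target_lyric`, since those are the only fields `map_fn` reads; the file itself is not shown in this Q&A. An illustrative record with made-up values:

```python
# Illustrative only: record shape inferred from the fields accessed in map_fn.
example_record = {
    "type": "en_to_zh",                        # or "zh_to_en"
    "lyric": "Hello darkness, my old friend",  # source lyric (made-up)
    "target_lyric": "你好黑暗,我的老朋友",       # reference translation (made-up)
}
```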

6. Utility to extract <answer> ... </answer> from text

def extract_xml_answer(text: str) -> str:
    """
    Extract the <answer> ... </answer> content from the given text.
    """
    pattern = r"<answer>\s*(.*?)\s*</answer>"
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    if match:
        return match.group(1).strip()
    return ""

7. Reward computation helpers

def compute_length_acc(
preds: List[str],
refs: List[str],
tokenizer,
max_tolerance: float = 0.5
) -> List[float]:
"""
计算长度准确度奖励。
如果预测长度与参考长度之差比不大于一定比例,则给予相应的分数。
"""
rewards = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
rewards.append(0.0)
continue
length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
if length_ratio <= 0.1:
score = 1.0
elif length_ratio <= 0.2:
score = 0.8
elif length_ratio <= 0.3:
score = 0.6
elif length_ratio <= 0.4:
score = 0.4
elif length_ratio <= 0.5:
score = 0.2
else:
score = 0.0
rewards.append(score)
return rewards

def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
"""
计算 BLEU 分数列表
"""
smoothie = SmoothingFunction().method1
weights = (0.25, 0.25, 0.25, 0.25)
scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if not pred_tokens or not ref_tokens:
scores.append(0.0)
continue
bleu = sentence_bleu(
[ref_tokens],
pred_tokens,
weights=weights,
smoothing_function=smoothie
)
scores.append(bleu)
return scores

def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
"""
计算 TER (Translation Edit Rate)。
"""
ter_scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
if len(pred_tokens) > 0:
ter_scores.append(100.0)
else:
ter_scores.append(0.0)
continue
dist_val = edit_distance(pred_tokens, ref_tokens)
ter = (dist_val / len(ref_tokens)) * 100
ter_scores.append(ter)
return ter_scores

def detect_language(text: str) -> str:
"""
使用 langid 自动检测语言,返回语言代码。
"""
return langid.classify(text)[0]
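As a rough sanity check of the three metric helpers, here is a tiny run with a stand-in whitespace "tokenizer" (a hypothetical stub, not the Qwen tokenizer loaded later). Note that `compute_ter` is a simplified token-level edit-distance rate; full TER also models block shifts.

```python
class _WhitespaceTokenizer:
    """Stand-in exposing the only method the helpers call: tokenize()."""
    def tokenize(self, text):
        return text.split()

_tok = _WhitespaceTokenizer()
preds = ["the cat sat"]
refs = ["the cat sat down"]
print(compute_length_acc(preds, refs, _tok))  # [0.6]: ratio |3-4|/4 = 0.25 falls in the <=0.3 band
print(compute_ter(preds, refs, _tok))         # [25.0]: 1 token edit / 4 reference tokens * 100
print(compute_bleu(preds, refs, _tok))        # smoothed sentence-level BLEU in [0, 1]
```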

8. Reward Function Decorator (example; not used in this project)

def reward_func_decorator(func):
    def wrapper(prompts, completions, answer, step=0, **kwargs):
        # Pre- or post-processing around func could be added here
        return func(prompts, completions, answer, step=step, **kwargs)
    return wrapper

9. Define Reward Functions

def length_acc_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
    """Example reward function focused on the length of the prediction."""
    responses = [completion[0]['content'] for completion in completions]
    extracted_responses = [extract_xml_answer(r) for r in responses]

    # Print a few sample records
    if step % 200 == 0:
        print(f"\n[Step {step}] Displaying up to 3 samples from the batch:\n")
        for i in range(min(3, len(responses))):
            q = prompts[i][-1]['content']
            print("-" * 20)
            print(f"Sample {i + 1}")
            print(f"Question:\n{q}")
            print(f"GroundTruth Answer:\n{answer[i]}")
            print(f"Model Response (raw):\n{responses[i]}")
            print(f"Extracted <answer>:\n{extracted_responses[i]}")
            print("-" * 20)

    length_rewards = compute_length_acc(
        preds=extracted_responses,
        refs=answer,
        tokenizer=tokenizer
    )
    return length_rewards

def bleu_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
"""
计算 BLEU 分数,并转换为某种自定义奖励区间。
"""
responses = [c[0]["content"] for c in completions]
q = prompts[0][-1]['content']
extracted = [extract_xml_answer(r) for r in responses]
print('-' * 20, f"Original Lyrics:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}",
f"\nExtracted:\n{extracted[0]}")
bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for score in bleu_scores:
if score >= 0.9:
rewards.append(5.0)
elif score >= 0.8:
rewards.append(4.5)
elif score >= 0.7:
rewards.append(4.0)
elif score >= 0.6:
rewards.append(3.5)
elif score >= 0.5:
rewards.append(2.5)
elif score >= 0.4:
rewards.append(2.0)
elif score >= 0.3:
rewards.append(1.5)
elif score >= 0.2:
rewards.append(1.0)
elif score >= 0.1:
rewards.append(0.5)
else:
rewards.append(0.0)
return rewards

def ter_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
"""
计算 TER 分数,并映射到某种奖励区间。
"""
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
ter_scores = compute_ter(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for t in ter_scores:
if t >= 80:
rewards.append(0.0)
elif t >= 60:
rewards.append(0.5)
elif t >= 40:
rewards.append(1.0)
elif t >= 20:
rewards.append(1.5)
else:
rewards.append(2.0)
return rewards

def language_recognition(completions, answer, step=0, **kwargs) -> List[float]:
"""
简单地检测 预测语种 是否与 参考答案语种 相同,以此给予奖励。
"""
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
rewards = []
for pred, ref in zip(extracted, answer):
if not pred.strip():
rewards.append(0.0)
continue
pred_lang = detect_language(pred)
ref_lang = detect_language(ref)
rewards.append(1.0 if pred_lang == ref_lang else 0.0)
return rewards

def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
    """
    Strict format check: the output must contain
    <why1>...</why1> <why2>...</why2> <why3>...</why3> <answer>...</answer>
    in exactly that order.
    """
    pattern = (
        r"<why1>[\s\S]+?</why1>\s*"
        r"<why2>[\s\S]+?</why2>\s*"
        r"<why3>[\s\S]+?</why3>\s*"
        r"<answer>[\s\S]+?</answer>"
    )
    responses = [completion[0]["content"] for completion in completions]
    scores = []
    for r in responses:
        if re.search(pattern, r):
            scores.append(1.0)
        else:
            scores.append(0.0)
    return scores

def soft_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
"""
对输出做软格式检查:
只要包含所有四组标签即可,无论顺序或中间有无其他内容,都给一定的分数。
"""
responses = [completion[0]["content"] for completion in completions]
scores = []
for r in responses:
tags_present = all(
tag in r
for tag in ["<why1>", "</why1>", "<why2>", "</why2>", "<why3>", "</why3>", "<answer>", "</answer>"]
)
scores.append(0.5 if tags_present else 0.0)
return scores

def xmlcount_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
    """
    Score the occurrences of the XML tags: each correctly used opening and
    closing tag adds credit; leftover text after </answer> is penalized.
    """

    def count_xml(text) -> float:
        count = 0.0
        # Four tag pairs (why1, why2, why3, answer); 0.125 per tag, 1.0 max
        if text.count("<why1>") == 1:
            count += 0.125
        if text.count("</why1>") == 1:
            count += 0.125
        if text.count("<why2>") == 1:
            count += 0.125
        if text.count("</why2>") == 1:
            count += 0.125
        if text.count("<why3>") == 1:
            count += 0.125
        if text.count("</why3>") == 1:
            count += 0.125
        if text.count("<answer>") == 1:
            count += 0.125
        if text.count("</answer>") == 1:
            count += 0.125
        # Small penalty for any trailing text after </answer>
        if "</answer>" in text:
            leftover = text.split("</answer>")[-1]
            count -= len(leftover.strip()) * 0.001
        # Clamp to zero; never return a negative score
        if count < 0:
            count = 0.0
        return count

    responses = [c[0]["content"] for c in completions]
    return [count_xml(c) for c in responses]

def meteor_reward_func(completions, answer, step=0, tokenizer=None, **kwargs) -> List[float]:
    """
    2) Semantic similarity based on the METEOR score
    - computed with nltk.translate.meteor_score.meteor_score
    - then mapped to the range [0, 2.0]
    """
    responses = [c[0]["content"] for c in completions]
    extracted_preds = [extract_xml_answer(r) for r in responses]

    scores = []
    for pred, ref in zip(extracted_preds, answer):
        if not pred.strip():
            scores.append(0.0)
            continue
        m = meteor_score([ref], pred)  # meteor_score args: (list of references, hypothesis)
        # Example mapping; adjust the thresholds as needed
        if m >= 0.9:
            scores.append(2.0)
        elif m >= 0.75:
            scores.append(1.5)
        elif m >= 0.5:
            scores.append(1.0)
        elif m >= 0.3:
            scores.append(0.5)
        else:
            scores.append(0.0)
    return scores
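One caveat worth flagging: depending on the installed NLTK version, `meteor_score` may require pre-tokenized input (recent releases raise a `TypeError` on raw strings) and needs the WordNet data downloaded. A hedged variant under that assumption, using a crude whitespace split (the HF tokenizer would be a better choice for Chinese):

```python
import nltk
from nltk.translate.meteor_score import meteor_score

# Assumption: a newer NLTK that expects token lists rather than raw strings.
nltk.download("wordnet", quiet=True)  # METEOR uses WordNet for stem/synonym matching

def meteor_score_tokenized(pred: str, ref: str) -> float:
    """Whitespace-tokenize both sides before calling meteor_score."""
    return meteor_score([ref.split()], pred.split())
```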

10. Training Arguments for GRPO

training_args = GRPOConfig(
use_vllm=False,
learning_rate=5e-6,
adam_beta1=0.9,
adam_beta2=0.99,
weight_decay=0.1,
warmup_ratio=0.1,
lr_scheduler_type="cosine",
logging_steps=50,
fp16=True,
per_device_train_batch_size=4,
gradient_checkpointing=False,
gradient_accumulation_steps=2,
num_generations=8,
max_prompt_length=768,
max_completion_length=768,
num_train_epochs=7, # We'll manually loop over epochs below
# max_steps=2,
save_steps=500,
max_grad_norm=0.1,
report_to='none',
output_dir="outputs",
# Specify the DeepSpeed config
deepspeed="ds_config.json"
)
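The referenced ds_config.json is not included in the question or the answer. Below is a minimal ZeRO-2 + fp16 sketch that would be consistent with the fp16 and batch settings above; this is an assumption about the file, not the author's actual config ("auto" lets DeepSpeed inherit the values from the HF arguments):

```python
import json

# Hypothetical ds_config.json; adjust the ZeRO stage/offload to the available GPUs.
ds_config = {
    "train_micro_batch_size_per_gpu": "auto",
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "fp16": {"enabled": True},
    "zero_optimization": {"stage": 2},
}

with open("ds_config.json", "w") as f:
    json.dump(ds_config, f, indent=2)
```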

11. Load Model/Tokenizer

model_name = "../model/Qwen2.5-3B-Instruct" # Adjust path as needed
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map=None, # Let DeepSpeed handle it
use_cache=False
)

model.config.use_cache = True

model.to(device)

tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

12. Load Dataset

dataset = get_lyric_datasets("../data_pack/multi_lyric.json")

--------------------------- Change: split into train and test sets (9 : 1) ---------------------------

train_test = dataset.train_test_split(test_size=0.1, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]

13. Initialize Trainer (on the training set)

trainer = GRPOTrainer(
model=model,
processing_class=tokenizer,
reward_funcs=[
xmlcount_reward_func,
soft_format_reward_func,
strict_format_reward_func,
bleu_reward_func
],
args=training_args,
train_dataset=train_dataset,
)

14. Clean up CUDA before training

torch.cuda.empty_cache()

gc.collect()

15. Define the test-set evaluation function (with torch.no_grad())

def evaluate_model(
    model,
    tokenizer,
    dataset: Dataset,
    batch_size: int = 2,
    max_new_tokens: int = 512
):
    """
    Simple test routine: generate translations batch by batch, extract <answer>,
    and compute BLEU, TER and length_acc.
    Adjust the inference parameters (temperature, max_new_tokens, etc.) as needed.
    """
    from statistics import mean

    all_preds = []
    all_refs = []
    model.eval()
    with torch.no_grad():
        # Iterate over the Dataset object correctly
        for i in range(0, len(dataset), batch_size):
            # Fetch the batch indices as an explicit list
            batch_indices = list(range(i, min(i + batch_size, len(dataset))))
            batch = dataset.select(batch_indices)
            prompts = [item["prompt"] for item in batch]
            refs = [item["answer"] for item in batch]
            inputs = []
            for p in prompts:
                # p is a list[{'role': ..., 'content': ...}, ...];
                # simply concatenate the system + user contents
                role_content = [pc["content"] for pc in p]
                joined = "\n".join(role_content)
                inputs.append(joined)
            encodings = tokenizer(inputs, return_tensors="pt", padding=True, truncation=True).to(device)
            outputs = model.generate(
                **encodings,
                max_new_tokens=max_new_tokens,
                do_sample=False  # greedy decoding
            )
            for o in outputs:
                text = tokenizer.decode(o, skip_special_tokens=True)
                pred_ans = extract_xml_answer(text)
                all_preds.append(pred_ans)
                print(f"Sample {i}:")
                print(f"Generated: {pred_ans}")
                print(f"Reference: {refs}")
                print("-" * 30)
            all_refs.extend(refs)

    bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
    ter_scores = compute_ter(all_preds, all_refs, tokenizer)
    len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)
    metrics = {
        "bleu": mean(bleu_scores) if bleu_scores else 0.0,
        "ter": mean(ter_scores) if ter_scores else 0.0,
        "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
    }
    return metrics

16. Manual per-epoch training and evaluation

num_epochs = int(training_args.num_train_epochs)
for epoch in range(num_epochs):
print(f"======= Starting Epoch {epoch + 1}/{num_epochs} =======")
trainer.train()

Evaluation

print("\nStarting evaluation...")
try:
test_metrics = evaluate_model(model, tokenizer, test_dataset)
print(f"\n===== Evaluation at Epoch {epoch + 1} =====")
print(f"Test BLEU: {test_metrics['bleu']:.4f}")
print(f"Test TER: {test_metrics['ter']:.4f}")
print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")
print("==========================================")
except Exception as e:
print(f"Error during evaluation: {e}")
import traceback

traceback.print_exc()

Note: when evaluate_model computes metrics against the reference data (true labels), make sure the system prompt and the user input are excluded/removed, and that only the content inside the <answer></answer> tags of the AI output is used for the computation. Also, during evaluation print once every ten samples: the model's generated result and the content extracted from its tags, e.g.:

    if i % 10 == 0:  # print once every 10 samples
        print(f"Sample {i}:")
        print(f"Generated (assistant response): {generated_text}")
        print(f"Extracted <answer>: {extracted_answer}")
        print(f"Reference: {true_answer}")
        print("-" * 30)

Modify the function above accordingly and provide the complete code.

Answer

```python
# -*- coding: utf-8 -*-
import os
import re
import json
import gc
import langid
import wandb
import torch
import deepspeed
from typing import List
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from trl import GRPOConfig, GRPOTrainer
from nltk.translate.meteor_score import meteor_score

# 1. Login to wandb if desired
# wandb.login(key="91223cd0edc96e39bfa8e98641d2a8d1aa3c0c4e")

# 2. Device assignment; let DeepSpeed handle device_map
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

# 3. Supported languages
SUPPORTED_LANGUAGES = {
    "en_to_zh": ("英语", "中文"),
    "zh_to_en": ("中文", "英语"),
}

# 4. System Prompt (modified as requested)
SYSTEM_PROMPT = """\
You are a lyrics translation assistant.
You MUST ALWAYS output in the exact XML format:
<why1>
[First question you ask yourself to complete the task]
</why1>
<why2>
[Second question you ask yourself to complete the task]
</why2>
<why3>
[Third question you ask yourself to complete the task]
</why3>
<answer>
[The final translation only goes here]
</answer>
"""


# 5. Dataset Preparation
def get_lyric_datasets(path: str) -> Dataset:
    """
    Convert the JSON dataset at the given path into a HuggingFace Dataset and,
    for each record, build the prompt ([system, user]) plus its reference translation.
    """
    data = Dataset.from_json(path)

    # Filter dataset to only include 'en_to_zh' and 'zh_to_en'
    data = data.filter(lambda x: x['type'] in ["en_to_zh", "zh_to_en"])

    def map_fn(x):
        lang_src = SUPPORTED_LANGUAGES[x['type']][0]
        lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
        # Append the task instruction to the new SYSTEM_PROMPT
        system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
        return {
            'prompt': [
                {'role': 'system', 'content': system_plus},
                {'role': 'user', 'content': x['lyric']}
            ],
            'answer': x['target_lyric']
        }

    data = data.map(map_fn)
    return data


# 6. Utility to extract <answer> ... </answer> from text
def extract_xml_answer(text: str) -> str:
    """
    Extract the <answer> ... </answer> content from the given text.
    """
    pattern = r"<answer>\s*(.*?)\s*</answer>"
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    if match:
        return match.group(1).strip()
    return ""


# 7. Reward computation helpers
def compute_length_acc(
    preds: List[str],
    refs: List[str],
    tokenizer,
    max_tolerance: float = 0.5
) -> List[float]:
    """
    Compute a length-accuracy reward: the smaller the relative difference
    between the predicted and reference token lengths, the higher the score.
    """
    rewards = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            rewards.append(0.0)
            continue
        length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
        if length_ratio <= 0.1:
            score = 1.0
        elif length_ratio <= 0.2:
            score = 0.8
        elif length_ratio <= 0.3:
            score = 0.6
        elif length_ratio <= 0.4:
            score = 0.4
        elif length_ratio <= 0.5:
            score = 0.2
        else:
            score = 0.0
        rewards.append(score)
    return rewards


def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    """
    Compute a list of sentence-level BLEU scores.
    """
    smoothie = SmoothingFunction().method1
    weights = (0.25, 0.25, 0.25, 0.25)
    scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if not pred_tokens or not ref_tokens:
            scores.append(0.0)
            continue
        bleu = sentence_bleu(
            [ref_tokens],
            pred_tokens,
            weights=weights,
            smoothing_function=smoothie
        )
        scores.append(bleu)
    return scores


def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    """
    Compute TER (Translation Edit Rate).
    """
    ter_scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            if len(pred_tokens) > 0:
                ter_scores.append(100.0)
            else:
                ter_scores.append(0.0)
            continue
        dist_val = edit_distance(pred_tokens, ref_tokens)
        ter = (dist_val / len(ref_tokens)) * 100
        ter_scores.append(ter)
    return ter_scores


def detect_language(text: str) -> str:
    """
    Detect the language with langid and return the language code.
    """
    return langid.classify(text)[0]


# 8. Reward Function Decorator (example; not used in this project)
def reward_func_decorator(func):
    def wrapper(prompts, completions, answer, step=0, **kwargs):
        # Pre- or post-processing around func could be added here
        return func(prompts, completions, answer, step=step, **kwargs)
    return wrapper


# 9. Define Reward Functions
def length_acc_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
    """Example reward function focused on the length of the prediction."""
    responses = [completion[0]['content'] for completion in completions]
    extracted_responses = [extract_xml_answer(r) for r in responses]

    # Print a few sample records
    if step % 200 == 0:
        print(f"\n[Step {step}] Displaying up to 3 samples from the batch:\n")
        for i in range(min(3, len(responses))):
            q = prompts[i][-1]['content']
            print("-" * 20)
            print(f"Sample {i + 1}")
            print(f"Question:\n{q}")
            print(f"GroundTruth Answer:\n{answer[i]}")
            print(f"Model Response (raw):\n{responses[i]}")
            print(f"Extracted <answer>:\n{extracted_responses[i]}")
            print("-" * 20)

    length_rewards = compute_length_acc(
        preds=extracted_responses,
        refs=answer,
        tokenizer=tokenizer
    )
    return length_rewards


def bleu_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
    """
    Compute BLEU scores and map them to a custom reward range.
    """
    responses = [c[0]["content"] for c in completions]
    q = prompts[0][-1]['content']
    extracted = [extract_xml_answer(r) for r in responses]
    print('-' * 20, f"Original Lyrics:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}",
          f"\nExtracted:\n{extracted[0]}")
    bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
    rewards = []
    for score in bleu_scores:
        if score >= 0.9:
            rewards.append(5.0)
        elif score >= 0.8:
            rewards.append(4.5)
        elif score >= 0.7:
            rewards.append(4.0)
        elif score >= 0.6:
            rewards.append(3.5)
        elif score >= 0.5:
            rewards.append(2.5)
        elif score >= 0.4:
            rewards.append(2.0)
        elif score >= 0.3:
            rewards.append(1.5)
        elif score >= 0.2:
            rewards.append(1.0)
        elif score >= 0.1:
            rewards.append(0.5)
        else:
            rewards.append(0.0)
    return rewards


def ter_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
    """
    Compute TER scores and map them to a reward range.
    """
    responses = [c[0]["content"] for c in completions]
    extracted = [extract_xml_answer(r) for r in responses]
    ter_scores = compute_ter(preds=extracted, refs=answer, tokenizer=tokenizer)
    rewards = []
    for t in ter_scores:
        if t >= 80:
            rewards.append(0.0)
        elif t >= 60:
            rewards.append(0.5)
        elif t >= 40:
            rewards.append(1.0)
        elif t >= 20:
            rewards.append(1.5)
        else:
            rewards.append(2.0)
    return rewards


def language_recognition(completions, answer, step=0, **kwargs) -> List[float]:
    """
    Reward the completion when the detected language of the prediction
    matches the detected language of the reference answer.
    """
    responses = [c[0]["content"] for c in completions]
    extracted = [extract_xml_answer(r) for r in responses]
    rewards = []
    for pred, ref in zip(extracted, answer):
        if not pred.strip():
            rewards.append(0.0)
            continue
        pred_lang = detect_language(pred)
        ref_lang = detect_language(ref)
        rewards.append(1.0 if pred_lang == ref_lang else 0.0)
    return rewards


def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
    """
    Strict format check: the output must contain
    <why1>...</why1> <why2>...</why2> <why3>...</why3> <answer>...</answer>
    in exactly that order.
    """
    pattern = (
        r"<why1>[\s\S]+?</why1>\s*"
        r"<why2>[\s\S]+?</why2>\s*"
        r"<why3>[\s\S]+?</why3>\s*"
        r"<answer>[\s\S]+?</answer>"
    )
    responses = [completion[0]["content"] for completion in completions]
    scores = []
    for r in responses:
        if re.search(pattern, r):
            scores.append(1.0)
        else:
            scores.append(0.0)
    return scores


def soft_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
    """
    Soft format check: award partial credit as long as all four tag pairs
    are present, regardless of order or extra content in between.
    """
    responses = [completion[0]["content"] for completion in completions]
    scores = []
    for r in responses:
        tags_present = all(
            tag in r
            for tag in ["<why1>", "</why1>", "<why2>", "</why2>", "<why3>", "</why3>", "<answer>", "</answer>"]
        )
        scores.append(0.5 if tags_present else 0.0)
    return scores


def xmlcount_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
    """
    Score the occurrences of the XML tags: each correctly used opening and
    closing tag adds credit; leftover text after </answer> is penalized.
    """

    def count_xml(text) -> float:
        count = 0.0
        # Four tag pairs (why1, why2, why3, answer); 0.125 per tag, 1.0 max
        if text.count("<why1>") == 1:
            count += 0.125
        if text.count("</why1>") == 1:
            count += 0.125
        if text.count("<why2>") == 1:
            count += 0.125
        if text.count("</why2>") == 1:
            count += 0.125
        if text.count("<why3>") == 1:
            count += 0.125
        if text.count("</why3>") == 1:
            count += 0.125
        if text.count("<answer>") == 1:
            count += 0.125
        if text.count("</answer>") == 1:
            count += 0.125
        # Small penalty for any trailing text after </answer>
        if "</answer>" in text:
            leftover = text.split("</answer>")[-1]
            count -= len(leftover.strip()) * 0.001
        # Clamp to zero; never return a negative score
        if count < 0:
            count = 0.0
        return count

    responses = [c[0]["content"] for c in completions]
    return [count_xml(c) for c in responses]


def meteor_reward_func(completions, answer, step=0, tokenizer=None, **kwargs) -> List[float]:
    """
    2) Semantic similarity based on the METEOR score
    - computed with nltk.translate.meteor_score.meteor_score
    - then mapped to the range [0, 2.0]
    """
    responses = [c[0]["content"] for c in completions]
    extracted_preds = [extract_xml_answer(r) for r in responses]

    scores = []
    for pred, ref in zip(extracted_preds, answer):
        if not pred.strip():
            scores.append(0.0)
            continue
        m = meteor_score([ref], pred)  # meteor_score args: (list of references, hypothesis)
        # Example mapping; adjust the thresholds as needed
        if m >= 0.9:
            scores.append(2.0)
        elif m >= 0.75:
            scores.append(1.5)
        elif m >= 0.5:
            scores.append(1.0)
        elif m >= 0.3:
            scores.append(0.5)
        else:
            scores.append(0.0)
    return scores


# 10. Training Arguments for GRPO
training_args = GRPOConfig(
    use_vllm=False,
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    logging_steps=50,
    fp16=True,
    per_device_train_batch_size=4,
    gradient_checkpointing=False,
    gradient_accumulation_steps=2,
    num_generations=8,
    max_prompt_length=768,
    max_completion_length=768,
    num_train_epochs=7,  # We'll manually loop over epochs below
    # max_steps=2,
    save_steps=500,
    max_grad_norm=0.1,
    report_to='none',
    output_dir="outputs",
    # Specify the DeepSpeed config
    deepspeed="ds_config.json"
)

# 11. Load Model/Tokenizer
model_name = "../model/Qwen2.5-3B-Instruct"  # Adjust path as needed
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map=None,  # Let DeepSpeed handle it
    use_cache=False
)
model.to(device)

tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

# 12. Load Dataset
dataset = get_lyric_datasets("../data_pack/multi_lyric.json")

# --------------------------- Change: split into train and test sets (9 : 1) ---------------------------
train_test = dataset.train_test_split(test_size=0.1, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]

# 13. Initialize Trainer (on the training set)
trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        xmlcount_reward_func,
        soft_format_reward_func,
        strict_format_reward_func,
        bleu_reward_func
    ],
    args=training_args,
    train_dataset=train_dataset,
)

# 14. Clean up CUDA before training
# torch.cuda.empty_cache()
# gc.collect()


# 15. Define the test-set evaluation function (with torch.no_grad())
def evaluate_model(
    model,
    tokenizer,
    dataset: Dataset,
    batch_size: int = 2,
    max_new_tokens: int = 512
):
    """
    Simple test routine: generate translations, extract <answer>, and compute
    BLEU, TER and length_acc.
    - Only the content inside <answer> ... </answer> of the model output is used for the metrics.
    - Results are printed once every 10 samples.
    """
    from statistics import mean

    all_preds = []
    all_refs = []

    model.eval()
    with torch.no_grad():
        for i in range(0, len(dataset), batch_size):
            batch_indices = list(range(i, min(i + batch_size, len(dataset))))
            batch = dataset.select(batch_indices)

            # Reference answers
            refs = [item["answer"] for item in batch]

            # Concatenate system + user prompts for generation only; they are not
            # part of the evaluation, since only the <answer> extracted from the
            # generated text is scored
            prompts = [item["prompt"] for item in batch]
            inputs = []
            for p in prompts:
                # p is a list[{'role': ..., 'content': ...}, ...];
                # simply concatenate the system + user contents
                role_content = [pc["content"] for pc in p]
                joined = "\n".join(role_content)
                inputs.append(joined)

            encodings = tokenizer(
                inputs,
                return_tensors="pt",
                padding=True,
                truncation=True
            ).to(device)

            outputs = model.generate(
                **encodings,
                max_new_tokens=max_new_tokens,
                do_sample=False  # greedy decoding
            )

            for j, o in enumerate(outputs):
                idx = i + j
                if idx >= len(dataset):
                    break
                generated_text = tokenizer.decode(o, skip_special_tokens=True)
                extracted_answer = extract_xml_answer(generated_text)
                all_preds.append(extracted_answer)
                all_refs.append(refs[j])

                # Print once every 10 samples
                if idx % 10 == 0:
                    print(f"Sample {idx}:")
                    print(f"Generated (assistant response): {generated_text}")
                    print(f"Extracted <answer>: {extracted_answer}")
                    print(f"Reference: {refs[j]}")
                    print("-" * 30)

    bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
    ter_scores = compute_ter(all_preds, all_refs, tokenizer)
    len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)

    metrics = {
        "bleu": mean(bleu_scores) if bleu_scores else 0.0,
        "ter": mean(ter_scores) if ter_scores else 0.0,
        "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
    }
    return metrics


# 16. Manual per-epoch training and evaluation
num_epochs = int(training_args.num_train_epochs)
for epoch in range(num_epochs):
    print(f"======= Starting Epoch {epoch + 1}/{num_epochs} =======")
    trainer.train()

    # Evaluation
    print("\nStarting evaluation...")
    try:
        test_metrics = evaluate_model(model, tokenizer, test_dataset)
        print(f"\n===== Evaluation at Epoch {epoch + 1} =====")
        print(f"Test BLEU: {test_metrics['bleu']:.4f}")
        print(f"Test TER: {test_metrics['ter']:.4f}")
        print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")
        print("==========================================")
    except Exception as e:
        print(f"Error during evaluation: {e}")
        import traceback
        traceback.print_exc()
```
