Created on: 20 February 2025

# -*- coding: utf-8 -*-
import os
import re
import json
import gc
import langid
import wandb
import torch
import deepspeed
from typing import List
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from trl import GRPOConfig, GRPOTrainer
wandb.login(key="91223cd0edc96e39bfa8e98641d2a8d1aa3c0c4e")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")
SUPPORTED_LANGUAGES = {
"en_to_zh": ("英语", "中文"),
"zh_to_en": ("中文", "英语"),
"jp_to_zh": ("日语", "中文"),
"zh_to_azh": ("中文", "中文(文言文)"),
"en_to_azh": ("英语", "中文(文言文)"),
"de_to_zh": ("德语", "中文"),
"italy_to_zh": ("意大利语", "中文"),
"rus_to_zh": ("俄语", "中文"),
"fr_to_zh": ("法语", "中文"),
"mix": ("中文", "multilingual")
}
SYSTEM_PROMPT = """\
You are a translation assistant.
You MUST follow these rules:
1. ALWAYS output in the exact XML format:
<reasoning>
[Your chain-of-thought or reasoning goes here]
</reasoning>
<answer>
[The final translation only goes here]
</answer>
2. DO NOT add any extra tags, disclaimers, or text outside the above tags.
3. If the user provides some lyrics, translate them from {} to {} only, no explanation.
4. End your message immediately after the </answer> tag.
"""
XML_COT_FORMAT = """
<reasoning>
{reasoning}
</reasoning>
<answer>
{answer}
</answer>
"""
def get_lyric_datasets(path: str) -> Dataset:
data = Dataset.from_json(path)
def map_fn(x):
    lang_src = SUPPORTED_LANGUAGES[x['type']][0]
    lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
    system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
    return {
        'prompt': [
            {'role': 'system', 'content': system_plus},
            {'role': 'user', 'content': x['lyric']}
        ],
        'answer': x['target_lyric']
    }
data = data.map(map_fn)
return data
def extract_xml_answer(text: str) -> str:
pattern = r"<answer>\s*(.?)\s</answer>"
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
return ""
def compute_length_acc(
preds: List[str],
refs: List[str],
tokenizer,
max_tolerance: float = 0.5
) -> List[float]:
rewards = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
rewards.append(0.0)
continue
length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
if length_ratio <= 0.1:
score = 1.0
elif length_ratio <= 0.2:
score = 0.8
elif length_ratio <= 0.3:
score = 0.6
elif length_ratio <= 0.4:
score = 0.4
elif length_ratio <= 0.5:
score = 0.2
else:
score = 0.0
rewards.append(score)
return rewards
def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
smoothie = SmoothingFunction().method1
weights = (0.25, 0.25, 0.25, 0.25)
scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if not pred_tokens or not ref_tokens:
scores.append(0.0)
continue
bleu = sentence_bleu(
[ref_tokens],
pred_tokens,
weights=weights,
smoothing_function=smoothie
)
scores.append(bleu)
return scores
def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
ter_scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
if len(pred_tokens) > 0:
ter_scores.append(100.0)
else:
ter_scores.append(0.0)
continue
dist_val = edit_distance(pred_tokens, ref_tokens)
ter = (dist_val / len(ref_tokens)) * 100
ter_scores.append(ter)
return ter_scores
def detect_language(text: str) -> str:
return langid.classify(text)[0]
def reward_func_decorator(func):
def wrapper(prompts, completions, answer, step=0, **kwargs):
# if tokenizer is None:
# raise ValueError(f"{func.__name__} needs a tokenizer.")
# return func(prompts, completions, answer, tokenizer=tokenizer, step=step, **kwargs)
pass
return wrapper
def length_acc_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
"""Example reward function that also prints partial data periodically."""
responses = [completion[0]['content'] for completion in completions]
extracted_responses = [extract_xml_answer(r) for r in responses]
# Periodically print a few samples for inspection
if step % 200 == 0:
    print(f"\n[Step {step}] Displaying up to 3 samples from the batch:\n")
    for i in range(min(3, len(responses))):
        q = prompts[i][-1]['content']
        print("-" * 20)
        print(f"Sample {i + 1}")
        print(f"Question:\n{q}")
        print(f"GroundTruth Answer:\n{answer[i]}")
        print(f"Model Response (raw):\n{responses[i]}")
        print(f"Extracted <answer>:\n{extracted_responses[i]}")
        print("-" * 20)
length_rewards = compute_length_acc(
    preds=extracted_responses,
    refs=answer,
    tokenizer=tokenizer
)
return length_rewards
def bleu_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for score in bleu_scores:
if score >= 0.9:
rewards.append(5.0)
elif score >= 0.8:
rewards.append(4.5)
elif score >= 0.7:
rewards.append(4.0)
elif score >= 0.6:
rewards.append(3.5)
elif score >= 0.5:
rewards.append(2.5)
elif score >= 0.4:
rewards.append(2.0)
elif score >= 0.3:
rewards.append(1.5)
elif score >= 0.2:
rewards.append(1.0)
elif score >= 0.1:
rewards.append(0.5)
else:
rewards.append(0.0)
return rewards
def ter_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
ter_scores = compute_ter(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for t in ter_scores:
if t >= 80:
rewards.append(0.0)
elif t >= 60:
rewards.append(0.5)
elif t >= 40:
rewards.append(1.0)
elif t >= 20:
rewards.append(1.5)
else:
rewards.append(2.0)
return rewards
def language_recognition(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
rewards = []
for pred, ref in zip(extracted, answer):
if not pred.strip():
rewards.append(0.0)
continue
pred_lang = detect_language(pred)
ref_lang = detect_language(ref)
rewards.append(1.0 if pred_lang == ref_lang else 0.0)
return rewards
def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
pattern = r"<reasoning>[\s\S]+</reasoning>\s*<answer>[\s\S]+</answer>"
responses = [completion[0]["content"] for completion in completions]
scores = []
for r in responses:
if re.search(pattern, r):
scores.append(1.0)
else:
scores.append(0.0)
return scores
def soft_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
pattern = r"<reasoning>.?</reasoning>\s<answer>.*?</answer>"
responses = [completion[0]["content"] for completion in completions]
matches = [re.match(pattern, r, flags=re.DOTALL) for r in responses]
return [0.5 if match else 0.0 for match in matches]
def xmlcount_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
def count_xml(text) -> float:
count = 0.0
# Simple scoring based on how many times each of the four tags appears
if text.count("<reasoning>") == 1:
count += 0.125
if text.count("</reasoning>") == 1:
count += 0.125
if text.count("<answer>") == 1:
count += 0.125
if text.count("</answer>") == 1:
count += 0.125
# Penalize slightly if there is extra text after </answer>
if "</answer>" in text:
leftover = text.split("</answer>")[-1]
count -= len(leftover) * 0.001
return count
responses = [c[0]["content"] for c in completions]
return [count_xml(c) for c in responses]
training_args = GRPOConfig(
use_vllm=False,
learning_rate=5e-6,
adam_beta1=0.9,
adam_beta2=0.99,
weight_decay=0.1,
warmup_ratio=0.1, # use the Transformers/TRL scheduler's own warmup ratio
lr_scheduler_type="cosine", # native HF scheduler type
logging_steps=50,
fp16=True,
per_device_train_batch_size=4,
gradient_accumulation_steps=2,
num_generations=4,
max_prompt_length=768,
max_completion_length=768,
num_train_epochs=16,
save_steps=1000,
max_grad_norm=0.1,
report_to='wandb',
output_dir="outputs",
# point to the DeepSpeed config
deepspeed="ds_config.json"
)
model_name = "../model/Qwen2.5-3B-Instruct" # Adjust path as needed
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
device_map=None, # Let DeepSpeed handle it
use_cache=False
)
model.gradient_checkpointing_enable()
model.config.use_cache = False
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
dataset = get_lyric_datasets("../data_pack/multi_lyric.json")
train_test = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]
trainer = GRPOTrainer(
model=model,
processing_class=tokenizer,
reward_funcs=[
xmlcount_reward_func,
soft_format_reward_func,
strict_format_reward_func,
length_acc_reward_func, # This prints sample data periodically
bleu_reward_func,
ter_reward_func,
language_recognition,
],
args=training_args,
train_dataset=train_dataset, # <-- use the training split
)
trainer.train()
def evaluate_model(
model,
tokenizer,
dataset: Dataset,
batch_size: int = 2,
max_new_tokens: int = 128
):
"""
简易测试函数:逐条生成翻译结果,提取 <answer>,计算 BLEU、TER、和 length_acc。
根据需要可自行改动推理参数(如温度、max_new_tokens 等)。
"""
from statistics import mean
all_preds = []
all_refs = []
model.eval()
# Disable autograd during generation
with torch.no_grad():
    for i in range(0, len(dataset), batch_size):
        # Slicing an HF Dataset returns a dict of columns: {'prompt': [...], 'answer': [...]}
        batch = dataset[i:i + batch_size]
        prompts = batch["prompt"]
        refs = batch["answer"]
        # Join each prompt's messages into a single input string (system + user)
        inputs = []
        for p in prompts:
            # p is a list of {'role': ..., 'content': ...} dicts
            role_content = [pc["content"] for pc in p]
            joined = "\n".join(role_content)
            inputs.append(joined)
        # Encode with the tokenizer, then generate with the model
        encodings = tokenizer(inputs, return_tensors="pt", padding=True, truncation=True).to(device)
        outputs = model.generate(
            **encodings,
            max_new_tokens=max_new_tokens,
            do_sample=False  # greedy decoding for simplicity
        )
        # Decode the outputs, extract <answer>, and store the results
        for o in outputs:
            text = tokenizer.decode(o, skip_special_tokens=True)
            pred_ans = extract_xml_answer(text)
            all_preds.append(pred_ans)
        all_refs.extend(refs)
# Compute BLEU, TER, and length accuracy
bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
ter_scores = compute_ter(all_preds, all_refs, tokenizer)
len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)
metrics = {
    "bleu": mean(bleu_scores) if bleu_scores else 0.0,
    "ter": mean(ter_scores) if ter_scores else 0.0,
    "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
}
return metrics
test_metrics = evaluate_model(model, tokenizer, test_dataset)
print("\n===== Final Test Results =====")
print(f"Test BLEU: {test_metrics['bleu']:.4f}")
print(f"Test TER: {test_metrics['ter']:.4f}")
print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")
wandb.log({
"test_bleu": test_metrics["bleu"],
"test_ter": test_metrics["ter"],
"test_length_acc": test_metrics["length_acc"]
})
ds_config.json
{
"train_micro_batch_size_per_gpu": "auto",
"gradient_accumulation_steps": 2,
"fp16": {
"enabled": true,
"auto_cast": true,
"loss_scale": 0,
"initial_scale_power": 16,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 1,
"allgather_partitions": true,
"reduce_scatter": true
},
"optimizer": {
"type": "AdamW",
"params": {
"lr": 5e-6,
"betas": [0.9, 0.99],
"eps": 1e-8,
"weight_decay": 0.1
}
},
"gradient_clipping": 0.1,
"steps_per_print": 50
}
CUDA_VISIBLE_DEVICES=2,3 deepspeed train_qwen3b_with_deepspeed_eval.py
The problems with the code above are as follows:
1) The code runs fine at first, and the two H800s each use only about 62 GB of VRAM, but after running for a while it suddenly crashes with an OOM error. Why does this happen? ([rank1]: File "/home/data/guoweis/miniconda3/envs/zero/lib/python3.9/site-packages/deepspeed/runtime/zero/stage_1_and_2.py", line 1887, in step
[rank1]: single_grad_partition = self.flatten_dense_tensors_aligned(
[rank1]: torch.OutOfMemoryError: CUDA out of memory. Tried to allocate 5.75 GiB. GPU 1 has a total capacity of 79.21 GiB of which 3.50 GiB is free. Including non-PyTorch memory, this process has 75.70 GiB memory in use. Of the allocated memory 69.08 GiB is allocated by PyTorch, and 5.65 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation. See documentation for Memory Management (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
);
2) Training is unstable. With exactly the same parameter configuration, plain torch DDP runs fine, yet DeepSpeed ZeRO-1's redundancy-free partitioning, which should save memory, actually uses more memory than torch DDP. That really shouldn't happen, should it?
3) Please help me solve the problems above and give me complete code that can actually run DeepSpeed ZeRO-1 parallelism correctly.
Below is a complete, runnable example that trains properly with DeepSpeed ZeRO-1 on two GPUs (it demonstrates a minimal translation fine-tuning pipeline). Compared with the script you provided, I made the following changes and optimizations to address the common OOM (out-of-memory) failures and likely memory fragmentation:

- Reduce per_device_train_batch_size to a small value such as 1 or 2, and increase gradient_accumulation_steps accordingly so the global batch size stays unchanged.
- Lower num_generations to 1 for now, because having each sample generate multiple completions during training consumes considerably more GPU memory.
- Scale down max_prompt_length and max_completion_length, e.g. to 512 each; shorter sequences also reduce memory usage significantly.
- During evaluation, likewise limit max_new_tokens / the sequence length (e.g. set max_new_tokens to 128 or less).
- Set export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True (or max_split_size_mb=...) to reduce OOM caused by memory fragmentation.
- Do not use device_map="auto" or any other sharding strategy in from_pretrained(), because DeepSpeed partitions the parameters itself; use device_map=None and let DeepSpeed take control.
- Keep model.gradient_checkpointing_enable() and confirm that it actually takes effect at runtime.

A focused sketch of just these memory-related settings follows this list. After that comes a relatively simplified but complete example script demonstrating the correct use of DeepSpeed ZeRO-1; you can build on it and integrate your own translation data, reward functions, and training logic.
Assume your directory layout looks roughly like this:

```text
.
├── ds_config.json
├── train_with_deepspeed_zero1.py
├── data
│   └── my_lyric_data.json
└── ...
```

Here ds_config.json is the DeepSpeed configuration file and train_with_deepspeed_zero1.py is the main training script.
Below is an example ZeRO-1 DeepSpeed configuration file:

```json
{
  "train_micro_batch_size_per_gpu": "auto",
  "gradient_accumulation_steps": 2,
  "fp16": {
    "enabled": true,
    "auto_cast": true,
    "loss_scale": 0,
    "initial_scale_power": 16,
    "loss_scale_window": 1000,
    "hysteresis": 2,
    "min_loss_scale": 1
  },
  "zero_optimization": {
    "stage": 1,
    "allgather_partitions": true,
    "reduce_scatter": true
  },
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 5e-6,
      "betas": [0.9, 0.99],
      "eps": 1e-8,
      "weight_decay": 0.1
    }
  },
  "gradient_clipping": 0.1,
  "steps_per_print": 50
}
```

Key points:

- "stage": 1 selects ZeRO-1.
- "train_micro_batch_size_per_gpu": "auto" lets DeepSpeed match the per_device_train_batch_size set in the script; you can also specify a concrete value manually.

Below is a minimal working example script showing how to train and then evaluate with TRL (RLHF / GRPO) plus DeepSpeed ZeRO-1. You can integrate your own reward functions, data processing, and evaluation logic; just pay particular attention to keeping the batch size, sequence lengths, and DeepSpeed configuration consistent.
```python
# -*- coding: utf-8 -*-
import os
import re
import json
import gc
import torch
import wandb
import langid
import deepspeed
from typing import List
from datasets import load_dataset, Dataset
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from transformers import AutoTokenizer, AutoModelForCausalLM
# TRL / GRPO
from trl import GRPOConfig, GRPOTrainer

# ==========================
# 0. Optional environment variable to reduce memory fragmentation
# ==========================
# Preferably export it in bash/your terminal before launching:
# export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
# You can also set it inside the script (may not take effect in some environments):
# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# 1. (Optional) wandb login
wandb.login(key="your-wandb-key")

# 2. Check the device; DeepSpeed handles multi-GPU placement
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

# 3. Supported language mappings
SUPPORTED_LANGUAGES = {
    "en_to_zh": ("英语", "中文"),
    "zh_to_en": ("中文", "英语"),
    # ... add the other mappings you need
    "mix": ("中文", "multilingual")
}

# 4. System prompt
SYSTEM_PROMPT = """\
You are a translation assistant.
You MUST follow these rules:
1. ALWAYS output in the exact XML format:
<reasoning>
[Your chain-of-thought or reasoning goes here]
</reasoning>
<answer>
[The final translation only goes here]
</answer>
2. DO NOT add any extra tags, disclaimers, or text outside the above tags.
3. If the user provides some lyrics, translate them from {} to {} only, no explanation.
4. End your message immediately after the </answer> tag.
"""

# 5. Build the dataset
def get_lyric_datasets(path: str) -> Dataset:
    data = Dataset.from_json(path)

    def map_fn(x):
        lang_src = SUPPORTED_LANGUAGES[x['type']][0]
        lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
        system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
        return {
            'prompt': [
                {'role': 'system', 'content': system_plus},
                {'role': 'user', 'content': x['lyric']}
            ],
            'answer': x['target_lyric']
        }

    data = data.map(map_fn)
    return data

# 6. Utility: extract <answer> ... </answer>
def extract_xml_answer(text: str) -> str:
    pattern = r"<answer>\s*(.*?)\s*</answer>"
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else ""

# 7. Evaluation helpers: BLEU, TER, length_acc
def compute_length_acc(preds: List[str], refs: List[str], tokenizer, max_tolerance: float = 0.5) -> List[float]:
    rewards = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            rewards.append(0.0)
            continue
        length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
        if length_ratio <= 0.1:
            score = 1.0
        elif length_ratio <= 0.2:
            score = 0.8
        elif length_ratio <= 0.3:
            score = 0.6
        elif length_ratio <= 0.4:
            score = 0.4
        elif length_ratio <= 0.5:
            score = 0.2
        else:
            score = 0.0
        rewards.append(score)
    return rewards

def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    smoothie = SmoothingFunction().method1
    weights = (0.25, 0.25, 0.25, 0.25)
    scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if not pred_tokens or not ref_tokens:
            scores.append(0.0)
            continue
        bleu = sentence_bleu([ref_tokens], pred_tokens, weights=weights, smoothing_function=smoothie)
        scores.append(bleu)
    return scores

def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    ter_scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            if len(pred_tokens) > 0:
                ter_scores.append(100.0)
            else:
                ter_scores.append(0.0)
            continue
        dist_val = edit_distance(pred_tokens, ref_tokens)
        ter = (dist_val / len(ref_tokens)) * 100
        ter_scores.append(ter)
    return ter_scores

# 8. Simple reward function examples (length match + BLEU + format check).
#    The `answer` argument is filled from the dataset column of the same name,
#    matching the convention of your original script.
def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
    """Check the XML format and give a binary score."""
    pattern = r"<reasoning>[\s\S]+</reasoning>\s*<answer>[\s\S]+</answer>"
    responses = [c[0]["content"] for c in completions]
    scores = []
    for r in responses:
        if re.search(pattern, r):
            scores.append(1.0)
        else:
            scores.append(0.0)
    return scores

def length_acc_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
    """Length-match reward (uses the module-level tokenizer)."""
    responses = [c[0]['content'] for c in completions]
    extracted_responses = [extract_xml_answer(r) for r in responses]
    length_rewards = compute_length_acc(preds=extracted_responses, refs=answer, tokenizer=tokenizer)
    return length_rewards

def bleu_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
    """Custom BLEU score mapped to a bounded reward range."""
    responses = [c[0]["content"] for c in completions]
    extracted = [extract_xml_answer(r) for r in responses]
    bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
    rewards = []
    for score in bleu_scores:
        if score >= 0.9:
            rewards.append(5.0)
        elif score >= 0.8:
            rewards.append(4.0)
        elif score >= 0.7:
            rewards.append(3.0)
        elif score >= 0.6:
            rewards.append(2.0)
        else:
            rewards.append(1.0)
    return rewards

# 9. GRPO training configuration
training_args = GRPOConfig(
    use_vllm=False,
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    logging_steps=50,
    fp16=True,
    # ----------- reduce these two first when you hit OOM -----------
    per_device_train_batch_size=2,  # try 2 first, then drop to 1 if needed
    gradient_accumulation_steps=2,
    # ----------------------------------------------------------------
    num_generations=1,              # generate only 1 completion per sample during training to save memory
    max_prompt_length=512,
    max_completion_length=512,
    num_train_epochs=3,             # enough for a demo
    save_steps=500,
    max_grad_norm=0.1,
    report_to='wandb',
    output_dir="outputs_deepspeed",
    deepspeed="ds_config.json",     # points to the config file in the current directory
)

# 10. Load the pretrained model / tokenizer
model_name = "Qwen/Qwen-7B-Chat"  # an open model as an example; replace with your own path
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,  # or bfloat16
    device_map=None,            # let DeepSpeed manage placement; do NOT use "auto"
    use_cache=False             # caching is usually disabled during training
)
model.gradient_checkpointing_enable()
model.config.use_cache = False

tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # make sure a pad_token exists

# 11. Prepare the dataset
raw_dataset = get_lyric_datasets("data/my_lyric_data.json")
# train / test split
train_test = raw_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]

# 12. Build the GRPOTrainer
trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        strict_format_reward_func,
        length_acc_reward_func,
        bleu_reward_func
    ],
    args=training_args,
    train_dataset=train_dataset,
)

# 13. Start training
trainer.train()

# 14. Evaluate after training
def evaluate_model(model, tokenizer, dataset: Dataset, batch_size: int = 2, max_new_tokens: int = 128):
    from statistics import mean
    all_preds, all_refs = [], []
    model.eval()
    with torch.no_grad():
        for i in range(0, len(dataset), batch_size):
            # Slicing an HF Dataset returns a dict of columns: {'prompt': [...], 'answer': [...]}
            batch = dataset[i:i + batch_size]
            prompts = batch["prompt"]
            refs = batch["answer"]
            # Join the role contents (system + user) into a single input string
            input_texts = []
            for p in prompts:
                # p is [{'role': 'system', 'content': ...}, {'role': 'user', 'content': ...}]
                joined = "\n".join([pc["content"] for pc in p])
                input_texts.append(joined)
            encodings = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True).to(device)
            outputs = model.generate(
                **encodings,
                max_new_tokens=max_new_tokens,
                do_sample=False  # greedy
            )
            for o in outputs:
                text = tokenizer.decode(o, skip_special_tokens=True)
                pred_ans = extract_xml_answer(text)
                all_preds.append(pred_ans)
            all_refs.extend(refs)
    bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
    ter_scores = compute_ter(all_preds, all_refs, tokenizer)
    len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)
    metrics = {
        "bleu": mean(bleu_scores) if bleu_scores else 0.0,
        "ter": mean(ter_scores) if ter_scores else 0.0,
        "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
    }
    return metrics

test_metrics = evaluate_model(model, tokenizer, test_dataset)
print("\n===== Final Test Results =====")
print(f"Test BLEU: {test_metrics['bleu']:.4f}")
print(f"Test TER: {test_metrics['ter']:.4f}")
print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")
# Log to wandb if desired:
wandb.log({
    "test_bleu": test_metrics["bleu"],
    "test_ter": test_metrics["ter"],
    "test_length_acc": test_metrics["length_acc"]
})
```
Launch the training on two GPUs with:

```bash
CUDA_VISIBLE_DEVICES=0,1 deepspeed train_with_deepspeed_zero1.py
```

If you have multiple machines or want to specify a hostfile, add options such as --hostfile hostfile to the command line.

A few final notes:

- If you still run into OOM, reduce per_device_train_batch_size to 1, increase gradient_accumulation_steps, reduce max_prompt_length / max_completion_length, or switch the ZeRO stage to 2:

```json
"zero_optimization": {
  "stage": 2,
  "allgather_partitions": true,
  "reduce_scatter": true
}
```

or to 3:

```json
"zero_optimization": {
  "stage": 3,
  "stage3_gather_16bit_weights_on_model_save": true
}
```

- To reduce memory fragmentation, set:

```bash
export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
```

- Always set device_map=None when calling from_pretrained; otherwise it can conflict with DeepSpeed's own sharding logic and unexpectedly double the memory footprint.
- model.gradient_checkpointing_enable() effectively reduces activation memory, which is especially important for large models.
- The sequence length (max_prompt_length + max_completion_length) has a huge impact on memory consumption; set it according to the memory your hardware actually has.
- If needed, move up to stage=2 or stage=3.

With the complete script above and these notes, the job should generally run on your two H800s. If you still hit OOM, keep reducing the batch size, gradient accumulation steps, or sequence length, or switch to a more aggressive ZeRO optimization stage. Good luck!
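For completeness, here is a minimal launch sketch that combines these suggestions. The GPU ids and script name are taken from the commands above; TOKENIZERS_PARALLELISM=false merely silences the huggingface/tokenizers fork warning that appeared in the original logs.

```bash
# Environment variables must be set before the DeepSpeed launcher forks its worker processes.
export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True   # reduce fragmentation-related OOM
export TOKENIZERS_PARALLELISM=false                       # silence the tokenizers fork warning
CUDA_VISIBLE_DEVICES=2,3 deepspeed train_with_deepspeed_zero1.py
```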