# DeepSpeed ZeRO-1 GRPO Training: CUDA OOM Troubleshooting

Created on: February 20, 2025

Answered with O1 from Chat01

Question

# -*- coding: utf-8 -*-

import os
import re
import json
import gc
import langid
import wandb
import torch
import deepspeed
from typing import List
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from trl import GRPOConfig, GRPOTrainer

1. Login to wandb if desired

wandb.login(key="91223cd0edc96e39bfa8e98641d2a8d1aa3c0c4e")

2. Device assignment; let DeepSpeed handle device_map

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

3. Supported languages

SUPPORTED_LANGUAGES = {
"en_to_zh": ("英语", "中文"),
"zh_to_en": ("中文", "英语"),
"jp_to_zh": ("日语", "中文"),
"zh_to_azh": ("中文", "中文(文言文)"),
"en_to_azh": ("英语", "中文(文言文)"),
"de_to_zh": ("德语", "中文"),
"italy_to_zh": ("意大利语", "中文"),
"rus_to_zh": ("俄语", "中文"),
"fr_to_zh": ("法语", "中文"),
"mix": ("中文", "multilingual")
}

4. System Prompt

SYSTEM_PROMPT = """
You are a translation assistant.
You MUST follow these rules:

  1. ALWAYS output in the exact XML format: <reasoning>

[Your chain-of-thought or reasoning goes here]
</reasoning>
<answer>
[The final translation only goes here]
</answer>

  1. DO NOT add any extra tags, disclaimers, or text outside the above tags.
  2. If the user provides some lyrics, translate them from {} to {} only, no explanation.
  3. End your message immediately after the </answer> tag.
    """

XML_COT_FORMAT = """
<reasoning>
{reasoning}
</reasoning>
<answer>
{answer}
</answer>
"""

5. Dataset Preparation

def get_lyric_datasets(path: str) -> Dataset:
data = Dataset.from_json(path)

def map_fn(x):
    lang_src = SUPPORTED_LANGUAGES[x['type']][0]
    lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
    system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
    return {
        'prompt': [
            {'role': 'system', 'content': system_plus},
            {'role': 'user', 'content': x['lyric']}
        ],
        'answer': x['target_lyric']
    }

data = data.map(map_fn)
return data

6. Utility to extract <answer> ... </answer> from text

def extract_xml_answer(text: str) -> str:
pattern = r"<answer>\s*(.?)\s</answer>"
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
return ""

7. Reward computation helpers

def compute_length_acc(
preds: List[str],
refs: List[str],
tokenizer,
max_tolerance: float = 0.5
) -> List[float]:
rewards = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
rewards.append(0.0)
continue
length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
if length_ratio <= 0.1:
score = 1.0
elif length_ratio <= 0.2:
score = 0.8
elif length_ratio <= 0.3:
score = 0.6
elif length_ratio <= 0.4:
score = 0.4
elif length_ratio <= 0.5:
score = 0.2
else:
score = 0.0
rewards.append(score)
return rewards

def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
smoothie = SmoothingFunction().method1
weights = (0.25, 0.25, 0.25, 0.25)
scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if not pred_tokens or not ref_tokens:
scores.append(0.0)
continue
bleu = sentence_bleu(
[ref_tokens],
pred_tokens,
weights=weights,
smoothing_function=smoothie
)
scores.append(bleu)
return scores

def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
ter_scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
if len(pred_tokens) > 0:
ter_scores.append(100.0)
else:
ter_scores.append(0.0)
continue
dist_val = edit_distance(pred_tokens, ref_tokens)
ter = (dist_val / len(ref_tokens)) * 100
ter_scores.append(ter)
return ter_scores

def detect_language(text: str) -> str:
return langid.classify(text)[0]

8. Reward Function Decorator

def reward_func_decorator(func):
def wrapper(prompts, completions, answer, step=0, **kwargs):
# if tokenizer is None:
# raise ValueError(f"{func.__name__} needs a tokenizer.")
# return func(prompts, completions, answer, tokenizer=tokenizer, step=step, **kwargs)
pass
return wrapper

9. Define Reward Functions

@reward_func_decorator
def length_acc_reward_func(prompts, completions, answer, step=0, **kwargs) -> List[float]:
"""Example reward function that also prints partial data periodically."""
responses = [completion[0]['content'] for completion in completions]
extracted_responses = [extract_xml_answer(r) for r in responses]

# Print a few samples here for inspection
if step % 200 == 0:
    print(f"\n[Step {step}] Displaying up to 3 samples from the batch:\n")
    for i in range(min(3, len(responses))):
        q = prompts[i][-1]['content']
        print("-" * 20)
        print(f"Sample {i + 1}")
        print(f"Question:\n{q}")
        print(f"GroundTruth Answer:\n{answer[i]}")
        print(f"Model Response (raw):\n{responses[i]}")
        print(f"Extracted <answer>:\n{extracted_responses[i]}")
        print("-" * 20)

length_rewards = compute_length_acc(
    preds=extracted_responses,
    refs=answer,
    tokenizer=tokenizer
)
return length_rewards

def bleu_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for score in bleu_scores:
if score >= 0.9:
rewards.append(5.0)
elif score >= 0.8:
rewards.append(4.5)
elif score >= 0.7:
rewards.append(4.0)
elif score >= 0.6:
rewards.append(3.5)
elif score >= 0.5:
rewards.append(2.5)
elif score >= 0.4:
rewards.append(2.0)
elif score >= 0.3:
rewards.append(1.5)
elif score >= 0.2:
rewards.append(1.0)
elif score >= 0.1:
rewards.append(0.5)
else:
rewards.append(0.0)
return rewards

def ter_reward_func(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
ter_scores = compute_ter(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for t in ter_scores:
if t >= 80:
rewards.append(0.0)
elif t >= 60:
rewards.append(0.5)
elif t >= 40:
rewards.append(1.0)
elif t >= 20:
rewards.append(1.5)
else:
rewards.append(2.0)
return rewards

def language_recognition(completions, answer, step=0, **kwargs) -> List[float]:
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
rewards = []
for pred, ref in zip(extracted, answer):
if not pred.strip():
rewards.append(0.0)
continue
pred_lang = detect_language(pred)
ref_lang = detect_language(ref)
rewards.append(1.0 if pred_lang == ref_lang else 0.0)
return rewards

def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
pattern = r"<reasoning>[\s\S]+</reasoning>\s*<answer>[\s\S]+</answer>"
responses = [completion[0]["content"] for completion in completions]
scores = []
for r in responses:
if re.search(pattern, r):
scores.append(1.0)
else:
scores.append(0.0)
return scores

def soft_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
pattern = r"<reasoning>.?</reasoning>\s<answer>.*?</answer>"
responses = [completion[0]["content"] for completion in completions]
matches = [re.match(pattern, r, flags=re.DOTALL) for r in responses]
return [0.5 if match else 0.0 for match in matches]

def xmlcount_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
def count_xml(text) -> float:
count = 0.0
# Simple scoring based on whether each of the four tags appears exactly once
if text.count("<reasoning>") == 1:
count += 0.125
if text.count("</reasoning>") == 1:
count += 0.125
if text.count("<answer>") == 1:
count += 0.125
if text.count("</answer>") == 1:
count += 0.125
# If there is extra text after </answer>, apply a small penalty
if "</answer>" in text:
leftover = text.split("</answer>")[-1]
count -= len(leftover) * 0.001
return count

responses = [c[0]["content"] for c in completions]
return [count_xml(c) for c in responses]

10. Training Arguments for GRPO

Adjusted settings in training_args

training_args = GRPOConfig(
use_vllm=False,
learning_rate=5e-6,
adam_beta1=0.9,
adam_beta2=0.99,
weight_decay=0.1,
warmup_ratio=0.1, # use the warmup ratio built into Transformers/TRL
lr_scheduler_type="cosine", # native HF scheduler type
logging_steps=50,
fp16=True,
per_device_train_batch_size=4,
gradient_accumulation_steps=2,
num_generations=4,
max_prompt_length=768,
max_completion_length=768,
num_train_epochs=16,
save_steps=1000,
max_grad_norm=0.1,
report_to='wandb',
output_dir="outputs",
# point to the DeepSpeed config
deepspeed="ds_config.json"
)

11. Load Model/Tokenizer

model_name = "../model/Qwen2.5-3B-Instruct" # Adjust path as needed
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
device_map=None, # Let DeepSpeed handle it
use_cache=False
)
model.gradient_checkpointing_enable()
model.config.use_cache = False
model.to(device)

tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

12. Load Dataset

dataset = get_lyric_datasets("../data_pack/multi_lyric.json")

--------------------------- Added: split into training and test sets ---------------------------

test_size=0.2 gives an 80/20 split; seed=42 makes it reproducible

train_test = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]

------------------------------------------------------------------------------

13. Initialize Trainer (using the training split)

trainer = GRPOTrainer(
model=model,
processing_class=tokenizer,
reward_funcs=[
xmlcount_reward_func,
soft_format_reward_func,
strict_format_reward_func,
length_acc_reward_func, # This prints sample data periodically
bleu_reward_func,
ter_reward_func,
language_recognition,
],
args=training_args,
train_dataset=train_dataset, # <-- use the training split
)

14. Clean up CUDA before training

torch.cuda.empty_cache()

gc.collect()

15. Start Training

trainer.train()

--------------------------- Added: evaluation on the test set ---------------------------

def evaluate_model(
model,
tokenizer,
dataset: Dataset,
batch_size: int = 2,
max_new_tokens: int = 128
):
"""
Simple evaluation helper: generate translations batch by batch, extract <answer>, and compute BLEU, TER, and length_acc.
Adjust the inference parameters (e.g. temperature, max_new_tokens) as needed.
"""
from statistics import mean

all_preds = []
all_refs = []
model.eval()
# disable autograd
with torch.no_grad():
    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        # each element of the batch: {'prompt': [...], 'answer': '...'}
        prompts = [item["prompt"] for item in batch]
        refs = [item["answer"] for item in batch]
        # Join each sample's prompt into a single input string; kept simple here:
        inputs = []
        for p in prompts:
            # p is a list[{'role': ..., 'content': ...}, ...]
            # join as needed; here we simply assume system + user
            role_content = [pc["content"] for pc in p]
            joined = "\n".join(role_content)
            inputs.append(joined)
        # Encode with the tokenizer, then generate with model.generate
        encodings = tokenizer(inputs, return_tensors="pt", padding=True, truncation=True).to(device)
        outputs = model.generate(
            **encodings,
            max_new_tokens=max_new_tokens,
            do_sample=False  # simple greedy decoding here
        )
        # Decode the outputs, extract <answer>, and store
        for o in outputs:
            text = tokenizer.decode(o, skip_special_tokens=True)
            pred_ans = extract_xml_answer(text)
            all_preds.append(pred_ans)
        all_refs.extend(refs)

# Compute BLEU, TER, length_acc
bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
ter_scores = compute_ter(all_preds, all_refs, tokenizer)
len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)
metrics = {
    "bleu": mean(bleu_scores) if bleu_scores else 0.0,
    "ter": mean(ter_scores) if ter_scores else 0.0,
    "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
}
return metrics

Call evaluate_model and print the results

test_metrics = evaluate_model(model, tokenizer, test_dataset)
print("\n===== Final Test Results =====")
print(f"Test BLEU: {test_metrics['bleu']:.4f}")
print(f"Test TER: {test_metrics['ter']:.4f}")
print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")

If you want to log the results to wandb, you can do:

wandb.log({
"test_bleu": test_metrics["bleu"],
"test_ter": test_metrics["ter"],
"test_length_acc": test_metrics["length_acc"]
})

------------------------------------------------------------------------------

ds_config.json
{
"train_micro_batch_size_per_gpu": "auto",
"gradient_accumulation_steps": 2,
"fp16": {
"enabled": true,
"auto_cast": true,
"loss_scale": 0,
"initial_scale_power": 16,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 1,
"allgather_partitions": true,
"reduce_scatter": true
},
"optimizer": {
"type": "AdamW",
"params": {
"lr": 5e-6,
"betas": [0.9, 0.99],
"eps": 1e-8,
"weight_decay": 0.1
}
},
"gradient_clipping": 0.1,
"steps_per_print": 50
}

CUDA_VISIBLE_DEVICES=2,3 deepspeed train_qwen3b_with_deepspeed_eval.py

The problems with the code above are as follows:
1) The script runs fine at first, with each of the two H800s using only about 62 GB of VRAM, but after running for a while it suddenly fails with an OOM error. Why does this happen? ([rank1]: File "/home/data/guoweis/miniconda3/envs/zero/lib/python3.9/site-packages/deepspeed/runtime/zero/stage_1_and_2.py", line 1887, in step
[rank1]: single_grad_partition = self.flatten_dense_tensors_aligned(
[rank1]: torch.OutOfMemoryError: CUDA out of memory. Tried to allocate 5.75 GiB. GPU 1 has a total capacity of 79.21 GiB of which 3.50 GiB is free. Including non-PyTorch memory, this process has 75.70 GiB memory in use. Of the allocated memory 69.08 GiB is allocated by PyTorch, and 5.65 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation. See documentation for Memory Management (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
);
2) Training is unstable. The same parameter configuration runs fine with plain PyTorch DDP, but DeepSpeed ZeRO-1, whose redundancy-free parallelism should actually save memory, is now using more memory than the PyTorch setup. That really should not happen, should it?
3) Please help me solve the problems above and give me complete code that genuinely uses DeepSpeed ZeRO-1 parallelism correctly.

Thought

Assessing memory constraints

The user is grappling with OOM issues in DeepSpeed, running their GPT-based translator on two GPUs with 160GB total memory.

Refining training settings

The model's instability and OOM issues could stem from missing training arguments. Adjusting micro_batch_size_per_gpu, employing gradient checkpointing, or leveraging huggingface or trl libraries might help.

Addressing memory issues

The OOM issues seem tied to excessive memory requests, especially with pinned memory allocations on the CPU. Implementing "torch.cuda.set_device()" will be beneficial here.

Assessing batch handling

The user is generating multiple completions per sample, leading to high memory usage. Setting "num_generations=1" or reducing batch size might help mitigate this issue.

Tackling memory constraints

Switching precision to "bf16" and enhancing batch size can alleviate OOM errors. Additionally, using "torch.compile()" or tweaking dataset handling may further optimize memory usage.

Refining the process

I’m thinking through stages 2 and 3 for advanced memory savings. Mentioning "PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True" for optimization, I aim to finalize a balanced, efficient code snippet.

Refining the solution

I’m focusing on batch size settings and exploring memory management tweaks to prevent OOM errors. The final code will incorporate tested settings to optimize performance.

Addressing fragmentation

I’m gathering insights on mitigating fragmentation with "PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True" or "max_split_size_mb=4096". Also, considering zero stage 1's impact on OOM errors and recommending complete scripts for better results.

Reasoned for 45 seconds

Answer

Below is a complete, runnable example that trains properly with DeepSpeed ZeRO-1 on two GPUs (it demonstrates a deliberately minimal translation fine-tuning workflow). Compared with the script you provided, I made the following changes and optimizations to address the common OOM (Out Of Memory) errors and the likely memory-fragmentation problem:

  1. Lower the actual batch size on each GPU (these memory-related knobs are summarized in a short sketch right after this list):
    • Change per_device_train_batch_size to a smaller value, e.g. 1 or 2.
    • Adjust gradient_accumulation_steps as needed so the global batch size stays the same.
  2. Reduce the overhead of the generation (sampling) step:
    • Lower num_generations first; having every sample generate several completions during training consumes a lot of extra memory. (Note that GRPO's group-relative advantage needs at least 2 completions per prompt, so 2 is the practical minimum.)
    • Also moderately reduce max_prompt_length and max_completion_length, e.g. to 512 each; shorter sequences cut memory usage noticeably.
  3. Avoid an overly large max_new_tokens or sequence length:
    • If your script still OOMs during inference/evaluation, further reduce max_new_tokens at inference time (e.g. to 128 or less).
  4. Enable allocator options that reduce memory fragmentation:
    • export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True (or max_split_size_mb:...) reduces OOMs caused by fragmentation.
  5. Check carefully that the model parameters are not being copied redundantly at load time:
    • Do not use device_map="auto" or any other sharding strategy in model = ...from_pretrained(), because DeepSpeed does its own parameter partitioning.
    • Keep device_map=None and let DeepSpeed handle placement itself.
  6. Enabling gradient checkpointing already reduces activation memory:
    • Your script has model.gradient_checkpointing_enable(); make sure it is actually in effect at run time.
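
For quick reference, the memory-relevant knobs from the full script further down are condensed here into a small sketch. The exact values are starting points for two 80 GB cards and a ~3B model, not definitive settings.

```python
# Condensed from the full training script below: only the settings that matter most for memory.
memory_relevant_args = dict(
    per_device_train_batch_size=2,   # drop to 1 if you still hit OOM
    gradient_accumulation_steps=2,   # raise this to keep the effective batch size constant
    num_generations=2,               # fewer completions per prompt = less generation memory
    max_prompt_length=512,           # prompt + completion length dominates activation memory
    max_completion_length=512,
    fp16=True,
)
# e.g. training_args = GRPOConfig(output_dir="outputs", deepspeed="ds_config.json", **memory_relevant_args)
```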

Below is a relatively simplified but complete example script that demonstrates the correct use of DeepSpeed ZeRO-1. You can build on it and plug in your own translation data, reward functions, and training logic.


Directory structure

Assume your directory looks roughly like this:

```text
.
├── ds_config.json
├── train_with_deepspeed_zero1.py
├── data
│   └── my_lyric_data.json
└── ...
```

Here ds_config.json is the DeepSpeed configuration file and train_with_deepspeed_zero1.py is the main training script.


ds_config.json

Below is an example ZeRO-1 DeepSpeed configuration file:

```json
{
  "train_micro_batch_size_per_gpu": "auto",
  "gradient_accumulation_steps": 2,
  "fp16": {
    "enabled": true,
    "auto_cast": true,
    "loss_scale": 0,
    "initial_scale_power": 16,
    "loss_scale_window": 1000,
    "hysteresis": 2,
    "min_loss_scale": 1
  },
  "zero_optimization": {
    "stage": 1,
    "allgather_partitions": true,
    "reduce_scatter": true
  },
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 5e-6,
      "betas": [0.9, 0.99],
      "eps": 1e-8,
      "weight_decay": 0.1
    }
  },
  "gradient_clipping": 0.1,
  "steps_per_print": 50
}
```

Key points:

  • stage: 1 means ZeRO-1;
  • if you still hit OOM, switch to stage 2 (or 3) to save more memory;
  • "train_micro_batch_size_per_gpu": "auto" lets DeepSpeed pick up the per_device_train_batch_size set in the script; you can also specify an explicit value by hand;
  • the remaining parameters can be fine-tuned to your actual needs.

train_with_deepspeed_zero1.py

Below is a minimal working example script showing how to train and evaluate with TRL (RLHF / GRPO) together with DeepSpeed ZeRO-1. You can fold in your own reward functions, data handling, and evaluation logic; just pay special attention to keeping the batch size, sequence lengths, and DeepSpeed configuration consistent.

```python
# -*- coding: utf-8 -*-
import os
import re
import json
import gc
import torch
import wandb
import langid
import deepspeed
from typing import List
from datasets import load_dataset, Dataset
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk import edit_distance
from transformers import AutoTokenizer, AutoModelForCausalLM

# TRL / GRPO imports
from trl import GRPOConfig, GRPOTrainer

# ==========================
# 0. Optional environment variable to reduce memory fragmentation
# ==========================
# You can export it in bash/your terminal first:
#   export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
# It can also be set dynamically in the script (may not take effect in some environments):
# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# 1. (Optional) wandb login
wandb.login(key="your-wandb-key")

# 2. Check the device; DeepSpeed handles multi-GPU placement
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

# 3. Supported language mapping
SUPPORTED_LANGUAGES = {
    "en_to_zh": ("英语", "中文"),
    "zh_to_en": ("中文", "英语"),
    # ... other mappings as needed
    "mix": ("中文", "multilingual")
}

# 4. System prompt
SYSTEM_PROMPT = """\
You are a translation assistant.
You MUST follow these rules:

1. ALWAYS output in the exact XML format:
<reasoning>
[Your chain-of-thought or reasoning goes here]
</reasoning>
<answer>
[The final translation only goes here]
</answer>
2. DO NOT add any extra tags, disclaimers, or text outside the above tags.
3. If the user provides some lyrics, translate them from {} to {} only, no explanation.
4. End your message immediately after the </answer> tag.
"""

# 5. Simple dataset construction
def get_lyric_datasets(path: str) -> Dataset:
    data = Dataset.from_json(path)

    def map_fn(x):
        lang_src = SUPPORTED_LANGUAGES[x['type']][0]
        lang_tgt = SUPPORTED_LANGUAGES[x['type']][1]
        system_plus = SYSTEM_PROMPT + f"\nTranslate the following from {lang_src} to {lang_tgt}. Do not add commentary."
        return {
            'prompt': [
                {'role': 'system', 'content': system_plus},
                {'role': 'user', 'content': x['lyric']}
            ],
            'answer': x['target_lyric']
        }

    data = data.map(map_fn)
    return data

# 6. Utility: extract <answer> ... </answer>
def extract_xml_answer(text: str) -> str:
    pattern = r"<answer>\s*(.*?)\s*</answer>"
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else ""

# 7. Evaluation helpers: BLEU, TER, length_acc
def compute_length_acc(preds: List[str], refs: List[str], tokenizer, max_tolerance: float = 0.5) -> List[float]:
    rewards = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            rewards.append(0.0)
            continue
        length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
        if length_ratio <= 0.1:
            score = 1.0
        elif length_ratio <= 0.2:
            score = 0.8
        elif length_ratio <= 0.3:
            score = 0.6
        elif length_ratio <= 0.4:
            score = 0.4
        elif length_ratio <= 0.5:
            score = 0.2
        else:
            score = 0.0
        rewards.append(score)
    return rewards

def compute_bleu(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    smoothie = SmoothingFunction().method1
    weights = (0.25, 0.25, 0.25, 0.25)
    scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if not pred_tokens or not ref_tokens:
            scores.append(0.0)
            continue
        bleu = sentence_bleu([ref_tokens], pred_tokens, weights=weights, smoothing_function=smoothie)
        scores.append(bleu)
    return scores

def compute_ter(preds: List[str], refs: List[str], tokenizer) -> List[float]:
    ter_scores = []
    for pred, ref in zip(preds, refs):
        pred_tokens = tokenizer.tokenize(pred)
        ref_tokens = tokenizer.tokenize(ref)
        if len(ref_tokens) == 0:
            if len(pred_tokens) > 0:
                ter_scores.append(100.0)
            else:
                ter_scores.append(0.0)
            continue
        dist_val = edit_distance(pred_tokens, ref_tokens)
        ter = (dist_val / len(ref_tokens)) * 100
        ter_scores.append(ter)
    return ter_scores

# 8. Simple reward function examples (length match + BLEU + format check, etc.)
#    Note: the reference argument is named `answer` so that it matches the dataset column,
#    which GRPOTrainer passes to reward functions as a keyword argument.
def strict_format_reward_func(completions, answer=None, step=0, **kwargs) -> list:
    """Check the XML format and return a binary score."""
    pattern = r"<reasoning>[\s\S]+</reasoning>\s*<answer>[\s\S]+</answer>"
    responses = [c[0]["content"] for c in completions]
    scores = []
    for r in responses:
        if re.search(pattern, r):
            scores.append(1.0)
        else:
            scores.append(0.0)
    return scores

def length_acc_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
    """Length match score (uses the module-level tokenizer defined below)."""
    responses = [c[0]['content'] for c in completions]
    extracted_responses = [extract_xml_answer(r) for r in responses]
    length_rewards = compute_length_acc(preds=extracted_responses, refs=answer, tokenizer=tokenizer)
    return length_rewards

def bleu_reward_func(completions, answer=None, step=0, **kwargs) -> List[float]:
    """Custom BLEU score mapped into a bounded range."""
    responses = [c[0]["content"] for c in completions]
    extracted = [extract_xml_answer(r) for r in responses]
    bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
    rewards = []
    for score in bleu_scores:
        if score >= 0.9:
            rewards.append(5.0)
        elif score >= 0.8:
            rewards.append(4.0)
        elif score >= 0.7:
            rewards.append(3.0)
        elif score >= 0.6:
            rewards.append(2.0)
        else:
            rewards.append(1.0)
    return rewards

# 9. GRPO training configuration
training_args = GRPOConfig(
    use_vllm=False,
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    logging_steps=50,
    fp16=True,
    # ----------- if you hit OOM, try lowering these two first -----------
    per_device_train_batch_size=2,   # try 2 first; drop to 1 if that still OOMs
    gradient_accumulation_steps=2,
    # ---------------------------------------------------------------------
    num_generations=2,               # keep completions per prompt low to save generation memory
                                     # (GRPO's group-relative advantage needs at least 2 per prompt)
    max_prompt_length=512,
    max_completion_length=512,
    num_train_epochs=3,              # enough for a demo
    save_steps=500,
    max_grad_norm=0.1,
    report_to='wandb',
    output_dir="outputs_deepspeed",
    deepspeed="ds_config.json",      # points to the config file in the current directory
)

# 10. Load the pretrained model / tokenizer
model_name = "Qwen/Qwen-7B-Chat"  # an open model as an example; swap in your own path
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,  # or bfloat16
    device_map=None,            # let DeepSpeed manage placement; do not use "auto"
    use_cache=False             # caching is usually disabled during training
)
model.gradient_checkpointing_enable()
model.config.use_cache = False

tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # make sure a pad token exists

# 11. Prepare the dataset
raw_dataset = get_lyric_datasets("data/my_lyric_data.json")

# train / test split
train_test = raw_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test["train"]
test_dataset = train_test["test"]

# 12. Build the GRPOTrainer
trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        strict_format_reward_func,
        length_acc_reward_func,
        bleu_reward_func
    ],
    args=training_args,
    train_dataset=train_dataset,
)

# 13. Start training
trainer.train()

# 14. Evaluate after training
def evaluate_model(model, tokenizer, dataset: Dataset, batch_size: int = 2, max_new_tokens: int = 128):
    from statistics import mean

    all_preds, all_refs = [], []
    model.eval()
    with torch.no_grad():
        for i in range(0, len(dataset), batch_size):
            # Slicing a datasets.Dataset returns a dict of columns, not a list of rows
            batch = dataset[i:i + batch_size]
            prompts = batch["prompt"]
            refs = batch["answer"]
            # Join the role contents
            input_texts = []
            for p in prompts:
                # p is [{'role': 'system', 'content': xxx}, {'role': 'user', 'content': yyy}]
                joined = "\n".join([pc["content"] for pc in p])
                input_texts.append(joined)
            encodings = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True).to(device)
            outputs = model.generate(
                **encodings,
                max_new_tokens=max_new_tokens,
                do_sample=False  # greedy decoding
            )
            for o in outputs:
                text = tokenizer.decode(o, skip_special_tokens=True)
                pred_ans = extract_xml_answer(text)
                all_preds.append(pred_ans)
            all_refs.extend(refs)

    bleu_scores = compute_bleu(all_preds, all_refs, tokenizer)
    ter_scores = compute_ter(all_preds, all_refs, tokenizer)
    len_acc_scores = compute_length_acc(all_preds, all_refs, tokenizer)
    metrics = {
        "bleu": mean(bleu_scores) if bleu_scores else 0.0,
        "ter": mean(ter_scores) if ter_scores else 0.0,
        "length_acc": mean(len_acc_scores) if len_acc_scores else 0.0
    }
    return metrics

test_metrics = evaluate_model(model, tokenizer, test_dataset)
print("\n===== Final Test Results =====")
print(f"Test BLEU: {test_metrics['bleu']:.4f}")
print(f"Test TER: {test_metrics['ter']:.4f}")
print(f"Test Length Acc: {test_metrics['length_acc']:.4f}")

# To also log to wandb:
wandb.log({
    "test_bleu": test_metrics["bleu"],
    "test_ter": test_metrics["ter"],
    "test_length_acc": test_metrics["length_acc"]
})
```

Example launch commands

  1. Single-node, multi-GPU (2 GPUs) training, launched with DeepSpeed:
    bash
    CUDA_VISIBLE_DEVICES=0,1 deepspeed train_with_deepspeed_zero1.py
    If you have several machines or want to use a hostfile, add --hostfile hostfile and so on to the command line.
  2. If you still run out of GPU memory, try the following:
    • Reduce per_device_train_batch_size to 1;
    • or increase gradient_accumulation_steps (to keep the effective batch size while the per-device batch shrinks);
    • or reduce max_prompt_length / max_completion_length;
    • or switch to ZeRO-2 or ZeRO-3 to partition even more state across the GPUs:
      json
      "zero_optimization": { "stage": 2, "allgather_partitions": true, "reduce_scatter": true }
      or
      json
      "zero_optimization": { "stage": 3, "stage3_gather_16bit_weights_on_model_save": true }
    • Run
      bash
      export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
      before launching the script to reduce fragmentation (a Python alternative to exporting these variables is sketched right after this list).
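
As a small addition of my own (not part of the original answer), the same environment variables can also be set at the very top of train_with_deepspeed_zero1.py, before torch and transformers are imported, if you prefer not to export them in the shell. The second variable simply silences the huggingface/tokenizers fork warning that shows up in your log.

```python
# Put this at the very top of train_with_deepspeed_zero1.py, before importing torch/transformers.
import os

# Allocator hint against fragmentation-related OOMs (read when the CUDA caching allocator initializes).
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
# Silences "huggingface/tokenizers: The current process just got forked, after parallelism has already been used."
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
```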

Key points to watch

  1. Avoid allocating the model parameters twice: when using DeepSpeed, always set device_map=None in from_pretrained; otherwise it can conflict with DeepSpeed's own sharding logic and unexpectedly double the memory footprint.
  2. Enable gradient checkpointing: model.gradient_checkpointing_enable() effectively reduces activation memory, which is especially important for large models.
  3. Choose a sensible batch size and sequence length: for large models, the sequence length (max_prompt_length + max_completion_length) has a huge impact on memory; set it according to the memory your hardware actually provides.
  4. ZeRO stages 1, 2, and 3 (see the back-of-the-envelope sketch right after this list):
    • ZeRO-1 only partitions the optimizer states and still keeps a full replica of the model parameters, so it still consumes a lot of memory for large models.
    • ZeRO-2 additionally partitions the gradients.
    • Only ZeRO-3 also partitions the model weights, which significantly lowers the peak per-GPU memory for large models.
      If you want to save more memory, try switching directly to stage=2 or stage=3.
  5. If ZeRO-1 still behaves unstably or uses more memory than expected, it is most likely because some of DeepSpeed's partitioning/communication overhead temporarily exceeds that of plain DDP, or because memory fragmentation piles up on top of it. Switching to ZeRO-2 or ZeRO-3 usually improves this.
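
To make point 4 concrete, here is a rough back-of-the-envelope sketch (my own illustration, using the per-parameter byte counts for mixed-precision Adam from the ZeRO paper: 2 B fp16 weights + 2 B fp16 gradients + 12 B fp32 optimizer state). It counts only parameter, gradient, and optimizer memory and ignores activations, generation buffers, the CUDA context, and fragmentation, which is where most of the remaining usage you observed comes from. With only 2 GPUs the savings from ZeRO-1 are modest, which matches your observation that it is not dramatically cheaper than DDP.

```python
# Rough per-GPU memory for model/optimizer state only, under different ZeRO stages.
PARAMS = 3e9      # ~3B parameters, roughly a Qwen2.5-3B-class model
N_GPUS = 2
GiB = 1024 ** 3

def per_gpu_state_gib(stage: int) -> float:
    p = 2.0 * PARAMS   # fp16 parameters
    g = 2.0 * PARAMS   # fp16 gradients
    o = 12.0 * PARAMS  # fp32 master weights + Adam momentum + variance
    if stage >= 1:
        o /= N_GPUS    # ZeRO-1: partition optimizer states
    if stage >= 2:
        g /= N_GPUS    # ZeRO-2: also partition gradients
    if stage >= 3:
        p /= N_GPUS    # ZeRO-3: also partition parameters
    return (p + g + o) / GiB

for s in (0, 1, 2, 3):
    print(f"ZeRO stage {s}: ~{per_gpu_state_gib(s):.1f} GiB of states per GPU")
# stage 0 (plain DDP): ~44.7 GiB, stage 1: ~27.9 GiB, stage 2: ~25.1 GiB, stage 3: ~22.4 GiB
```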

With the full script above and these points in mind, it should generally run on your two H800s. If you still hit OOM, keep reducing the batch size, the number of gradient accumulation steps, or the sequence lengths, or move to a higher ZeRO stage. Good luck!
