from unsloth import FastLanguageModel, PatchFastRL...
Erstellt am: 5. März 2025
Erstellt am: 5. März 2025
from unsloth import FastLanguageModel, PatchFastRL
from unsloth import is_bfloat16_supported
import torch
import re
import random
import langid
from typing import List
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
from nltk import edit_distance
from datasets import load_dataset, Dataset, concatenate_datasets
from trl import GRPOConfig, GRPOTrainer
import wandb
SUPPORTED_LANGUAGES = {
"en_to_zh": ("英语", "中文"),
"zh_to_en": ("中文", "英语"),
"jp_to_zh": ("日语", "中文"),
"zh_to_azh": ("中文", "中文(文言文)"),
"en_to_azh": ("英语", "中文(文言文)"),
"de_to_zh": ("德语", "中文"),
"italy_to_zh": ("意大利语", "中文"),
"rus_to_zh": ("俄语", "中文"),
"fr_to_zh": ("法语", "中文"),
"mix": ("中文", "multilingual")
}
SYSTEM_PROMPT = """
You are a versatile assistant. Follow these rules:
XML_COT_FORMAT = """
<reasoning>
{reasoning}
</reasoning>
<answer>
{answer}
</answer>
"""
def get_lyric_datasets(path: str, train_test_ratio=0.8) -> dict:
"""
Load a lyrics translation dataset from JSON and split into train and test sets (e.g. 80%/20%).
Each data item has a 'type' (e.g. 'en_to_zh'), 'lyric', and 'target_lyric'.
Returns a DatasetDict with 'train' and 'test' splits.
"""
data = Dataset.from_json(path)
textdef map_fn(x): lang_src = SUPPORTED_LANGUAGES[x['type']][0] lang_tgt = SUPPORTED_LANGUAGES[x['type']][1] # Insert the correct source->target languages into the system prompt: system_plus = SYSTEM_PROMPT.format(lang_src, lang_tgt) # Mark the "task" field as "lyrics", so we know it is a lyric translation request return { 'prompt': [ {'role': 'system', 'content': system_plus}, {'role': 'user', 'content': x['lyric']} ], 'answer': x['target_lyric'], 'task': "lyrics" } data = data.map(map_fn) return data.train_test_split(train_size=train_test_ratio, seed=42)
def extract_hash_answer(text: str):
"""
Utility for GSM8K data: The official GSM8K solutions often contain '#### <numerical answer>'.
This function extracts the substring after '####' as the final numeric/string answer.
Return None if not found.
"""
if "####" not in text:
return None
return text.split("####")[1].strip()
def get_gsm8k_questions(split="train", sample_size=3000) -> Dataset:
"""
Example: load and unify multiple GSM8K parquet shards into one dataset.
We assume they are located at ./openai/gsm8k/{split}-*.parquet
textThe 'question' is the user prompt, the 'answer' is the official solution which ends in '#### <number>'. This function transforms them to match our (prompt, answer) structure. Returns a Dataset with columns: 'prompt', 'answer', and 'task'='math'. """ data = Dataset.from_parquet(f'../data_pack/openai/gsm8k/{split}-*.parquet') # type: ignore data = data.select(range(sample_size)) def map_fn(x): # We'll use the same universal system prompt (with placeholders that won't matter # for math tasks). We'll just supply the plain SYSTEM_PROMPT so the model knows # to produce the same <reasoning> / <answer> format. # We do NOT need placeholders like format(lang_src, lang_tgt), so we'll just do: user_prompt = x['question'] final_answer = extract_hash_answer(x['answer']) return { 'prompt': [ {'role': 'system', 'content': SYSTEM_PROMPT.format("N/A", "N/A")}, # placeholders {'role': 'user', 'content': user_prompt} ], 'answer': final_answer if final_answer else "", 'task': "math" } data = data.map(map_fn) return data
def extract_xml_answer(text: str) -> str:
"""
Extract the text between <answer> and </answer>.
Returns an empty string if not found.
"""
pattern = r"<answer>\s*(.?)\s</answer>"
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
return ""
def detect_language(text: str) -> str:
"""Use langid to detect language code, e.g. 'en', 'zh', 'fr'."""
return langid.classify(text)[0]
def compute_length_acc(
preds: List[str],
refs: List[str],
tokenizer,
max_tolerance: float = 0.5
) -> List[float]:
"""
返回一组与 refs 同长度的分数列表,每个分数表示对应 pred 与 ref 的长度相似度奖励。
计算生成的歌词长度与参考长度的相似度奖励,参考 compute_length_acc。
# 从completions中抽取真正的文本回答
# 注意completions格式,如 completion[0]['content']
如果length_ratio <= 0.1,奖励+1,
如果length_ratio <= 0.2,奖励+0.8,
如果length_ratio <= 0.3,奖励+0.6,
如果length_ratio <= 0.4,奖励+0.4,
如果length_ratio <= 0.5,奖励+0.2,
其余情况,奖励0,
"""
rewards = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
rewards.append(0.0)
continue
length_ratio = abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)
textif length_ratio <= 0.1: score = 1.0 elif length_ratio <= 0.2: score = 0.7 elif length_ratio <= 0.3: score = 0.5 elif length_ratio <= 0.4: score = 0.3 else: score = 0.0 rewards.append(score) return rewards
def compute_bleu(
preds: List[str],
refs: List[str],
tokenizer
) -> List[float]:
"""
分别对每个 pred/ref pair 计算 BLEU 分数 (sentence-level)。
这里使用 (0.4, 0.4, 0.1, 0.1) 作为 n-gram 权重。
返回每个样本的 BLEU 分数列表。
计算每个预测与答案的 BLEU 分数后,映射到不同奖励区间。
if bleu >= 0.9 => +5
elif bleu >= 0.8 => +4.5
elif bleu >= 0.7 => +4
elif bleu >= 0.6 => +3.5
elif bleu >= 0.5 => +2.5
elif bleu >= 0.4 => +2
elif bleu >= 0.3 => +1.5
elif bleu >= 0.2 => +1
elif bleu >= 0.1 => +0.5
else => 0
"""
smoothie = SmoothingFunction().method1
weights = (0.25, 0.25, 0.25, 0.25)
scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if not pred_tokens or not ref_tokens:
scores.append(0.0)
continue
bleu = sentence_bleu(
[ref_tokens],
pred_tokens,
weights=weights,
smoothing_function=smoothie
)
scores.append(bleu)
return scores
def compute_ter(
preds: List[str],
refs: List[str],
tokenizer
) -> List[float]:
"""
逐样本计算 TER (Translation Edit Rate):
TER = (编辑距离 / 参考的词数) * 100
返回每个样本的 TER 分数。
"""
ter_scores = []
for pred, ref in zip(preds, refs):
pred_tokens = tokenizer.tokenize(pred)
ref_tokens = tokenizer.tokenize(ref)
if len(ref_tokens) == 0:
# 如果ref为空、pred不为空,则TER可视为100;都为空则0
if len(pred_tokens) > 0:
ter_scores.append(100.0)
else:
ter_scores.append(0.0)
continue
dist = edit_distance(pred_tokens, ref_tokens)
ter = (dist / len(ref_tokens)) * 100
ter_scores.append(ter)
return ter_scores
def compute_comet(
srcs: List[str],
preds: List[str],
refs: List[str]
) -> List[float]:
"""
使用 COMET 模型进行翻译质量评估。
示例模型: "Unbabel/wmt22-comet-da"
注:需要安装 pip install comet-llm 或 pip install unbabel-comet
以及下载对应的模型权重。此处仅作演示。
"""
# 下面是伪代码,使用真实 comet 时请根据官方文档初始化 model
# model_path = download_model("Unbabel/wmt22-comet-da")
# model = load_from_checkpoint(model_path)
text# batch_data = [] # for s, p, r in zip(srcs, preds, refs): # batch_data.append({ # "src": s, # "mt": p, # "ref": r # }) # model_output = model.predict(batch_data, batch_size=8, gpus=1) # comet_scores = model_output["scores"] # 假设返回一个列表 # 这里只返回随机值做演示 import random comet_scores = [random.random() for _ in range(len(preds))] return comet_scores
def reward_func_decorator(func):
"""
装饰器:确保每个reward function接收正确参数,传递 tokenizer
"""
def wrapper(prompts, completions, answers, tokenizer=None, **kwargs):
if tokenizer is None:
raise ValueError(f"{func.name} needs a tokenizer.")
return func(prompts, completions, answers, tokenizer=tokenizer, **kwargs)
return wrapper
def length_acc_reward_func(prompts, completions, answer, **kwargs) -> List[float]:
"""
计算生成的歌词长度与参考长度的相似度奖励
"""
responses = [completion[0]['content'] for completion in completions]
q = prompts[0][-1]['content']
extracted_responses = [extract_xml_answer(r) for r in responses]
print('-'*20, f"Original Lyrics:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}", f"\nExtracted:\n{extracted_responses[0]}")
length_rewards = compute_length_acc(preds=extracted_responses, refs=answer, tokenizer=tokenizer)
return length_rewards
def bleu_reward_func(completions, answer, **kwargs) -> List[float]:
"""
计算 BLEU 分数
"""
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
bleu_scores = compute_bleu(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for score in bleu_scores:
if score >= 0.9:
rewards.append(5.0)
elif score >= 0.8:
rewards.append(4.5)
elif score >= 0.7:
rewards.append(4.0)
elif score >= 0.6:
rewards.append(3.5)
elif score >= 0.5:
rewards.append(2.5)
elif score >= 0.4:
rewards.append(2.0)
elif score >= 0.3:
rewards.append(1.5)
elif score >= 0.2:
rewards.append(1.0)
elif score >= 0.1:
rewards.append(0.5)
else:
rewards.append(0.0)
return rewards
def ter_reward_func(completions, answer, **kwargs) -> List[float]:
"""
根据 TER (Translation Edit Rate) 分数映射奖励:
if ter >= 80% => 0
elif ter >= 60% => 0.5
elif ter >= 40% => 1
elif ter >= 20% => 1.5
else => 2
"""
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
ter_scores = compute_ter(preds=extracted, refs=answer, tokenizer=tokenizer)
rewards = []
for t in ter_scores:
if t >= 80:
rewards.append(0.0)
elif t >= 60:
rewards.append(0.5)
elif t >= 40:
rewards.append(1.0)
elif t >= 20:
rewards.append(1.5)
else:
rewards.append(2.0)
return rewards
def language_recognition(completions, answer, **kwargs) -> List[float]:
"""
简单判断completions中的语言是否与answers语言一致,如果一致奖励+1,否则0。
不同语言,如中文、英文、法语等可识别,但对于方言/文言文可能不精确。
"""
responses = [c[0]["content"] for c in completions]
extracted = [extract_xml_answer(r) for r in responses]
rewards = []
for pred, ref in zip(extracted, answer):
# 如果提取结果为空字符串,则直接奖励0
if not pred.strip():
rewards.append(0.0)
continue
pred_lang = detect_language(pred)
ref_lang = detect_language(ref)
rewards.append(1.0 if pred_lang == ref_lang else 0.0)
return rewards
def strict_format_reward_func(completions, *kwargs) -> list[float]:
pattern = r"<reasoning>[\s\S]+</reasoning>\s<answer>[\s\S]+</answer>"
responses = [completion[0]["content"] for completion in completions]
scores = []
for r in responses:
# Give +1 if fully matches, else 0
if re.search(pattern, r):
scores.append(1.0)
else:
scores.append(0.0)
return scores
def soft_format_reward_func(completions, **kwargs) -> list[float]:
"""Reward function that checks if the completion has a specific format."""
pattern = r"<reasoning>.?</reasoning>\s<answer>.*?</answer>"
responses = [completion[0]["content"] for completion in completions]
matches = [re.match(pattern, r) for r in responses]
return [0.5 if match else 0.0 for match in matches]
def correctness_reward_func(prompts, completions, answer, **kwargs) -> list[float]:
"""
For math tasks: If the extracted final answer matches the known correct solution exactly => +2.0, else 0.0.
"""
responses = [completion[0]['content'] for completion in completions]
q = prompts[0][-1]['content']
extracted_responses = [extract_xml_answer(r) for r in responses]
print('-' * 20, f"Question:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}",
f"\nExtracted:\n{extracted_responses[0]}")
return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)]
def int_reward_func(completions,**kwargs) -> list[float]:
"""
For math tasks: Reward if the final answer is purely digits (like an integer).
"""
responses = [completion[0]['content'] for completion in completions]
extracted_responses = [extract_xml_answer(r) for r in responses]
scores = []
for r in extracted_responses:
# Check if r is purely numeric (with optional minus sign)
if re.match(r"^-?\d+$", r):
scores.append(0.5)
else:
scores.append(0.0)
return scores
def xmlcount_reward_func(completions, **kwargs) -> List[float]:
"""
演示一种自定义计数:根据出现的tag次数给分。
只是示例,与翻译质量无关。
"""
def count_xml(text) -> float:
count = 0.0
# 出现 <reasoning>\n 记 0.125 分
if text.count("<reasoning>\n") == 1:
count += 0.125
if text.count("\n</reasoning>\n") == 1:
count += 0.125
if text.count("\n<answer>\n") == 1:
count += 0.125
# 额外演示:对多余文本作负面扣分
leftover = text.split("\n</answer>\n")[-1]
count -= len(leftover)*0.001
if text.count("\n</answer>") == 1:
count += 0.125
leftover = text.split("\n</answer>")[-1]
count -= (len(leftover) - 1)*0.001
return count
textresponses = [c[0]["content"] for c in completions] return [count_xml(c) for c in responses]
def reduce_repetition_reward_func(completions, **kwargs) -> List[float]:
"""
Goal: Address excessive repetition in the generated lyrics, e.g. "ない ない ない ない...".
textApproach: 1. Extract the <answer> from each model output. 2. Split into lines (by '\n'). 3. Calculate ratio of unique lines to total lines. 4. If ratio < 0.8 => reward=0.0, else => reward=1.0 (You can adjust thresholds or do a smoother scale.) """ # Make sure we only do this reward for lyrics tasks: # If the task is "math", do not apply or just return 0.0. # We'll assume prompts[0][0] is system prompt, prompts[0][1] is user prompt. # We can check the "task" we might have stored externally. # For simplicity, let's do a naive approach and guess from the user content. # Alternatively, we can embed the "task" in the dataset directly if we want to filter. # Here, I'll assume we only apply if the first sample has "task" = 'lyrics' somewhere in prompts. # (1) Decide if it's lyrics or math from the "task" - if we injected it in the dataset, we might have: # But TRL doesn't pass that directly. We can unify a method: we rely on the raw text. # If you prefer a more robust approach, see "domain-based reward dispatch" below. # For demonstration, let's apply it unconditionally, but you can decide to skip if 'math' is found: # if "math" in str(prompts[0]): return [0.0]*len(completions) responses = [c[0]["content"] for c in completions] rewards = [] for resp in responses: answer_text = extract_xml_answer(resp) lines = [l.strip() for l in answer_text.split("\n") if l.strip()] if len(lines) <= 1: # Not enough lines to measure repetition rewards.append(0.0) continue unique_lines = len(set(lines)) ratio = unique_lines / len(lines) # Simple threshold reward_val = 1.0 if ratio >= 0.8 else 0.0 rewards.append(reward_val) return rewards
def evaluate_model(trainer, test_dataset, tokenizer):
"""Evaluate the model on test_dataset using BLEU, TER, and length_acc"""
print("\nBegin Evaluation on Test Dataset...")
textmodel = trainer.model # 将模型设置为评估模式 model.eval() # 将模型移动到可用的设备(CPU 或 GPU) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) # 存储预测和参考答案 predictions = [] references = [] for example in test_dataset: prompt = example["prompt"] true_answer = example["answer"] # 将提示转换为模型输入格式 inputs = tokenizer(prompt, return_tensors="pt", truncation=True, padding=True) input_ids = inputs.input_ids.to(device) # 生成模型输出 with torch.no_grad(): generated_ids = model.generate(input_ids, max_length=512) # 解码生成的输出 generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True) predictions.append(generated_text) references.append(true_answer) # 提取生成的答案 pred_answers = [extract_xml_answer(pred) for pred in predictions] # 计算评估指标 bleu_scores = compute_bleu(pred_answers, references, tokenizer=tokenizer) ter_scores = compute_ter(pred_answers, references, tokenizer=tokenizer) length_accs = compute_length_acc(pred_answers, references, tokenizer=tokenizer) # 记录结果 log_results({ "BLEU": bleu_scores, "TER": ter_scores, "LENGTH_ACCURACY": length_accs, }) print("Test evaluation completed.")
def log_results(results: dict, trainer=None):
"""Log results to console and WandB if applicable"""
for metric, scores in results.items():
avg_score = sum(scores) / len(scores) if scores else 0.0
print(f"[Test] {metric}: Avg {avg_score:.4f}")
if trainer and trainer.args.report_to:
trainer.log({f"{metric}/avg": avg_score})
print("-" * 40)
def build_model_and_tokenizer(model_path="../model/Qwen2.5-3B-Instruct", max_seq_length=1536, lora_rank=16):
"""
Load a Qwen (or other) model via FastLanguageModel with LoRA.
Return (model, tokenizer).
"""
model, tokenizer = FastLanguageModel.from_pretrained(
model_name=model_path,
max_seq_length=max_seq_length,
load_in_4bit=True, # set to True if you want 4-bit
fast_inference=True, # set True if you want vLLM fast inference
max_lora_rank=lora_rank,
gpu_memory_utilization=0.7
)
# tokenizer.pad_token = tokenizer.eos_token
# tokenizer.padding_side = 'left'
textmodel = FastLanguageModel.get_peft_model( model, r = lora_rank, target_modules = [ "q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", ], lora_alpha = lora_rank, use_gradient_checkpointing = "unsloth", random_state = 3407, ) return model, tokenizer
PatchFastRL("GRPO", FastLanguageModel)
model, tokenizer = build_model_and_tokenizer()
gsm8k_dataset = get_gsm8k_questions(split="train")
training_args_math = GRPOConfig(
use_vllm=True, # use vLLM for fast inference!
learning_rate=5e-6,
adam_beta1=0.9,
adam_beta2=0.99,
weight_decay=0.1,
warmup_ratio=0.1,
lr_scheduler_type="cosine",
optim="paged_adamw_8bit",
logging_steps=1,
bf16=is_bfloat16_supported(),
fp16=not is_bfloat16_supported(),
per_device_train_batch_size=16,
gradient_accumulation_steps=2, # Increase to 4 for smoother training
num_generations=16, # Decrease if out of memory
max_prompt_length=768,
max_completion_length=768,
num_train_epochs=1, # Set to 1 for a full training run
report_to = "none",
output_dir = "outputs_math_phase"
)
math_reward_funcs = [
correctness_reward_func,
int_reward_func,
strict_format_reward_func,
xmlcount_reward_func,
soft_format_reward_func
]
trainer_math = GRPOTrainer(
model = model,
processing_class = tokenizer,
reward_funcs = math_reward_funcs,
args = training_args_math,
train_dataset = gsm8k_dataset,
)
print("=== Starting Phase 1: Math Training (GSM8K) ===")
trainer_math.train()
print("=== Finished Phase 1 ===\n")
dataset_dict = get_lyric_datasets("../data_pack/multi_lyric.json", train_test_ratio=0.8)
train_dataset_lyrics = dataset_dict['train']
test_dataset_lyrics = dataset_dict['test']
training_args_lyrics = GRPOConfig(
use_vllm = True, # use vLLM for fast inference!
learning_rate = 5e-6,
adam_beta1 = 0.9,
adam_beta2 = 0.99,
weight_decay = 0.1,
warmup_ratio = 0.1,
lr_scheduler_type = "cosine",
optim = "paged_adamw_8bit",
logging_steps = 1,
bf16 = is_bfloat16_supported(),
fp16 = not is_bfloat16_supported(),
per_device_train_batch_size = 4,
gradient_accumulation_steps = 2, # Increase to 4 for smoother training
num_generations = 8, # Decrease if out of memory
max_prompt_length = 768,
max_completion_length = 768,
num_train_epochs = 2, # Set to 1 for a full training run
# max_steps = 50,
save_steps = 250,
max_grad_norm = 0.1,
report_to = "none", # Can use Weights & Biases
output_dir = "outputs_lyrics_phase",
)
lyrics_reward_funcs = [
xmlcount_reward_func,
soft_format_reward_func,
strict_format_reward_func,
length_acc_reward_func,
bleu_reward_func,
ter_reward_func,
language_recognition,
# reduce_repetition_reward_func
]
trainer_lyrics = GRPOTrainer(
model = model,
processing_class = tokenizer,
reward_funcs = lyrics_reward_funcs,
args = training_args_lyrics,
train_dataset = train_dataset_lyrics,
)
print("=== Starting Phase 2: Lyrics Training ===")
trainer_lyrics.train()
print("=== Finished Phase 2. ===\n")
evaluate_model(trainer_lyrics, test_dataset_lyrics, tokenizer=tokenizer)
当前的脚本一旦运行到Phase 2就会报错,报错,unsloth的确不乏支持多GPU但我自始至终的shell写的是(export CUDA_LAUNCH_BLOCKING=0
export CUDA_VISIBLE_DEVICES=0
CUDA_VISIBLE_DEVICES=0 python unsloth_mix_3b_run.py),
问什么还会报错:
=== Finished Phase 1 ===
Unsloth: We now expect per_device_train_batch_size
to be a multiple of num_generations
.
We will change the batch size of 4 to the num_generations
of 8
=== Starting Phase 2: Lyrics Training ===
[rank0]: Traceback (most recent call last):
[rank0]: File "/home/data/guoweis/zero/tinyzero/qwen3b_grpo/unsloth_mix_3b_run.py", line 707, in <module>
[rank0]: trainer_lyrics.train()
[rank0]: File "/home/data/guoweis/miniconda3/envs/unsloth_env/lib/python3.11/site-packages/transformers/trainer.py", line 2171, in train
[rank0]: return inner_training_loop(
[rank0]: ^^^^^^^^^^^^^^^^^^^^
[rank0]: File "<string>", line 226, in _fast_inner_training_loop
[rank0]: RuntimeError: Unsloth currently does not support multi GPU setups - but we are working on it!
[rank0]:[W305 19:35:44.965480456 ProcessGroupNCCL.cpp:1250] Warning: WARNING: process group has NOT been destroyed before we destruct ProcessGroupNCCL. On normal program exit, the application should call destroy_process_group to ensure that any pending NCCL operations have finished in this process. In rare cases this process can exit before this point and block the progress of another member of the process group. This constraint has always been present, but this warning has only been added since PyTorch 2.4 (function operator())
请帮我修正上述bug,我要重新run提交模型,确保不会训练中断!
原因分析:
Unsloth 目前尚不支持多卡/分布式训练。如果脚本在第二阶段报出
RuntimeError: Unsloth currently does not support multi GPU setups - but we are working on it!
这说明它「检测到」了多卡环境,从而报错终止。即使您只 export CUDA_VISIBLE_DEVICES=0
,有时候仍会触发 PyTorch/Transformers 的分布式逻辑(例如在某些集群/容器环境下,或使用 vLLM
时自动检测到多卡)。
最常见触发点有:
torch.distributed
或 accelerate
的分布式训练(哪怕物理上只有一块卡,也可能因为分布式初始化成功而被认为是多卡)。vLLM
配合旧版 transformers
/accelerate
,可能会错误检测 GPU 数量。MASTER_ADDR
, WORLD_SIZE
, 等)导致分布式训练被激活。主要修正思路:
torch.distributed
/accelerate
初始化;torchrun --nproc_per_node=...
或 accelerate launch ...
;use_vllm=False
,彻底使用普通单卡推理,避免 vLLM 自动检测多卡;os.environ["CUDA_VISIBLE_DEVICES"] = "0"
,并在正式开始训练前打印一下 torch.cuda.device_count()
以验证只剩 1。下面给您一个推荐的最小改动示例,将第二阶段的 use_vllm
改成 False
,并确保批大小和 num_generations
不会再让库去检查“多卡”:
pythonimport os os.environ["CUDA_VISIBLE_DEVICES"] = "0" import torch # 在开始前检查一下当前可见的 GPU 数量,如果不是1,就要去掉其他相关环境/进程 print("Detected GPU count:", torch.cuda.device_count()) # ------------------------------------- # Phase 1: Math Training (不变,如果没报错就无需改) training_args_math = GRPOConfig( use_vllm=True, # 其他超参... ) trainer_math = GRPOTrainer( model=model, processing_class=tokenizer, reward_funcs=math_reward_funcs, args=training_args_math, train_dataset=gsm8k_dataset, ) trainer_math.train() # ------------------------------------- # Phase 2: Lyrics Training (关键改动) training_args_lyrics = GRPOConfig( # 把 use_vllm 改成 False 以避免多卡冲突 use_vllm=False, # 如果您的 per_device_train_batch_size < num_generations,会被 unsloth 自动改到一致; # 这里二者只要一致或 batch_size >= num_generations 即可,防止自动改完又被判断多卡。 per_device_train_batch_size=8, num_generations=8, # 其余超参... learning_rate=5e-6, # ... output_dir = "outputs_lyrics_phase", ) trainer_lyrics = GRPOTrainer( model=model, processing_class=tokenizer, reward_funcs=lyrics_reward_funcs, args=training_args_lyrics, train_dataset=train_dataset_lyrics, ) trainer_lyrics.train() # 不再报多GPU错误 evaluate_model(trainer_lyrics, test_dataset_lyrics, tokenizer=tokenizer)
如果您连第一阶段也遇到类似问题,可以在两个阶段都改成 use_vllm=False
。另外,也要检查是否有用 accelerate launch
或其他分布式启动方式,如:
bashtorchrun --nproc_per_node=2 ...
都要改成最普通的单进程执行(比如 python your_script.py
)即可避免触发多卡模式。
强制只认1卡:
pythonimport os os.environ["CUDA_VISIBLE_DEVICES"] = "0" import torch print("Detected GPU count:", torch.cuda.device_count())
如仍然打印出不止1,说明环境/集群某处有额外 GPU 显示或启动了分布式。
确保没有分布式或 accelerate
torchrun
或 accelerate launch
之类的命令启动脚本;以防某些地方自动做了pythonimport torch.distributed as dist if dist.is_initialized(): dist.destroy_process_group()
dist.init_process_group()
。vLLM:
部分 vLLM 的早期版本在检测 GPU 设备数量时有 Bug,会错误识别多 GPU。若确有需要 vLLM 并保持单卡,请升级到较新的 vLLM / transformers,并保证单卡环境变量干净。
照此修改后,Unsloth 在单卡下通常就不会再抛出「multi GPU」的报错了。祝您一切顺利,成功完成训练!