from unsloth import FastLanguageModel, PatchFastRL...

Erstellt am: 5. März 2025

Beantwortet mit O1 von Chat01

Frage

import os
import random
import re
from typing import List

# unsloth stays first among the third-party imports, as in the original ordering.
from unsloth import FastLanguageModel, PatchFastRL
from unsloth import is_bfloat16_supported

import langid
import torch
import wandb
from datasets import load_dataset, Dataset, concatenate_datasets
from nltk import edit_distance
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
from trl import GRPOConfig, GRPOTrainer

# Authenticate with Weights & Biases without embedding a secret in the source.
# The key is taken from the WANDB_API_KEY environment variable; when it is not
# set, login is skipped (both training configs below use report_to="none").
# NOTE(review): the key that was previously hard-coded here has been exposed
# and should be revoked.
_wandb_api_key = os.environ.get("WANDB_API_KEY")
if _wandb_api_key:
    wandb.login(key=_wandb_api_key)

# Maps a dataset item's 'type' tag to a (source language, target language) pair
# of display names. get_lyric_datasets substitutes the pair into SYSTEM_PROMPT.
SUPPORTED_LANGUAGES = {
"en_to_zh": ("英语", "中文"),
"zh_to_en": ("中文", "英语"),
"jp_to_zh": ("日语", "中文"),
"zh_to_azh": ("中文", "中文(文言文)"),
"en_to_azh": ("英语", "中文(文言文)"),
"de_to_zh": ("德语", "中文"),
"italy_to_zh": ("意大利语", "中文"),
"rus_to_zh": ("俄语", "中文"),
"fr_to_zh": ("法语", "中文"),
"mix": ("中文", "multilingual")
}

# Load and prep dataset

# System prompt shared by the lyrics and math tasks. The two positional "{}"
# placeholders in rule 2 are filled with str.format: the source and target
# language for lyrics, or "N/A" twice for math prompts.
SYSTEM_PROMPT = """
You are a versatile assistant. Follow these rules:

  1. Always respond in this exact XML format:
    <reasoning>
    [Your reasoning here]
    </reasoning>
    <answer>
    [Final answer or translation here]
    </answer>
  2. If given lyrics, translate them from {} to {} only, without extra explanations.
  3. If given a math question, solve it and provide the result in the same XML format.
    """

# Template for a well-formed response, with named {reasoning} and {answer}
# placeholders. It is not referenced anywhere else in the visible script.
XML_COT_FORMAT = """
<reasoning>
{reasoning}
</reasoning>
<answer>
{answer}
</answer>
"""

def get_lyric_datasets(path: str, train_test_ratio=0.8) -> dict:
    """
    Build the lyric-translation dataset from a JSON file and split it.

    Every record is expected to provide a 'type' key (e.g. 'en_to_zh') that
    selects a language pair from SUPPORTED_LANGUAGES, the source 'lyric' and
    the reference 'target_lyric'. Each record gains a chat-style 'prompt'
    (system + user messages), an 'answer' and a 'task' set to "lyrics".

    Returns the result of ``train_test_split`` (seed 42) called with
    ``train_size=train_test_ratio``; callers index it with 'train' / 'test'.
    """
    def to_chat_example(record):
        source_lang, target_lang = SUPPORTED_LANGUAGES[record['type']]
        # Fill the source -> target languages into the shared system prompt.
        system_message = {
            'role': 'system',
            'content': SYSTEM_PROMPT.format(source_lang, target_lang),
        }
        user_message = {'role': 'user', 'content': record['lyric']}
        # 'task' marks the record as a lyric translation request.
        return {
            'prompt': [system_message, user_message],
            'answer': record['target_lyric'],
            'task': "lyrics",
        }

    mapped = Dataset.from_json(path).map(to_chat_example)
    return mapped.train_test_split(train_size=train_test_ratio, seed=42)

def extract_hash_answer(text: str):
    """
    Pull the final answer out of a GSM8K-style solution string.

    GSM8K solutions mark the final result with '#### <answer>'. The text that
    follows the first '####' (up to a second '####', if any) is returned with
    surrounding whitespace removed. Returns None when no marker is present.
    """
    pieces = text.split("####")
    # A single piece means the marker never occurred.
    return pieces[1].strip() if len(pieces) > 1 else None

def get_gsm8k_questions(split="train", sample_size=3000) -> Dataset:
    """
    Load GSM8K parquet shards and convert them to the shared prompt format.

    Shards are read from ``../data_pack/openai/gsm8k/{split}-*.parquet``.
    At most ``sample_size`` leading rows are kept. Each row's 'question'
    becomes the user message, and the text after '####' in the official
    'answer' becomes the reference answer ("" when no marker is found).

    Returns a Dataset whose rows carry 'prompt', 'answer' and 'task' = "math".
    """
    data = Dataset.from_parquet(f'../data_pack/openai/gsm8k/{split}-*.parquet')  # type: ignore
    # Clamp the sample size so a split with fewer rows than requested does not
    # make select() index past the end of the dataset.
    data = data.select(range(min(sample_size, len(data))))

    def map_fn(x):
        final_answer = extract_hash_answer(x['answer'])
        return {
            'prompt': [
                # The language placeholders are irrelevant for math, so they
                # are filled with "N/A"; the prompt still enforces the
                # <reasoning>/<answer> output format.
                {'role': 'system', 'content': SYSTEM_PROMPT.format("N/A", "N/A")},
                {'role': 'user', 'content': x['question']},
            ],
            'answer': final_answer if final_answer else "",
            'task': "math",
        }

    return data.map(map_fn)

def extract_xml_answer(text: str) -> str:
    """
    Return the text between the first <answer> ... </answer> pair.

    Matching is case-insensitive and spans newlines. Whitespace around the
    captured content is stripped. Returns "" when no such pair exists.
    """
    # The capture group must be lazy `.*?`; with `.?` it could hold at most
    # one character and practically every real answer was dropped.
    pattern = r"<answer>\s*(.*?)\s*</answer>"
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else ""

def detect_language(text: str) -> str:
    """Return the language code langid assigns to ``text``, e.g. 'en', 'zh', 'fr'."""
    language_code, _score = langid.classify(text)
    return language_code

def compute_length_acc(
    preds: List[str],
    refs: List[str],
    tokenizer,
    max_tolerance: float = 0.5
) -> List[float]:
    """
    Score how closely each prediction's token length matches its reference.

    For every (pred, ref) pair the relative length error
    ``abs(len(pred_tokens) - len(ref_tokens)) / len(ref_tokens)`` is mapped to
    a reward:

        error <= 0.1 -> 1.0
        error <= 0.2 -> 0.7
        error <= 0.3 -> 0.5
        error <= 0.4 -> 0.3
        otherwise    -> 0.0

    Args:
        preds: Predicted texts.
        refs: Reference texts, paired with ``preds`` by position.
        tokenizer: Object exposing ``tokenize(str)`` that returns a sequence.
        max_tolerance: Errors above this value always score 0.0. With the
            default of 0.5 the tier table alone decides the result.

    Returns:
        One reward per pair. A reference with no tokens yields 0.0.
    """
    # (upper bound on the relative error, reward), checked in ascending order.
    tiers = ((0.1, 1.0), (0.2, 0.7), (0.3, 0.5), (0.4, 0.3))

    rewards = []
    for pred, ref in zip(preds, refs):
        ref_len = len(tokenizer.tokenize(ref))
        if ref_len == 0:
            # No reference length to compare against.
            rewards.append(0.0)
            continue
        pred_len = len(tokenizer.tokenize(pred))
        length_ratio = abs(pred_len - ref_len) / ref_len

        score = 0.0
        if length_ratio <= max_tolerance:
            for bound, value in tiers:
                if length_ratio <= bound:
                    score = value
                    break
        rewards.append(score)
    return rewards

def compute_bleu(
    preds: List[str],
    refs: List[str],
    tokenizer
) -> List[float]:
    """
    Compute a sentence-level BLEU score for every (pred, ref) pair.

    Both texts are tokenized with ``tokenizer.tokenize``. BLEU uses uniform
    n-gram weights (0.25, 0.25, 0.25, 0.25) and NLTK's ``method1`` smoothing.
    A pair scores 0.0 when either side has no tokens.

    Returns the raw BLEU values. Mapping them to reward tiers is done by
    bleu_reward_func, not here.
    """
    smoothing = SmoothingFunction().method1
    ngram_weights = (0.25, 0.25, 0.25, 0.25)

    def score_pair(pred, ref):
        hypothesis = tokenizer.tokenize(pred)
        reference = tokenizer.tokenize(ref)
        if not hypothesis or not reference:
            return 0.0
        return sentence_bleu(
            [reference],
            hypothesis,
            weights=ngram_weights,
            smoothing_function=smoothing,
        )

    return [score_pair(pred, ref) for pred, ref in zip(preds, refs)]

def compute_ter(
    preds: List[str],
    refs: List[str],
    tokenizer
) -> List[float]:
    """
    Compute a per-sample TER (Translation Edit Rate).

        TER = (edit distance / number of reference tokens) * 100

    When the reference has no tokens, TER is 100.0 if the prediction has
    tokens and 0.0 if both are empty.

    Returns one TER value per (pred, ref) pair.
    """
    def ter_for(pred, ref):
        hypothesis = tokenizer.tokenize(pred)
        reference = tokenizer.tokenize(ref)
        if len(reference) == 0:
            # Empty reference: any predicted token counts as a full miss.
            return 100.0 if len(hypothesis) > 0 else 0.0
        return (edit_distance(hypothesis, reference) / len(reference)) * 100

    return [ter_for(pred, ref) for pred, ref in zip(preds, refs)]

def compute_comet(
    srcs: List[str],
    preds: List[str],
    refs: List[str]
) -> List[float]:
    """
    Placeholder for COMET-based translation quality scoring.

    A real implementation would load a COMET model such as
    "Unbabel/wmt22-comet-da" (the COMET package and the model weights must be
    installed separately) and score each (src, mt, ref) triple.

    This demo version ignores ``srcs`` and ``refs`` and returns one random
    float in [0, 1) per prediction. The scores carry no meaning.
    """
    # Sketch of the real call, to be adapted from the official COMET docs:
    #   model_path = download_model("Unbabel/wmt22-comet-da")
    #   model = load_from_checkpoint(model_path)
    #   batch = [{"src": s, "mt": p, "ref": r} for s, p, r in zip(srcs, preds, refs)]
    #   output = model.predict(batch, batch_size=8, gpus=1)
    #   comet_scores = output["scores"]  # assumed to be a list
    import random

    # Demo only: one random value per prediction.
    return [random.random() for _ in preds]

# Reward functions

def reward_func_decorator(func):
    """
    Decorator that makes sure a reward function is called with a tokenizer.

    The returned wrapper accepts ``(prompts, completions, answers,
    tokenizer=None, **kwargs)`` and forwards everything to ``func``.

    Raises:
        ValueError: if the wrapper is called without a tokenizer.
    """
    import functools

    # wraps() keeps the reward function's own __name__ on the wrapper, so code
    # that identifies functions by name does not see every one as "wrapper".
    @functools.wraps(func)
    def wrapper(prompts, completions, answers, tokenizer=None, **kwargs):
        if tokenizer is None:
            # Functions expose their name as __name__; `func.name` raised an
            # AttributeError that masked the intended error message.
            raise ValueError(f"{func.__name__} needs a tokenizer.")
        return func(prompts, completions, answers, tokenizer=tokenizer, **kwargs)

    return wrapper

def length_acc_reward_func(prompts, completions, answer, **kwargs) -> List[float]:
    """
    Reward generated lyrics whose length is close to the reference length.

    The <answer> section of each completion is extracted and scored against
    the matching entry of ``answer`` by compute_length_acc. The first sample
    of the batch is printed for inspection.

    Relies on the module-level ``tokenizer`` being defined before it is called.
    """
    raw_outputs = [item[0]['content'] for item in completions]
    source_lyrics = prompts[0][-1]['content']
    parsed_outputs = [extract_xml_answer(output) for output in raw_outputs]
    print(
        '-'*20,
        f"Original Lyrics:\n{source_lyrics}",
        f"\nAnswer:\n{answer[0]}",
        f"\nResponse:\n{raw_outputs[0]}",
        f"\nExtracted:\n{parsed_outputs[0]}",
    )
    return compute_length_acc(preds=parsed_outputs, refs=answer, tokenizer=tokenizer)

def bleu_reward_func(completions, answer, **kwargs) -> List[float]:
    """
    Map each completion's BLEU score against its reference to a tiered reward.

        BLEU >= 0.9 -> 5.0    >= 0.8 -> 4.5    >= 0.7 -> 4.0
             >= 0.6 -> 3.5    >= 0.5 -> 2.5    >= 0.4 -> 2.0
             >= 0.3 -> 1.5    >= 0.2 -> 1.0    >= 0.1 -> 0.5
        otherwise   -> 0.0

    Relies on the module-level ``tokenizer`` being defined before it is called.
    """
    # (lower bound on BLEU, reward), checked from the highest tier down.
    tiers = (
        (0.9, 5.0),
        (0.8, 4.5),
        (0.7, 4.0),
        (0.6, 3.5),
        (0.5, 2.5),
        (0.4, 2.0),
        (0.3, 1.5),
        (0.2, 1.0),
        (0.1, 0.5),
    )
    raw_outputs = [c[0]["content"] for c in completions]
    parsed_outputs = [extract_xml_answer(output) for output in raw_outputs]
    bleu_scores = compute_bleu(preds=parsed_outputs, refs=answer, tokenizer=tokenizer)
    return [
        next((reward for bound, reward in tiers if score >= bound), 0.0)
        for score in bleu_scores
    ]

def ter_reward_func(completions, answer, **kwargs) -> List[float]:
    """
    Map each completion's TER (Translation Edit Rate) to a reward.

        TER >= 80 -> 0.0
        TER >= 60 -> 0.5
        TER >= 40 -> 1.0
        TER >= 20 -> 1.5
        otherwise -> 2.0

    Relies on the module-level ``tokenizer`` being defined before it is called.
    """
    # (lower bound on TER, reward), checked from the worst tier down.
    tiers = ((80, 0.0), (60, 0.5), (40, 1.0), (20, 1.5))
    raw_outputs = [c[0]["content"] for c in completions]
    parsed_outputs = [extract_xml_answer(output) for output in raw_outputs]
    ter_scores = compute_ter(preds=parsed_outputs, refs=answer, tokenizer=tokenizer)
    return [
        next((reward for bound, reward in tiers if ter >= bound), 2.0)
        for ter in ter_scores
    ]

def language_recognition(completions, answer, **kwargs) -> List[float]:
    """
    Reward completions written in the same language as the reference.

    The extracted <answer> text and the reference are both classified with
    detect_language: 1.0 when the codes match, 0.0 otherwise. An empty
    extracted answer always scores 0.0. Detection may be imprecise for
    dialects or Classical Chinese.
    """
    def same_language_score(pred, ref):
        # An empty extraction gets no reward and skips language detection.
        if not pred.strip():
            return 0.0
        return 1.0 if detect_language(pred) == detect_language(ref) else 0.0

    parsed_outputs = [extract_xml_answer(c[0]["content"]) for c in completions]
    return [same_language_score(pred, ref) for pred, ref in zip(parsed_outputs, answer)]

# If you want the model just to see a simpler reward for being in "some" XML format:

def strict_format_reward_func(completions, **kwargs) -> list[float]:
    """
    Reward completions that contain a non-empty <reasoning> block followed by
    a non-empty <answer> block.

    Returns 1.0 for each completion where the pattern is found anywhere in
    the text, else 0.0. Extra keyword arguments (prompts, answer, ...) are
    accepted and ignored; with ``*kwargs`` such keyword calls raised TypeError.
    """
    # `\s*` between the two blocks allows any whitespace, including newlines.
    # The pattern has to be a single-line raw string to be a valid literal.
    pattern = r"<reasoning>[\s\S]+</reasoning>\s*<answer>[\s\S]+</answer>"
    responses = [completion[0]["content"] for completion in completions]
    return [1.0 if re.search(pattern, r) else 0.0 for r in responses]

def soft_format_reward_func(completions, **kwargs) -> list[float]:
    """
    Reward completions that start with <reasoning>...</reasoning> followed by
    <answer>...</answer>.

    Returns 0.5 for each completion matching at the start of the text, else
    0.0. Both blocks may be empty.
    """
    # Lazy `.*?` bodies and optional whitespace between the blocks. DOTALL is
    # needed because the format requested by the system prompt puts newlines
    # inside both blocks, which `.` would otherwise never match.
    pattern = r"<reasoning>.*?</reasoning>\s*<answer>.*?</answer>"
    responses = [completion[0]["content"] for completion in completions]
    return [0.5 if re.match(pattern, r, re.DOTALL) else 0.0 for r in responses]

def correctness_reward_func(prompts, completions, answer, **kwargs) -> list[float]:
    """
    Math reward: 2.0 when the extracted <answer> text equals the reference
    answer exactly, otherwise 0.0.

    The first sample of the batch is printed for inspection.
    """
    raw_outputs = [item[0]['content'] for item in completions]
    question = prompts[0][-1]['content']
    parsed_outputs = [extract_xml_answer(output) for output in raw_outputs]
    print(
        '-' * 20,
        f"Question:\n{question}",
        f"\nAnswer:\n{answer[0]}",
        f"\nResponse:\n{raw_outputs[0]}",
        f"\nExtracted:\n{parsed_outputs[0]}",
    )
    rewards = []
    for predicted, expected in zip(parsed_outputs, answer):
        rewards.append(2.0 if predicted == expected else 0.0)
    return rewards

def int_reward_func(completions,**kwargs) -> list[float]:
    """
    Math reward: 0.5 when the extracted <answer> text is an integer literal
    (digits with an optional leading minus sign), otherwise 0.0.
    """
    parsed_outputs = [
        extract_xml_answer(item[0]['content']) for item in completions
    ]
    return [
        0.5 if re.match(r"^-?\d+$", candidate) else 0.0
        for candidate in parsed_outputs
    ]

def xmlcount_reward_func(completions, **kwargs) -> List[float]:
    """
    Demo reward that scores the presence of the expected XML tags.

    Each of the four tag markers that occurs exactly once adds 0.125. Text
    trailing the closing </answer> tag is penalised at 0.001 per character.
    This reward is unrelated to translation quality.
    """
    def count_xml(text) -> float:
        score = 0.0
        # The opening and closing reasoning tags are worth 0.125 each.
        for marker in ("<reasoning>\n", "\n</reasoning>\n"):
            if text.count(marker) == 1:
                score += 0.125
        if text.count("\n<answer>\n") == 1:
            score += 0.125
            # Penalise anything that follows the closing answer tag line.
            trailing = text.split("\n</answer>\n")[-1]
            score -= len(trailing)*0.001
        if text.count("\n</answer>") == 1:
            score += 0.125
            trailing = text.split("\n</answer>")[-1]
            # One trailing character (the final newline) is tolerated.
            score -= (len(trailing) - 1)*0.001
        return score

    return [count_xml(item[0]["content"]) for item in completions]

def reduce_repetition_reward_func(completions, **kwargs) -> List[float]:
    """
    Penalise heavily repeated lyric lines, e.g. "ない ない ない ない...".

    For each completion:
      1. Extract the <answer> text.
      2. Split it on '\\n', dropping blank lines and stripping the rest.
      3. Compute unique lines / total lines.
      4. Reward 1.0 when the ratio is >= 0.8, else 0.0.

    Answers with one line or fewer score 0.0 because repetition cannot be
    measured. The reward is applied unconditionally; it does not check
    whether the sample is a lyrics or a math task.
    """
    # To skip math samples, a guard such as
    #   if "math" in str(prompts[0]): return [0.0]*len(completions)
    # could be added; it was left disabled in the original.
    def repetition_score(answer_text):
        lines = [l.strip() for l in answer_text.split("\n") if l.strip()]
        if len(lines) <= 1:
            # Not enough lines to measure repetition.
            return 0.0
        unique_ratio = len(set(lines)) / len(lines)
        return 1.0 if unique_ratio >= 0.8 else 0.0

    raw_outputs = [c[0]["content"] for c in completions]
    return [repetition_score(extract_xml_answer(output)) for output in raw_outputs]

def evaluate_model(trainer, test_dataset, tokenizer):
    """
    Evaluate the trainer's model on ``test_dataset`` with BLEU, TER and
    length accuracy.

    Args:
        trainer: Trainer whose ``model`` is evaluated.
        test_dataset: Iterable of examples with a chat-style 'prompt' (list of
            role/content messages) and a reference 'answer'.
        tokenizer: Tokenizer used to build inputs, decode outputs and
            tokenize texts for the metrics.

    Prints progress and passes the per-sample metric lists to log_results.
    """
    print("\nBegin Evaluation on Test Dataset...")

    model = trainer.model
    model.eval()
    # Use the device the weights already live on rather than moving the model.
    # NOTE(review): assumes the model sits on a single device after training.
    device = next(model.parameters()).device

    predictions = []
    references = []
    for example in test_dataset:
        # 'prompt' is a list of chat messages, so it has to go through the
        # chat template; calling the tokenizer on the list directly does not
        # build a valid prompt.
        input_ids = tokenizer.apply_chat_template(
            example["prompt"],
            add_generation_prompt=True,
            return_tensors="pt",
        ).to(device)

        with torch.no_grad():
            # max_new_tokens bounds only the completion. max_length=512 capped
            # prompt + completion, leaving no room after a long prompt.
            generated_ids = model.generate(input_ids, max_new_tokens=512)

        # Decode only the newly generated tokens. The system prompt contains
        # an example <answer> block that would otherwise be extracted first.
        completion_ids = generated_ids[0][input_ids.shape[-1]:]
        predictions.append(tokenizer.decode(completion_ids, skip_special_tokens=True))
        references.append(example["answer"])

    pred_answers = [extract_xml_answer(pred) for pred in predictions]

    bleu_scores = compute_bleu(pred_answers, references, tokenizer=tokenizer)
    ter_scores = compute_ter(pred_answers, references, tokenizer=tokenizer)
    length_accs = compute_length_acc(pred_answers, references, tokenizer=tokenizer)

    # Pass the trainer so log_results can also report through it when
    # reporting is enabled.
    log_results(
        {
            "BLEU": bleu_scores,
            "TER": ter_scores,
            "LENGTH_ACCURACY": length_accs,
        },
        trainer=trainer,
    )

    print("Test evaluation completed.")

def log_results(results: dict, trainer=None):
    """
    Print the average of each metric and forward it to the trainer's logger.

    Args:
        results: Maps a metric name to a list of per-sample scores. An empty
            list averages to 0.0.
        trainer: Optional trainer. When given and ``trainer.args.report_to``
            is truthy, each average is also sent through ``trainer.log``.
    """
    for metric in results:
        values = results[metric]
        if values:
            mean_value = sum(values) / len(values)
        else:
            mean_value = 0.0
        print(f"[Test] {metric}: Avg {mean_value:.4f}")
        if not trainer:
            continue
        if trainer.args.report_to:
            trainer.log({f"{metric}/avg": mean_value})
    print("-" * 40)

def build_model_and_tokenizer(model_path="../model/Qwen2.5-3B-Instruct", max_seq_length=1536, lora_rank=16):
    """
    Load a base model through FastLanguageModel and attach LoRA adapters.

    Args:
        model_path: Path or name of the base model.
        max_seq_length: Maximum sequence length passed to the loader.
        lora_rank: LoRA rank; also used as ``max_lora_rank`` and ``lora_alpha``.

    Returns:
        (model, tokenizer), where ``model`` is the PEFT-wrapped model.
    """
    load_options = dict(
        model_name=model_path,
        max_seq_length=max_seq_length,
        load_in_4bit=True,       # load the weights in 4-bit
        fast_inference=True,     # enable vLLM fast inference
        max_lora_rank=lora_rank,
        gpu_memory_utilization=0.7,
    )
    base_model, tokenizer = FastLanguageModel.from_pretrained(**load_options)
    # Left disabled, as in the original:
    # tokenizer.pad_token = tokenizer.eos_token
    # tokenizer.padding_side = 'left'

    # Attention and MLP projection layers that receive LoRA adapters.
    lora_targets = [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ]
    peft_model = FastLanguageModel.get_peft_model(
        base_model,
        r=lora_rank,
        target_modules=lora_targets,
        lora_alpha=lora_rank,
        use_gradient_checkpointing="unsloth",
        random_state=3407,
    )
    return peft_model, tokenizer

# Two-phase GRPO training script: phase 1 trains on GSM8K math, phase 2
# continues on lyric translation with the same model, then the result is
# evaluated on the held-out lyrics split.
PatchFastRL("GRPO", FastLanguageModel)

# 1. Build model
# `tokenizer` stays a module-level global because the lyric reward functions
# (length_acc / bleu / ter) read it directly.
model, tokenizer = build_model_and_tokenizer()

# 2. Load GSM8K (train set). We won't do a test split for math for brevity
gsm8k_dataset = get_gsm8k_questions(split="train")

# 3. Set up training hyperparams for math
training_args_math = GRPOConfig(
    use_vllm=True,  # use vLLM for fast inference
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    optim="paged_adamw_8bit",
    logging_steps=1,
    bf16=is_bfloat16_supported(),
    fp16=not is_bfloat16_supported(),
    per_device_train_batch_size=16,
    gradient_accumulation_steps=2,  # Increase to 4 for smoother training
    num_generations=16,  # Decrease if out of memory
    max_prompt_length=768,
    max_completion_length=768,
    num_train_epochs=1,
    report_to="none",
    output_dir="outputs_math_phase",
)

# 4. Define a math-specific reward function list
# (We can include strict_format_reward_func if we want.)
math_reward_funcs = [
    correctness_reward_func,
    int_reward_func,
    strict_format_reward_func,
    xmlcount_reward_func,
    soft_format_reward_func,
]

# 5. Create the trainer for math phase
trainer_math = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=math_reward_funcs,
    args=training_args_math,
    train_dataset=gsm8k_dataset,
)

# 6. Train on math
print("=== Starting Phase 1: Math Training (GSM8K) ===")
trainer_math.train()
print("=== Finished Phase 1 ===\n")

# 7. Next, load the lyrics dataset & do Phase 2
dataset_dict = get_lyric_datasets("../data_pack/multi_lyric.json", train_test_ratio=0.8)
train_dataset_lyrics = dataset_dict['train']
test_dataset_lyrics = dataset_dict['test']

# 8. Training config for lyrics
training_args_lyrics = GRPOConfig(
    use_vllm=True,  # use vLLM for fast inference
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    optim="paged_adamw_8bit",
    logging_steps=1,
    bf16=is_bfloat16_supported(),
    fp16=not is_bfloat16_supported(),
    # Unsloth expects the batch size to be a multiple of num_generations and
    # silently raised the previous value of 4 to 8; set it explicitly so the
    # configuration matches what actually runs.
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,  # Increase to 4 for smoother training
    num_generations=8,  # Decrease if out of memory
    max_prompt_length=768,
    max_completion_length=768,
    num_train_epochs=2,  # two passes over the lyrics training split
    # max_steps=50,
    save_steps=250,
    max_grad_norm=0.1,
    report_to="none",  # Can use Weights & Biases
    output_dir="outputs_lyrics_phase",
)

# 9. Define the lyrics-specific reward functions we want
lyrics_reward_funcs = [
    xmlcount_reward_func,
    soft_format_reward_func,
    strict_format_reward_func,
    length_acc_reward_func,
    bleu_reward_func,
    ter_reward_func,
    language_recognition,
    # reduce_repetition_reward_func
]

# 10. New trainer for Phase 2
# NOTE(review): this second trainer reuses the model already trained in phase
# 1 within the same process. Confirm that creating a second GRPOTrainer after
# a vLLM-backed run is supported by the installed unsloth / trl versions.
trainer_lyrics = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=lyrics_reward_funcs,
    args=training_args_lyrics,
    train_dataset=train_dataset_lyrics,
)

print("=== Starting Phase 2: Lyrics Training ===")
trainer_lyrics.train()
print("=== Finished Phase 2. ===\n")

# Evaluate on test set if needed
evaluate_model(trainer_lyrics, test_dataset_lyrics, tokenizer=tokenizer)

当前的脚本一旦运行到 Phase 2 就会报错。unsloth 的确无法支持多 GPU,但我自始至终的 shell 写的是(export CUDA_LAUNCH_BLOCKING=0
export CUDA_VISIBLE_DEVICES=0

python unsloth_lyrics_qwen3b_eval.py

CUDA_VISIBLE_DEVICES=0 python unsloth_mix_3b_run.py),
为什么还会报错:
=== Finished Phase 1 ===

Unsloth: We now expect per_device_train_batch_size to be a multiple of num_generations.
We will change the batch size of 4 to the num_generations of 8
=== Starting Phase 2: Lyrics Training ===
[rank0]: Traceback (most recent call last):
[rank0]: File "/home/data/guoweis/zero/tinyzero/qwen3b_grpo/unsloth_mix_3b_run.py", line 707, in <module>
[rank0]: trainer_lyrics.train()
[rank0]: File "/home/data/guoweis/miniconda3/envs/unsloth_env/lib/python3.11/site-packages/transformers/trainer.py", line 2171, in train
[rank0]: return inner_training_loop(
[rank0]: ^^^^^^^^^^^^^^^^^^^^
[rank0]: File "<string>", line 226, in _fast_inner_training_loop
[rank0]: RuntimeError: Unsloth currently does not support multi GPU setups - but we are working on it!
[rank0]:[W305 19:35:44.965480456 ProcessGroupNCCL.cpp:1250] Warning: WARNING: process group has NOT been destroyed before we destruct ProcessGroupNCCL. On normal program exit, the application should call destroy_process_group to ensure that any pending NCCL operations have finished in this process. In rare cases this process can exit before this point and block the progress of another member of the process group. This constraint has always been present, but this warning has only been added since PyTorch 2.4 (function operator())
请帮我修正上述bug,我要重新run提交模型,确保不会训练中断!

Gedanke

Investigating GPU issues

Hmm, I’m digging into why the Unsloth library is detecting multiple GPUs despite a single GPU setup, causing a runtime error. It seems the batch size adjustment might be influencing this recognition.

Investigating batch size modifications

I'm piecing together why the batch size is forcefully changed to 8, potentially causing confusion with gradient accumulation. The Unsloth library's single GPU detection might be affected by training arguments like "use_vllm=True".

Investigating GPU visibility

I’m exploring Unsloth's multi-GPU setup error, which could stem from environment variables or distributed training configurations, despite the user's single GPU setup.

Tweaking configurations

Considering options like setting ddp=False or managing accelerate launch settings to ensure the code operates without multi-GPU interference.

Ensuring single GPU visibility

I’m piecing together how setting export OMP_NUM_THREADS=1 or args.device='cuda:0' can ensure the code focuses on a single GPU, bypassing multi-GPU setups.

Tweaking configurations

I’m considering settings like trainer_lyrics.args.run_on_single_device = True or adjusting num_generations to 4 for consistent batch sizing, aiming to ensure Unsloth does not detect multiple GPUs.

Updating configurations

I’m thinking through changes like removing use_vllm=True to avoid automatic multi-GPU detection or setting trainer_lyrics.args.report_to = 'none' to disable reporting, ensuring consistent single GPU usage.

Ensuring consistent GPU usage

Setting use_vllm=False for the second phase avoids multi-GPU detection, which might cause bugs with Unsloth, ensuring the code remains compatible with a single GPU setup.

Reasoned about multi-GPU support issue for 29 seconds

Antwort

原因分析:
Unsloth 目前尚不支持多卡/分布式训练。如果脚本在第二阶段报出

RuntimeError: Unsloth currently does not support multi GPU setups - but we are working on it!

这说明它「检测到」了多卡环境,从而报错终止。即使您只 export CUDA_VISIBLE_DEVICES=0,有时候仍会触发 PyTorch/Transformers 的分布式逻辑(例如在某些集群/容器环境下,或使用 vLLM 时自动检测到多卡)。

最常见触发点有:

  1. 代码或环境中已经初始化了 torch.distributed 或 accelerate 的分布式训练(哪怕物理上只有一块卡,也可能因为分布式初始化成功而被认为是多卡)。
  2. 使用了 vLLM 配合旧版 transformers/accelerate,可能会错误检测 GPU 数量。
  3. 某些环境变量(如 MASTER_ADDR, WORLD_SIZE, 等)导致分布式训练被激活。

主要修正思路:

  1. 确保 环境里没有被 torch.distributed/accelerate 初始化;
  2. 如果您确实只想在单卡上跑,请去掉所有分布式启动脚本写法,例如 torchrun --nproc_per_node=... 或 accelerate launch ...;
  3. 尝试在第二阶段(或者干脆两阶段都)把 use_vllm=False,彻底使用普通单卡推理,避免 vLLM 自动检测多卡;
  4. 如果依旧报错,可以手动在代码最开头(import 之前)强行设定 os.environ["CUDA_VISIBLE_DEVICES"] = "0",并在正式开始训练前打印一下 torch.cuda.device_count() 以验证只剩 1。

下面给您一个推荐的最小改动示例,将第二阶段的 use_vllm 改成 False,并确保批大小和 num_generations 不会再让库去检查“多卡”:

python
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import torch

# 在开始前检查一下当前可见的 GPU 数量,如果不是1,就要去掉其他相关环境/进程
print("Detected GPU count:", torch.cuda.device_count())

# -------------------------------------
# Phase 1: Math Training (不变,如果没报错就无需改)
training_args_math = GRPOConfig(
    use_vllm=True,
    # 其他超参...
)
trainer_math = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=math_reward_funcs,
    args=training_args_math,
    train_dataset=gsm8k_dataset,
)
trainer_math.train()

# -------------------------------------
# Phase 2: Lyrics Training (关键改动)
training_args_lyrics = GRPOConfig(
    # 把 use_vllm 改成 False 以避免多卡冲突
    use_vllm=False,
    # 如果您的 per_device_train_batch_size < num_generations,会被 unsloth 自动改到一致;
    # 这里二者只要一致或 batch_size >= num_generations 即可,防止自动改完又被判断多卡。
    per_device_train_batch_size=8,
    num_generations=8,
    # 其余超参...
    learning_rate=5e-6,
    # ...
    output_dir = "outputs_lyrics_phase",
)
trainer_lyrics = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=lyrics_reward_funcs,
    args=training_args_lyrics,
    train_dataset=train_dataset_lyrics,
)
trainer_lyrics.train()  # 不再报多GPU错误
evaluate_model(trainer_lyrics, test_dataset_lyrics, tokenizer=tokenizer)

如果您连第一阶段也遇到类似问题,可以在两个阶段都改成 use_vllm=False。另外,也要检查是否有用 accelerate launch 或其他分布式启动方式,如:

bash
torchrun --nproc_per_node=2 ...

都要改成最普通的单进程执行(比如 python your_script.py)即可避免触发多卡模式。


补充排查点

  1. 强制只认1卡

    python
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"

    import torch
    print("Detected GPU count:", torch.cuda.device_count())

    如仍然打印出不止1,说明环境/集群某处有额外 GPU 显示或启动了分布式。

  2. 确保没有分布式或 accelerate

    • 不要用 torchrun、accelerate launch 之类的命令启动脚本;
    • 在脚本最开头可加:
      python
      import torch.distributed as dist

      if dist.is_initialized():
          dist.destroy_process_group()
      以防某些地方自动做了 dist.init_process_group()
  3. vLLM
    部分 vLLM 的早期版本在检测 GPU 设备数量时有 Bug,会错误识别多 GPU。若确有需要 vLLM 并保持单卡,请升级到较新的 vLLM / transformers,并保证单卡环境变量干净。

照此修改后,Unsloth 在单卡下通常就不会再抛出「multi GPU」的报错了。祝您一切顺利,成功完成训练!

Diesen Q&A teilen