RLHF Fine-Tuning
What is RLHF?
RLHF (Reinforcement Learning from Human Feedback) trains large language models with reinforcement learning driven by human feedback. A three-stage training pipeline aligns the model with human preferences and values.
Core Architecture
Three-stage training pipeline
Stage 1: Supervised Fine-Tuning (SFT) → Stage 2: Reward Model training (RM) → Stage 3: Reinforcement learning optimization (PPO)
Overall flow
Human-annotated data → SFT model → Preference data collection → Reward model → PPO optimization → Final model
(instruction-response pairs)  (base capability)  (preference comparisons)  (reward signal)  (policy optimization)
- Supervised Fine-Tuning, SFT (Stage 1):
  - Goal: train an initial policy model with basic instruction understanding and response ability.
  - Method: supervised learning on high-quality instruction-response pairs.
- Reward Model training, RM (Stage 2):
  - Goal: train a reward model (RM) that scores model outputs in a way that reflects human preferences.
  - Method: collect human preference rankings over different model outputs and train the RM to score them (the pairwise loss is written out after this list).
- Reinforcement learning optimization (Stage 3):
  - Goal: use the reward model as the training signal and optimize the Stage-1 SFT model with a reinforcement learning algorithm (usually PPO).
  - Method: the policy model generates responses, the reward model scores them, and PPO updates the policy parameters from the KL-penalized reward (also written out after this list).
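For reference, these are the standard objectives behind Stages 2 and 3 (y_w is the chosen response, y_l the rejected one, and \pi_{\mathrm{ref}} the frozen SFT model); the coefficient \beta corresponds to the kl_coef used in the PPO code later in this section:

    \mathcal{L}_{\mathrm{RM}}(\phi) = -\,\mathbb{E}_{(x,\,y_w,\,y_l)}\big[\log \sigma\big(r_\phi(x, y_w) - r_\phi(x, y_l)\big)\big]

    R(x, y) = r_\phi(x, y) - \beta \cdot \mathrm{KL}\big(\pi_\theta(\cdot \mid x)\,\|\,\pi_{\mathrm{ref}}(\cdot \mid x)\big)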
Stage 1: Supervised Fine-Tuning (SFT)
SFT goal
Establish the model's basic instruction-following ability so it can understand and carry out a wide range of tasks.
Implementation
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments

def stage1_sft_training(model_name, sft_dataset):
    """Stage 1: supervised fine-tuning (SFT)."""
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    # Data preprocessing
    def preprocess_sft_data(examples):
        """Format SFT examples as prompt texts and tokenize them."""
        inputs = []
        for instruction, output in zip(examples["instruction"], examples["output"]):
            text = f"### Instruction:\n{instruction}\n\n### Response:\n{output}{tokenizer.eos_token}"
            inputs.append(text)

        # Tokenize; pad to a fixed length so default collation works
        model_inputs = tokenizer(
            inputs,
            truncation=True,
            padding="max_length",
            max_length=512,
            return_tensors="pt"
        )
        # Causal LM labels are the input ids themselves; a stricter version
        # would additionally mask padding positions to -100.
        model_inputs["labels"] = model_inputs["input_ids"].clone()
        return model_inputs

    # Tokenize the dataset
    tokenized_dataset = sft_dataset.map(
        preprocess_sft_data,
        batched=True,
        remove_columns=sft_dataset.column_names
    )

    # Training configuration
    training_args = TrainingArguments(
        output_dir="./sft_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,
        learning_rate=2e-5,
        warmup_ratio=0.1,
        logging_steps=100,
        save_strategy="epoch",
        # Pass an eval_dataset and a matching evaluation strategy if you want
        # per-epoch evaluation and load_best_model_at_end.
    )

    # Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        tokenizer=tokenizer,
    )

    # Train and save
    trainer.train()
    trainer.save_model()
    return model
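A minimal usage sketch, assuming an instruction dataset with "instruction" and "output" columns; the file name and base model below are placeholders:

from datasets import load_dataset

# Hypothetical local JSON file with instruction/output records
sft_data = load_dataset("json", data_files="sft_data.json", split="train")
sft_model = stage1_sft_training("gpt2", sft_data)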
Stage 2: Reward Model Training (RM)
Reward model architecture
import torch
import torch.nn as nn
from transformers import AutoModel

class RewardModel(nn.Module):
    def __init__(self, base_model_name, hidden_size=None):
        super().__init__()
        # Base language model
        self.base_model = AutoModel.from_pretrained(base_model_name)
        # Infer the hidden size from the base model unless one is given explicitly
        hidden_size = hidden_size or self.base_model.config.hidden_size

        # Reward head
        self.reward_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, 1)  # outputs a single scalar reward
        )

        # Optionally freeze part of the base model (here: the input embeddings)
        for param in self.base_model.get_input_embeddings().parameters():
            param.requires_grad = False

    def forward(self, input_ids, attention_mask=None):
        # Run the base model
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Use the representation of the last token (typically the EOS token)
        last_hidden_state = outputs.last_hidden_state

        # Find the last non-padding token of each sequence
        if attention_mask is not None:
            sequence_lengths = attention_mask.sum(dim=1) - 1
            batch_size = last_hidden_state.size(0)
            last_token_hidden = last_hidden_state[
                torch.arange(batch_size), sequence_lengths
            ]
        else:
            last_token_hidden = last_hidden_state[:, -1]

        # Compute the reward score
        reward = self.reward_head(last_token_hidden)
        return reward.squeeze(-1)  # [batch_size]
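A quick sanity check of the reward head; the backbone choice ("gpt2") is only a placeholder assumption:

import torch
from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("gpt2")
tok.pad_token = tok.eos_token
rm = RewardModel("gpt2")

batch = tok(["A helpful answer.", "A rude answer."], padding=True, return_tensors="pt")
with torch.no_grad():
    scores = rm(batch["input_ids"], batch["attention_mask"])
print(scores.shape)  # torch.Size([2]) -- one scalar reward per sequence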
Training the reward model
from torch.utils.data import DataLoader

def stage2_reward_model_training(sft_model, preference_dataset):
    """Stage 2: reward model training."""
    # Build the reward model on top of the SFT backbone, and reuse its tokenizer
    reward_model = RewardModel(sft_model.config.name_or_path)
    tokenizer = AutoTokenizer.from_pretrained(sft_model.config.name_or_path)
    tokenizer.pad_token = tokenizer.eos_token

    # Preference data preprocessing
    def preprocess_preference_data(examples):
        """Preprocess preference pairs (prompt, chosen, rejected)."""
        chosen_texts = []
        rejected_texts = []
        for prompt, chosen, rejected in zip(
            examples["prompt"], examples["chosen"], examples["rejected"]
        ):
            chosen_texts.append(f"{prompt}\n{chosen}")
            rejected_texts.append(f"{prompt}\n{rejected}")

        # Tokenize; pad to a fixed length so batches collate cleanly
        chosen_inputs = tokenizer(
            chosen_texts,
            truncation=True,
            padding="max_length",
            max_length=512,
        )
        rejected_inputs = tokenizer(
            rejected_texts,
            truncation=True,
            padding="max_length",
            max_length=512,
        )
        return {
            "chosen_input_ids": chosen_inputs["input_ids"],
            "chosen_attention_mask": chosen_inputs["attention_mask"],
            "rejected_input_ids": rejected_inputs["input_ids"],
            "rejected_attention_mask": rejected_inputs["attention_mask"],
        }

    # Build the dataloader of preference pairs
    tokenized = preference_dataset.map(
        preprocess_preference_data,
        batched=True,
        remove_columns=preference_dataset.column_names
    )
    tokenized.set_format("torch")
    preference_dataloader = DataLoader(tokenized, batch_size=4, shuffle=True)

    # Reward-model loss (pairwise preference learning)
    def reward_loss(chosen_rewards, rejected_rewards):
        """Bradley-Terry style loss: push chosen rewards above rejected rewards."""
        loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
        # Fraction of pairs ranked correctly
        accuracy = (chosen_rewards > rejected_rewards).float().mean()
        return loss, accuracy

    # Training loop
    optimizer = torch.optim.AdamW(reward_model.parameters(), lr=1e-5)
    reward_model.train()
    for epoch in range(3):
        total_loss = 0.0
        total_accuracy = 0.0
        for batch in preference_dataloader:
            # Forward pass for both responses
            chosen_rewards = reward_model(
                batch["chosen_input_ids"],
                batch["chosen_attention_mask"]
            )
            rejected_rewards = reward_model(
                batch["rejected_input_ids"],
                batch["rejected_attention_mask"]
            )
            # Loss and ranking accuracy
            loss, accuracy = reward_loss(chosen_rewards, rejected_rewards)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            total_accuracy += accuracy.item()

        n_batches = len(preference_dataloader)
        print(f"Epoch {epoch}: Loss={total_loss / n_batches:.4f}, "
              f"Accuracy={total_accuracy / n_batches:.4f}")

    return reward_model
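As a quick check after Stage 2, the trained reward model can be used to compare two candidate responses to the same prompt. A hedged sketch, reusing sft_model and reward_model from the pipeline above (the example texts are made up):

import torch
from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained(sft_model.config.name_or_path)
tok.pad_token = tok.eos_token

prompt = "Explain what a reward model does."
candidates = ["It scores responses according to human preferences.", "No idea."]
inputs = tok([f"{prompt}\n{c}" for c in candidates], padding=True, return_tensors="pt")

with torch.no_grad():
    scores = reward_model(inputs["input_ids"], inputs["attention_mask"])
print(scores)  # the preferred-style response should receive the higher score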
Stage 3: PPO Optimization
PPO algorithm implementation
import torch.nn.functional as F

class PPOTrainer:
    def __init__(self, policy_model, reward_model, ref_model, tokenizer,
                 clip_ratio=0.2, kl_coef=0.1, value_coef=1.0):
        self.policy_model = policy_model
        self.reward_model = reward_model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.clip_ratio = clip_ratio
        self.kl_coef = kl_coef
        self.value_coef = value_coef  # reserved for a value-head loss; unused in this simplified sketch

        # Freeze the reference model and the reward model
        for param in self.ref_model.parameters():
            param.requires_grad = False
        for param in self.reward_model.parameters():
            param.requires_grad = False

        self.optimizer = torch.optim.AdamW(
            self.policy_model.parameters(),
            lr=1e-6  # PPO uses a very small learning rate
        )

    def generate_responses(self, prompts, max_length=256):
        """Generate responses with the current policy."""
        self.policy_model.eval()
        responses = []
        log_probs = []

        for prompt in prompts:
            # Encode the prompt
            inputs = self.tokenizer(prompt, return_tensors="pt")

            # Sample a response
            with torch.no_grad():
                outputs = self.policy_model.generate(
                    **inputs,
                    max_length=max_length,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=self.tokenizer.eos_token_id,
                    return_dict_in_generate=True,
                    output_scores=True
                )

            generated_ids = outputs.sequences[0][inputs["input_ids"].size(1):]
            response = self.tokenizer.decode(generated_ids, skip_special_tokens=True)

            # Log-probability of the sampled response under the current policy
            logits = torch.stack(outputs.scores, dim=1)  # [1, gen_len, vocab]
            log_prob = F.log_softmax(logits, dim=-1)
            token_log_probs = torch.gather(
                log_prob, -1, generated_ids.unsqueeze(0).unsqueeze(-1)
            ).squeeze(-1)

            responses.append(response)
            log_probs.append(token_log_probs.sum().item())

        return responses, log_probs

    def compute_rewards(self, prompts, responses):
        """Score (prompt, response) pairs with the reward model."""
        rewards = []
        for prompt, response in zip(prompts, responses):
            full_text = f"{prompt}\n{response}"
            inputs = self.tokenizer(full_text, return_tensors="pt")
            with torch.no_grad():
                reward = self.reward_model(**inputs)
            rewards.append(reward.item())
        return rewards

    def compute_kl_penalty(self, prompts, responses):
        """KL divergence between the policy and the frozen reference model."""
        kl_penalties = []
        for prompt, response in zip(prompts, responses):
            full_text = f"{prompt}\n{response}"
            inputs = self.tokenizer(full_text, return_tensors="pt")

            with torch.no_grad():
                # Policy and reference logits over the same tokens
                policy_logits = self.policy_model(**inputs).logits
                ref_logits = self.ref_model(**inputs).logits

                # KL(policy || reference); F.kl_div(input, target) computes
                # KL(target || input), so the reference provides the log-probs
                kl_div = F.kl_div(
                    F.log_softmax(ref_logits, dim=-1),
                    F.softmax(policy_logits, dim=-1),
                    reduction='batchmean'
                )
            kl_penalties.append(kl_div.item())
        return kl_penalties

    def ppo_step(self, prompts, responses, old_log_probs, rewards):
        """One PPO policy-update step."""
        self.policy_model.train()
        total_loss = 0.0

        for prompt, response, old_log_prob, reward in zip(
            prompts, responses, old_log_probs, rewards
        ):
            full_text = f"{prompt}\n{response}"
            inputs = self.tokenizer(full_text, return_tensors="pt")
            prompt_len = self.tokenizer(prompt, return_tensors="pt")["input_ids"].size(1)

            # Log-probabilities of the full sequence under the current policy
            # (re-tokenizing prompt + response approximates the generation-time tokens)
            logits = self.policy_model(**inputs).logits
            log_probs = F.log_softmax(logits[:, :-1], dim=-1)
            targets = inputs["input_ids"][:, 1:]
            token_log_probs = torch.gather(
                log_probs, -1, targets.unsqueeze(-1)
            ).squeeze(-1)
            # Keep only the response tokens (positions after the prompt)
            current_log_prob = token_log_probs[:, prompt_len - 1:].sum()

            # Importance ratio between the new and old policy
            ratio = torch.exp(current_log_prob - old_log_prob)

            # Simplified advantage: the (KL-shaped) reward is used directly;
            # a full implementation subtracts a baseline / value estimate
            advantage = reward

            # PPO clipped objective
            clipped_ratio = torch.clamp(
                ratio, 1 - self.clip_ratio, 1 + self.clip_ratio
            )
            policy_loss = -torch.min(
                ratio * advantage,
                clipped_ratio * advantage
            )
            total_loss += policy_loss

        # Backward pass over the accumulated loss
        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
        self.optimizer.step()

        return total_loss.item()

    def train_step(self, prompts):
        """A full PPO training step: generate, score, penalize, update."""
        # 1. Generate responses
        responses, old_log_probs = self.generate_responses(prompts)
        # 2. Score them with the reward model
        rewards = self.compute_rewards(prompts, responses)
        # 3. Compute the KL penalty against the reference model
        kl_penalties = self.compute_kl_penalty(prompts, responses)
        # 4. Shape the reward with the KL penalty
        adjusted_rewards = [
            r - self.kl_coef * kl for r, kl in zip(rewards, kl_penalties)
        ]
        # 5. PPO update
        loss = self.ppo_step(prompts, responses, old_log_probs, adjusted_rewards)

        return {
            "loss": loss,
            "mean_reward": sum(rewards) / len(rewards),
            "mean_kl": sum(kl_penalties) / len(kl_penalties)
        }
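The ppo_step above uses the raw KL-shaped reward as the advantage. A common refinement, sketched here as an assumption rather than part of the pipeline above, is to whiten the rewards within a batch so the advantage has zero mean; full implementations instead learn a value head and use GAE:

import torch

def whiten_advantages(rewards, eps=1e-8):
    """Turn raw per-sample rewards into zero-mean, unit-variance advantages.
    Assumes at least two samples per batch (std is undefined otherwise)."""
    r = torch.tensor(rewards, dtype=torch.float32)
    return ((r - r.mean()) / (r.std() + eps)).tolist()

# Example: plug in between steps 4 and 5 of train_step
# advantages = whiten_advantages(adjusted_rewards)
# loss = self.ppo_step(prompts, responses, old_log_probs, advantages)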
Complete RLHF Training Pipeline
def full_rlhf_training(base_model_name, sft_data, preference_data, prompts):
    """The complete three-stage RLHF training pipeline."""
    print("Stage 1: supervised fine-tuning (SFT)")
    sft_model = stage1_sft_training(base_model_name, sft_data)

    print("Stage 2: reward model training (RM)")
    reward_model = stage2_reward_model_training(sft_model, preference_data)

    print("Stage 3: PPO optimization")
    # Reference model: a frozen copy of the SFT model
    ref_model = AutoModelForCausalLM.from_pretrained(sft_model.config.name_or_path)
    ref_model.load_state_dict(sft_model.state_dict())

    tokenizer = AutoTokenizer.from_pretrained(sft_model.config.name_or_path)
    tokenizer.pad_token = tokenizer.eos_token

    # PPO trainer
    ppo_trainer = PPOTrainer(
        policy_model=sft_model,
        reward_model=reward_model,
        ref_model=ref_model,
        tokenizer=tokenizer
    )

    # PPO training loop
    for epoch in range(10):
        metrics = ppo_trainer.train_step(prompts)
        print(f"PPO Epoch {epoch}: {metrics}")

    return sft_model
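End-to-end usage might look like the following sketch; the data files, base model, and prompts are placeholders, not part of the pipeline above:

from datasets import load_dataset

# Placeholder data files; real runs need curated SFT and preference datasets
sft_data = load_dataset("json", data_files="sft_data.json", split="train")
preference_data = load_dataset("json", data_files="preferences.json", split="train")
prompts = [
    "Explain RLHF in one paragraph.",
    "Write a polite refusal to a harmful request."
]

aligned_model = full_rlhf_training("gpt2", sft_data, preference_data, prompts)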
Evaluation and Monitoring
RLHF evaluation metrics
def evaluate_rlhf_model(model, tokenizer, eval_prompts, human_preferences):
    """Evaluate an RLHF-trained model along several axes."""
    metrics = {
        "helpfulness": 0,
        "harmlessness": 0,
        "honesty": 0,
        "preference_alignment": 0,
        "diversity": 0
    }

    responses = []
    for prompt in eval_prompts:
        # Generate a response
        inputs = tokenizer(prompt, return_tensors="pt")
        outputs = model.generate(
            **inputs,
            max_length=256,
            do_sample=True,
            temperature=0.7
        )
        # Decode only the newly generated tokens
        response = tokenizer.decode(
            outputs[0][inputs["input_ids"].size(1):], skip_special_tokens=True
        )
        responses.append(response)

    # Compute each metric; apart from evaluate_helpfulness below, the
    # evaluate_* helpers are assumed to be defined elsewhere (e.g. classifiers
    # or human review).
    metrics["helpfulness"] = evaluate_helpfulness(eval_prompts, responses)
    metrics["harmlessness"] = evaluate_harmlessness(responses)
    metrics["honesty"] = evaluate_honesty(responses)
    metrics["preference_alignment"] = evaluate_preference_alignment(
        responses, human_preferences
    )
    metrics["diversity"] = evaluate_diversity(responses)

    return metrics

def evaluate_helpfulness(prompts, responses):
    """Heuristic helpfulness score based on length and lexical overlap.
    In practice this would be replaced by a judge model or human rating."""
    scores = []
    for prompt, response in zip(prompts, responses):
        # Reward a moderate response length
        length_score = min(len(response.split()) / 50, 1.0)
        # Crude relevance: word overlap between prompt and response
        relevance_score = len(set(prompt.lower().split()) &
                              set(response.lower().split())) / len(prompt.split())
        scores.append((length_score + relevance_score) / 2)
    return sum(scores) / len(scores)
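The remaining evaluate_* helpers are left open above. As one hypothetical example (an assumption, not part of the original pipeline), diversity could be approximated with a distinct-n style ratio over the generated responses:

def evaluate_diversity(responses, n=2):
    """Hypothetical diversity metric: ratio of unique n-grams to total n-grams."""
    ngrams = []
    for response in responses:
        tokens = response.lower().split()
        ngrams.extend(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))
    if not ngrams:
        return 0.0
    return len(set(ngrams)) / len(ngrams)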
Training monitoring
class RLHFMonitor:
    def __init__(self):
        self.metrics_history = {
            "reward": [],
            "kl_divergence": [],
            "policy_loss": [],
            "value_loss": []
        }

    def log_metrics(self, metrics):
        """Record training metrics."""
        for key, value in metrics.items():
            if key in self.metrics_history:
                self.metrics_history[key].append(value)

    def check_convergence(self, window=10):
        """Check whether training has converged."""
        if len(self.metrics_history["reward"]) < window:
            return False
        recent_rewards = self.metrics_history["reward"][-window:]
        reward_std = torch.std(torch.tensor(recent_rewards))
        # Treat a very small reward standard deviation as convergence
        return reward_std.item() < 0.01

    def detect_reward_hacking(self, threshold=2.0):
        """Detect possible reward hacking."""
        if len(self.metrics_history["reward"]) < 2:
            return False
        current_reward = self.metrics_history["reward"][-1]
        previous_reward = self.metrics_history["reward"][-2]
        # A suspiciously large reward jump may indicate reward hacking
        return (current_reward - previous_reward) > threshold
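A sketch of how the monitor might be wired into the PPO loop from full_rlhf_training; the metric keys follow what train_step returns, with kl_divergence logged from mean_kl:

monitor = RLHFMonitor()

for epoch in range(10):
    metrics = ppo_trainer.train_step(prompts)
    monitor.log_metrics({
        "reward": metrics["mean_reward"],
        "kl_divergence": metrics["mean_kl"],
        "policy_loss": metrics["loss"],
    })
    if monitor.detect_reward_hacking():
        print(f"Epoch {epoch}: reward jumped sharply; inspect samples for reward hacking")
    if monitor.check_convergence():
        print(f"Epoch {epoch}: reward has plateaued, stopping early")
        break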
Advantages and Challenges
Advantages
- Human alignment: directly optimizes for human preferences
- Safety: reduces harmful outputs
- Quality: improves response quality and relevance
- Controllability: behavior can be steered through the reward function
- Generalization: learned preferences can transfer to new tasks
Challenges
- Training complexity: the three-stage pipeline is complex to run
- Data requirements: large amounts of high-quality human annotation are needed
- Reward hacking: the model may learn to exploit the reward function
- Distribution shift: the policy can drift away from the original model's distribution
- Compute cost: training is expensive
Best Practices
Data collection strategy
def rlhf_data_collection_guide():
    """Guidelines for collecting RLHF data."""
    return {
        "SFT data": {
            "volume": "10K-100K high-quality instruction-response pairs",
            "quality": "responses should be accurate, helpful, and safe",
            "diversity": "cover a wide range of task types and domains",
            "format": "a consistent instruction format"
        },
        "Preference data": {
            "volume": "10K-50K preference comparisons",
            "annotation quality": "multiple annotators with agreement checks",
            "balance": "avoid biasing toward a particular response style",
            "refresh cadence": "update periodically to reflect current preferences"
        },
        "Evaluation data": {
            "independence": "fully separate from the training data",
            "coverage": "span all important evaluation dimensions",
            "realism": "reflect real usage scenarios"
        }
    }
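For concreteness, a sketch of the record formats the pipeline above expects; the field names match the preprocessing code, while the contents are made-up examples:

# One SFT record, consumed by preprocess_sft_data
sft_example = {
    "instruction": "Summarize the three stages of RLHF.",
    "output": "SFT builds instruction following, the reward model learns preferences, and PPO optimizes the policy."
}

# One preference record, consumed by preprocess_preference_data
preference_example = {
    "prompt": "Explain what a reward model is.",
    "chosen": "A reward model scores candidate responses so they reflect human preferences.",
    "rejected": "It's just a model."
}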
Hyperparameter tuning
def rlhf_hyperparameter_guide():
    """Typical hyperparameter ranges for each RLHF stage."""
    return {
        "SFT stage": {
            "learning rate": "2e-5",
            "batch size": "4-8",
            "epochs": "3-5"
        },
        "RM stage": {
            "learning rate": "1e-5",
            "batch size": "2-4",
            "epochs": "1-3"
        },
        "PPO stage": {
            "learning rate": "1e-6",
            "KL coefficient": "0.1",
            "clip ratio": "0.2",
            "batch size": "1-2"
        }
    }