Answer: The core formulas of the attention mechanism are as follows:
Scaled Dot-Product Attention:
Attention(Q, K, V) = softmax(QK^T / √d_k)V
where Q, K, and V are the query, key, and value matrices, and d_k is the key dimension used to scale the dot products.
Multi-Head Attention:
MultiHead(Q, K, V) = Concat(head_1, ..., head_h)W^O
where head_i = Attention(QW_i^Q, KW_i^K, VW_i^V)
Code implementation:
import torch
import torch.nn.functional as F

class ScaledDotProductAttention(torch.nn.Module):
    def __init__(self, d_k, dropout=0.1):
        super().__init__()
        self.d_k = d_k
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, Q, K, V, mask=None):
        # Compute the attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))
        # Apply the mask (optional)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        # Softmax normalization
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        # Weighted sum over the values
        output = torch.matmul(attention_weights, V)
        return output, attention_weights
class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.d_v = d_model // num_heads
        self.W_q = torch.nn.Linear(d_model, d_model)
        self.W_k = torch.nn.Linear(d_model, d_model)
        self.W_v = torch.nn.Linear(d_model, d_model)
        self.W_o = torch.nn.Linear(d_model, d_model)
        self.attention = ScaledDotProductAttention(self.d_k, dropout)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # Linear projections
        Q = self.W_q(query)  # [batch_size, seq_len, d_model]
        K = self.W_k(key)
        V = self.W_v(value)
        # Reshape into multiple heads
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.d_v).transpose(1, 2)
        # Apply attention
        attention_output, attention_weights = self.attention(Q, K, V, mask)
        # Concatenate the heads
        attention_output = attention_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.d_model
        )
        # Output projection
        output = self.W_o(attention_output)
        return output, attention_weights
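A minimal usage sketch for the module above (batch size, sequence length, and model dimensions are arbitrary illustration values):

mha = MultiHeadAttention(d_model=512, num_heads=8)
x = torch.randn(2, 10, 512)   # [batch_size, seq_len, d_model]
out, weights = mha(x, x, x)   # self-attention: query = key = value
print(out.shape)              # torch.Size([2, 10, 512])
print(weights.shape)          # torch.Size([2, 8, 10, 10]), one attention map per head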
Answer: A complete Transformer architecture implementation:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # [1, max_len, d_model] for batch-first inputs
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        return x + self.pe[:, :x.size(1), :]
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Self-attention with residual connection
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout1(attn_output))
        # Feed-forward with residual connection
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout2(ff_output))
        return x
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_output, self_mask=None, cross_mask=None):
        # Self-attention
        attn_output, _ = self.self_attn(x, x, x, self_mask)
        x = self.norm1(x + self.dropout1(attn_output))
        # Cross-attention
        attn_output, _ = self.cross_attn(x, enc_output, enc_output, cross_mask)
        x = self.norm2(x + self.dropout2(attn_output))
        # Feed-forward
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout3(ff_output))
        return x
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8,
                 num_layers=6, d_ff=2048, dropout=0.1):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.decoder_layers = nn.ModuleList([
            TransformerDecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

    def create_mask(self, src, tgt):
        # Padding masks (token id 0 is treated as padding)
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # [batch, 1, 1, src_len]
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)  # [batch, 1, tgt_len, 1]
        seq_length = tgt.size(1)
        # Causal ("no peek") mask for the decoder self-attention
        nopeak_mask = torch.ones(1, seq_length, seq_length, device=tgt.device).triu(diagonal=1)
        tgt_mask = tgt_mask & (nopeak_mask == 0).unsqueeze(0)  # [batch, 1, tgt_len, tgt_len]
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        # Create masks
        src_mask, tgt_mask = self.create_mask(src, tgt)
        # Embedding and positional encoding
        src = self.dropout(self.pos_encoding(self.encoder_embedding(src) * math.sqrt(self.d_model)))
        tgt = self.dropout(self.pos_encoding(self.decoder_embedding(tgt) * math.sqrt(self.d_model)))
        # Encoder
        enc_output = src
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)
        # Decoder
        dec_output = tgt
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, tgt_mask, src_mask)
        # Output projection
        output = self.fc_out(dec_output)
        return output
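A minimal smoke test for the model above (vocabulary sizes, model dimensions, and sequence lengths are arbitrary; token id 0 is treated as padding, matching create_mask):

model = Transformer(src_vocab_size=1000, tgt_vocab_size=1000, d_model=128,
                    num_heads=8, num_layers=2, d_ff=256)
src = torch.randint(1, 1000, (2, 12))  # [batch, src_len], id 0 reserved for padding
tgt = torch.randint(1, 1000, (2, 9))   # [batch, tgt_len]
logits = model(src, tgt)
print(logits.shape)                    # torch.Size([2, 9, 1000])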
Answer: A comparison of common normalization methods:
Batch Normalization (BN)
Formula:
def batch_norm(x, gamma, beta, eps=1e-5):
    # x: [batch_size, features]
    mean = torch.mean(x, dim=0)
    var = torch.var(x, dim=0, unbiased=False)
    x_norm = (x - mean) / torch.sqrt(var + eps)
    return gamma * x_norm + beta
Pros: stabilizes and accelerates training, allows larger learning rates, and adds a mild regularization effect.
Cons: depends on batch statistics, so it degrades with small batches, behaves differently at training and inference time (running statistics are required for inference), and is awkward for variable-length sequences.
Code implementation:
class BatchNorm1d(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum
        # Learnable parameters
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        # Running statistics
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # Training mode: use the statistics of the current batch
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)
            # Update the running statistics
            with torch.no_grad():
                self.running_mean = (1 - self.momentum) * self.running_mean + \
                                    self.momentum * mean
                self.running_var = (1 - self.momentum) * self.running_var + \
                                   self.momentum * var
        else:
            # Inference mode: use the running statistics
            mean = self.running_mean
            var = self.running_var
        return self.gamma * (x - mean) / torch.sqrt(var + self.eps) + self.beta
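As a quick sanity check, the hand-written BatchNorm1d above can be compared against PyTorch's built-in nn.BatchNorm1d (shapes and tolerance are illustrative; both modules are in training mode by default):

x = torch.randn(16, 64)
custom = BatchNorm1d(64)
builtin = nn.BatchNorm1d(64)
print(torch.allclose(custom(x), builtin(x), atol=1e-5))  # expected: True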
Layer Normalization (LN)
Formula:
def layer_norm(x, gamma, beta, eps=1e-5):
    # x: [batch_size, seq_len, features]
    mean = torch.mean(x, dim=-1, keepdim=True)
    var = torch.var(x, dim=-1, keepdim=True, unbiased=False)
    x_norm = (x - mean) / torch.sqrt(var + eps)
    return gamma * x_norm + beta
Pros: independent of batch size, identical behavior at training and inference time, and well suited to variable-length sequences and Transformer/RNN models.
Cons: normalizing over the feature dimension is usually less effective than per-channel batch statistics for convolutional feature maps.
Code implementation:
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.d_model = d_model
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))

    def forward(self, x):
        # x: [..., d_model]
        mean = x.mean(-1, keepdim=True)
        var = x.var(-1, unbiased=False, keepdim=True)
        return self.gamma * (x - mean) / torch.sqrt(var + self.eps) + self.beta
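Similarly, the hand-written LayerNorm can be checked against nn.LayerNorm; note that the eps values must be matched explicitly (1e-6 above versus PyTorch's default 1e-5):

x = torch.randn(4, 10, 32)
custom = LayerNorm(32)
builtin = nn.LayerNorm(32, eps=1e-6)
print(torch.allclose(custom(x), builtin(x), atol=1e-5))  # expected: True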
Instance Normalization (IN)
Formula:
def instance_norm(x, gamma, beta, eps=1e-5):
    # x: [batch_size, channels, height, width]
    mean = torch.mean(x, dim=[2, 3], keepdim=True)
    var = torch.var(x, dim=[2, 3], keepdim=True, unbiased=False)
    x_norm = (x - mean) / torch.sqrt(var + eps)
    return gamma * x_norm + beta
Typical use cases: style transfer, GANs, and other image-generation tasks where statistics should be computed per sample and per channel rather than across the batch.
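For reference, a short sketch using PyTorch's built-in equivalent nn.InstanceNorm2d (the tensor shape is illustrative; affine=True adds the learnable gamma/beta):

x = torch.randn(8, 3, 32, 32)              # [batch, channels, height, width]
inorm = nn.InstanceNorm2d(3, affine=True)
print(inorm(x).shape)                      # torch.Size([8, 3, 32, 32])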
Group Normalization (GN)
Formula:
def group_norm(x, gamma, beta, groups=32, eps=1e-5):
    # x: [batch_size, channels, height, width]
    batch_size, num_channels = x.shape[:2]
    # Split the channels into groups and normalize within each group
    x_grouped = x.view(batch_size, groups, num_channels // groups, *x.shape[2:])
    mean = x_grouped.mean(dim=[2, 3, 4], keepdim=True)
    var = x_grouped.var(dim=[2, 3, 4], keepdim=True, unbiased=False)
    x_norm = (x_grouped - mean) / torch.sqrt(var + eps)
    # Restore the original [batch_size, channels, height, width] layout
    x_norm = x_norm.view(batch_size, num_channels, *x.shape[2:])
    return gamma * x_norm + beta
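PyTorch's built-in equivalent is nn.GroupNorm(num_groups, num_channels); a quick illustrative check (group count and tensor shape are arbitrary):

x = torch.randn(4, 64, 16, 16)
gn = nn.GroupNorm(num_groups=8, num_channels=64)
print(gn(x).shape)                         # torch.Size([4, 64, 16, 16])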
Pros: independent of batch size (it works even with a batch size of 1), which makes it a common BN replacement for detection and segmentation models trained with small batches.
Answer: A guide to choosing a normalization method:
Why Transformers use LayerNorm:
Sequence-processing characteristics: sequences vary in length and each token is normalized over its own feature vector, so LN does not depend on padding or on the other samples in the batch; a typical placement is shown below.
# Typical LN placement in a Transformer block
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads=8, d_ff=2048):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)  # Pre-LN
        self.norm2 = nn.LayerNorm(d_model)
        self.attn = nn.MultiheadAttention(d_model, num_heads, batch_first=True)
        self.feedforward = nn.Sequential(
            nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model)
        )

    def forward(self, x):
        # Pre-LN: normalize first, then attend; the residual adds the un-normalized input
        h = self.norm1(x)
        attn_out, _ = self.attn(h, h, h)
        x = x + attn_out
        h = self.norm2(x)
        x = x + self.feedforward(h)
        return x
Training stability: applying LN before each sub-layer (Pre-LN) keeps activations well scaled, which makes deep Transformers noticeably easier to train.
Position independence: LN statistics are computed per token, so they are unaffected by sequence length, padding, or batch composition.
Why CNNs use BatchNorm:
Match with the convolutional structure: a convolution shares its weights across spatial positions, so computing per-channel statistics over the whole batch and all spatial locations (BN) fits this weight sharing; a typical usage is shown below.
# Typical BN usage in a CNN
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, 3, padding=1)
        self.bn = nn.BatchNorm2d(out_channels)  # normalizes over each channel
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)  # applied before the activation function
        x = self.relu(x)
        return x
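A quick shape check for the ConvBlock above (channel counts and image size are illustrative):

block = ConvBlock(in_channels=3, out_channels=16)
img = torch.randn(4, 3, 224, 224)
print(block(img).shape)                    # torch.Size([4, 16, 224, 224])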
Stable feature distributions: per-channel batch statistics keep activation distributions stable from layer to layer, which speeds up convergence and adds a mild regularization effect.
Special considerations for Vision-Language-Action (VLA) models:
class VLAModel(nn.Module):
    def __init__(self, vision_channels=256, d_model=768):  # illustrative sizes
        super().__init__()
        # Vision branch: BN (or GroupNorm when batches are small) on conv feature maps
        self.vision_norm = nn.BatchNorm2d(vision_channels)
        # Language branch: LN on token features
        self.language_norm = nn.LayerNorm(d_model)
        # Fusion layer: adaptive normalization chosen per modality
        self.adaptive_norm = AdaptiveNorm(d_model)

class AdaptiveNorm(nn.Module):
    """Adaptive normalization: pick a different strategy per modality."""
    def __init__(self, d_model):
        super().__init__()
        self.ln = nn.LayerNorm(d_model)
        self.bn = nn.BatchNorm1d(d_model)

    def forward(self, x, modality):
        if modality == 'vision':
            # BatchNorm1d expects features in dim 1, so move them for 3D inputs
            if x.dim() == 3:
                return self.bn(x.transpose(1, 2)).transpose(1, 2)
            return self.bn(x)
        elif modality == 'language':
            return self.ln(x)
        else:
            return x
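An illustrative call pattern for AdaptiveNorm (token counts are arbitrary; 196 mimics ViT patch tokens):

anorm = AdaptiveNorm(d_model=768)
vision_tokens = torch.randn(4, 196, 768)
text_tokens = torch.randn(4, 32, 768)
v = anorm(vision_tokens, modality='vision')
t = anorm(text_tokens, modality='language')
print(v.shape, t.shape)                    # input shapes are preserved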
import torch
import torch.nn as nn
import torch.nn.functional as F  # needed by MixedNormalization below

# Example input: [batch_size=32, seq_len=100, features=512]
x = torch.randn(32, 100, 512)

# Batch Normalization: statistics over the batch dimension.
# Reshape to 2D first: [32*100, 512]
bn = nn.BatchNorm1d(512)
x_bn = bn(x.view(-1, 512)).view(32, 100, 512)

# Layer Normalization: statistics over the feature dimension
ln = nn.LayerNorm(512)
x_ln = ln(x)

print("BN output shape:", x_bn.shape)  # [32, 100, 512]
print("LN output shape:", x_ln.shape)  # [32, 100, 512]
def compare_norm_performance():
    # Behavior at different batch sizes
    batch_sizes = [2, 8, 32, 128]
    for bs in batch_sizes:
        x = torch.randn(bs, 100, 512)
        # BN: batch statistics become noisy when the batch is small
        bn = nn.BatchNorm1d(512, momentum=1.0)  # do not accumulate running stats
        bn_out = bn(x.view(-1, 512)).view(bs, 100, 512)
        bn_var = torch.var(bn_out.view(-1, 512), dim=0).mean()
        # LN: unaffected by batch size
        ln = nn.LayerNorm(512)
        ln_out = ln(x)
        ln_var = torch.var(ln_out.view(-1, 512), dim=0).mean()
        print(f"Batch size {bs}: BN var={bn_var:.4f}, LN var={ln_var:.4f}")

compare_norm_performance()
class NormComparison:
    """Compare the training dynamics of different normalization layers."""
    def __init__(self):
        self.bn_stats = {'mean': [], 'var': []}
        self.ln_stats = {'mean': [], 'var': []}

    def track_stats(self, x, bn, ln):
        # Track the BN running statistics
        self.bn_stats['mean'].append(bn.running_mean.mean().item())
        self.bn_stats['var'].append(bn.running_var.mean().item())
        # Track LN statistics (computed independently per sample)
        ln_means = x.mean(dim=-1).mean().item()
        ln_vars = x.var(dim=-1).mean().item()
        self.ln_stats['mean'].append(ln_means)
        self.ln_stats['var'].append(ln_vars)
def choose_normalization(task_type, model_type, batch_size):
    """
    Choose a normalization method based on task, model type, and batch size.
    """
    recommendations = {
        ('vision', 'cnn', 'large'): 'BatchNorm',
        ('vision', 'cnn', 'small'): 'GroupNorm',
        ('language', 'transformer', 'any'): 'LayerNorm',
        ('multimodal', 'transformer', 'any'): 'LayerNorm',
        ('rl', 'policy', 'any'): 'LayerNorm',
        ('generation', 'gan', 'any'): 'InstanceNorm'
    }
    size_key = 'large' if batch_size > 32 else 'small'
    # Try a size-specific rule first, then fall back to a size-agnostic ('any') rule
    return recommendations.get(
        (task_type, model_type, size_key),
        recommendations.get((task_type, model_type, 'any'), 'LayerNorm')
    )
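Example lookups (the arguments are illustrative):

print(choose_normalization('vision', 'cnn', batch_size=64))           # BatchNorm
print(choose_normalization('language', 'transformer', batch_size=8))  # LayerNorm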
class MixedNormalization(nn.Module):
    """Mixed normalization strategy: learn a weighted blend of LN and BN."""
    def __init__(self, d_model, norm_types=('ln', 'bn')):
        super().__init__()
        self.norms = nn.ModuleDict()
        for norm_type in norm_types:
            if norm_type == 'ln':
                self.norms[norm_type] = nn.LayerNorm(d_model)
            elif norm_type == 'bn':
                self.norms[norm_type] = nn.BatchNorm1d(d_model)
        self.alpha = nn.Parameter(torch.ones(len(norm_types)))

    def forward(self, x):
        outputs = []
        for norm_type, norm_layer in self.norms.items():
            if norm_type == 'bn':
                # BatchNorm1d expects [N, features]; flatten batch and sequence dims
                out = norm_layer(x.view(-1, x.size(-1))).view(x.size())
            else:
                out = norm_layer(x)
            outputs.append(out)
        # Learned weighted fusion of the normalized outputs
        weights = F.softmax(self.alpha, dim=0)
        mixed = sum(w * out for w, out in zip(weights, outputs))
        return mixed
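An illustrative forward pass through the mixed normalization layer:

mixed_norm = MixedNormalization(d_model=512)
x = torch.randn(8, 100, 512)
print(mixed_norm(x).shape)                 # torch.Size([8, 100, 512])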
How did you do it? (How)
Why did you do it that way? (Why)
What did it accomplish? (What)
What other approaches do you know of? (What else)
class End2EndVLA(nn.Module):
    """End-to-end Vision-Language-Action model.

    VisionTransformer, LanguageTransformer, CrossModalFusion and ActionDecoder
    are assumed to be defined elsewhere in the project.
    """
    def __init__(self):
        super().__init__()
        # Vision encoder
        self.vision_encoder = VisionTransformer(
            patch_size=16,
            embed_dim=768,
            num_heads=12
        )
        # Language encoder
        self.lang_encoder = LanguageTransformer(
            vocab_size=50000,
            embed_dim=768,
            num_heads=12
        )
        # Cross-modal fusion
        self.fusion_layer = CrossModalFusion(
            d_model=768,
            num_heads=12
        )
        # Action decoder
        self.action_decoder = ActionDecoder(
            d_model=768,
            action_dim=30  # dimensionality of the robot action space
        )

    def forward(self, images, instructions):
        # Extract visual features
        vision_features = self.vision_encoder(images)
        # Extract language features
        lang_features = self.lang_encoder(instructions)
        # Multimodal fusion
        fused_features = self.fusion_layer(
            vision_features,
            lang_features
        )
        # Generate actions
        actions = self.action_decoder(fused_features)
        return actions
class VLAwithRL(nn.Module):
    """VLA combined with reinforcement learning."""
    def __init__(self):
        super().__init__()
        self.vla_model = End2EndVLA()
        self.value_head = nn.Linear(768, 1)
        self.policy_head = nn.Linear(768, 30)

    def forward(self, images, instructions):
        # Base VLA inference
        actions = self.vla_model(images, instructions)
        # RL heads on top of the fused features
        # (get_fused_features is assumed to expose the fusion output of the VLA backbone)
        features = self.get_fused_features(images, instructions)
        value = self.value_head(features)
        policy_dist = self.policy_head(features)
        return actions, value, policy_dist
Motion-control team + model team integration
Unified technology stack
# A typical VLA demo development pipeline
demo_development_pipeline = {
    'phase_1': {
        'duration': '1-2 weeks',
        'tasks': [
            'Requirements analysis and scenario definition',
            'Data collection and preprocessing',
            'Model selection and architecture design'
        ]
    },
    'phase_2': {
        'duration': '2-3 weeks',
        'tasks': [
            'Core algorithm implementation',
            'Validation in simulation',
            'Integration of basic functionality'
        ]
    },
    'phase_3': {
        'duration': '1-2 weeks',
        'tasks': [
            'Deployment on the real robot',
            'Performance optimization and debugging',
            'Preparation of the demo scenario'
        ]
    }
}
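A small sketch that iterates over the dictionary above to print a plan overview:

for phase, info in demo_development_pipeline.items():
    print(f"{phase} ({info['duration']}):")
    for task in info['tasks']:
        print(f"  - {task}")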
Questions about the technical roadmap:
Questions about team culture:
众擎机器人 interviews focus on:
Suggestions for preparation: