手记

从零开始使用Python构建一个百万参数的大型语言模型

图片来自GoogleDeepMind(开源资源可在pexels上找到)

制作自己的大型语言模型(LLM)是一件很酷的事情,许多大公司如谷歌、推特和脸书都在做这件事。他们发布了不同版本的这些模型,比如70亿、130亿或700亿参数的模型。甚至较小的社区也在做这件事。你可能读过关于创建自己LLM的博客或观看过相关视频,但它们通常会讲很多理论,而不太涉及实际步骤和代码。

在这篇博客中,我将尝试仅用230万个参数创建一个大语言模型(LLM),有趣的是,我们不需要使用高端的GPU。我们将遵循LLaMA 1 论文的方法作为指导。不用担心,我们会保持简单,使用一个基本的数据集,让你看到创建自己的百万参数大语言模型是多么容易。

来看看我的新博客,在那里我通过手动进行矩阵乘法来解析Transformer架构。我一步一步地讲解,让你可以轻松理解这些概念:

从头到尾理解Transformer —— 一步一步的数学示例我们将使用一个简单的数据集并执行多次矩阵乘法来解决编码器和解码器部分…medium.com
目录

· 前置条件
· 了解 LLaMA 的 Transformer 架构
∘ 使用 RMSNorm 进行预归一化
∘ SwiGLU 激活函数
∘ Rotary Embeddings (RoPE)
· 设置环境
· 数据预处理
· 评估策略
· 构建基础神经网络模型
· 复现 LLaMA 架构
∘ 使用 RMSNorm 进行预归一化
∘ Rotary Embeddings
∘ 使用 SwiGLU 激活函数
· 调整超参数
· 保存您的语言模型 (LLM)
· 结论

前置条件

确保你对面向对象编程(OOP)和神经网络(NN)有基本的了解。熟悉 PyTorch 对于编程也会有所帮助。

理解LLaMA的Transformer架构

在使用 LLaMA 方法创建我们自己的大语言模型之前,了解 LLaMA 的架构是非常重要的。下面是 vanilla transformer 和 LLaMA 之间的对比图。

Transformer与Llama架构之间的区别(Llama架构由Umar Jamil设计)

如果你不熟悉标准的Transformer架构,可以阅读 这篇博客 以获得一个基本的指南。

让我们更详细地了解一下 LLaMA 的一些基本概念:

使用RMSNorm进行预归一化:

在 LLaMA 方法中,使用了一种称为 RMSNorm 的技术来对每个变压器子层的输入进行归一化。这种技术借鉴了 GPT-3 的方法,旨在优化与层归一化相关的计算成本。RMSNorm 提供了与 LayerNorm 相似的性能,但显著减少了运行时间(减少了 7%∼64%)。

均方根层归一化论文 (https://arxiv.org/abs/1910.07467)

它通过强调重缩放不变性和基于均方根(RMS)统计量调节输入的总和来实现这一点。主要动机是通过移除均值统计量来简化LayerNorm。感兴趣的读者可以在这里探索RMSNorm的详细实现:这里

SwiGLU 激活函数:

LLaMA 引入了 SwiGLU 激活函数,借鉴了 PaLM 的设计。要理解 SwiGLU,首先需要了解 Swish 激活函数。SwiGLU 是 Swish 的扩展,涉及一个自定义层,该层使用密集网络将输入激活拆分并相乘。

SwiGLU: GLU 变体改进 Transformer (https://kikaben.com/swiglu-2020/)

目的是通过引入更复杂的激活函数来增强模型的表达能力。有关 SwiGLU 的更多细节可以在相关 论文 中找到。

旋转嵌入(RoPE):

旋转嵌入(RoPE)是 LLaMA 中使用的一种位置嵌入类型。它通过旋转矩阵编码绝对位置信息,并且在自注意力公式中自然地包含显式的相对位置依赖性。RoPE 具有可扩展到各种序列长度以及相对距离增加时相互依赖性衰减等优点。

这是通过与旋转矩阵相乘来编码相对位置实现的,从而导致衰减的相对距离——这对于自然语言编码是一个理想的特点。有兴趣了解数学细节的人可以参考RoPE论文

除了这些概念外,LLaMA论文还介绍了其他重要的方法,包括使用具有特定参数的AdamW优化器,以及xformers库中提供的高效的实现,例如因果多头注意力操作符,还有为变压器层手动实现的反向函数,以优化反向传递中的计算。

特别感谢 Anush Kumar 为 LLaMA 的每个关键方面提供了深入的解释。

设置舞台

我们将在这个项目中使用一系列的Python库,所以让我们导入它们:

    # 使用PyTorch实现LLM(无GPU)
    import torch  

    # 从PyTorch导入神经网络模块和函数
    from torch import nn  
    from torch.nn import functional as F  

    # 使用NumPy进行数值运算
    import numpy as np  

    # 使用Matplotlib进行绘图(例如绘制损失)
    from matplotlib import pyplot as plt  

    # 使用time模块跟踪执行时间
    import time  

    # 使用Pandas进行数据操作和分析
    import pandas as pd  

    # 使用urllib处理URL请求(下载数据集)
    import urllib.request

此外,我正在创建一个配置对象来存储模型参数。

    # 模型参数的配置对象  
    MASTER_CONFIG = {  
        # 后续添加参数  
    }

这种方法保持了灵活性,允许在未来根据需要添加更多参数。

数据预处理

在原始的 LLaMA 论文中,使用了多种开源数据集来训练和评估模型。

https://research.facebook.com/publications/llama开放且高效的基石语言模型/

不幸的是,对于较小的项目来说,使用大规模的数据集可能不太实际。因此,对于我们实现的版本,我们将采取更为谦逊的做法,创建一个大幅精简版的LLaMA。

鉴于无法访问大量数据的限制,我们将专注于使用 TinyShakespeare 数据集训练一个简化的 LLaMA 版本。这个开源数据集可以在 这里 获取,包含大约 40,000 行来自各种莎士比亚作品的文本。这一选择受到 Karpathy 的 Makemore 系列 的影响,该系列提供了有关训练语言模型的宝贵见解。

虽然 LLaMA 是在包含 1.4万亿 令牌的大型数据集上训练的,而我们的数据集 TinyShakespeare 只包含大约 1百万字符

首先,我们通过下载来获取数据集:

    # GitHub 上的原始文本文件的 URL  
    url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"  

    # 本地存储的文件名  
    file_name = "tinyshakespeare.txt"  

    # 执行下载  
    urllib.request.urlretrieve(url, file_name)

这个 Python 脚本从指定的 URL 获取 tinyshakespeare 数据集,并将其本地保存为文件名 “tinyshakespeare.txt”

接下来,让我们确定词汇表的大小,这代表了我们数据集中唯一字符的数量。以下是代码片段:

    # 读取数据集的内容  
    lines = open("tinyshakespeare.txt", 'r').read()  

    # 创建数据集中唯一字符的排序列表  
    vocab = sorted(list(set(lines)))  

    # 显示词汇表列表中的前10个字符  
    print('打印词汇表列表中的前10个字符:', vocab[:10])  

    # 输出我们数据集中的总字符数(词汇表大小)  
    print('我们数据集中的总字符数(词汇表大小):', len(vocab))

现在,我们正在创建整数到字符的映射(itos)和字符到整数的映射(stoi)。以下是代码:

    # 整数到字符的映射(itos)  
    itos = {i: ch for i, ch in enumerate(vocab)}  

    # 字符到整数的映射(stoi)  
    stoi = {ch: i for i, ch in enumerate(vocab)}

在原始的 LLaMA 论文中,使用了 Google 的 SentencePiece byte-pair encoding tokenizer。然而,为了简化,我们将选择一个基本的字符级分词器。让我们创建编码和解码函数,稍后我们将这些函数应用于我们的数据集:

    # 编码函数:使用映射 stoi 将字符串转换为整数列表
    def encode(s):  
        return [stoi[ch] for ch in s]  

    # 解码函数:使用映射 itos 将整数列表转换回字符串
    def decode(l):  
        return ''.join([itos[i] for i in l])  

    # 示例:将字符串 "morning" 编码,然后解码结果
    decode(encode("morning"))

最后一行将输出 morning,确认了编码和解码函数的功能正确。

我们现在将数据集转换为 torch 张量,并使用 PyTorch 指定其数据类型以进行进一步的操作:

    # 将数据集转换为指定数据类型的 torch 张量  
    dataset = torch.tensor(encode(lines), dtype=torch.int8)  

    # 显示生成的张量的形状  
    print(dataset.shape)

输出是torch.Size([1115394]),表明我们的数据集包含大约一百万个token。值得注意的是,这比LLaMA数据集要小得多,LLaMA数据集包含1.4万亿个token

我们将创建一个负责将数据集拆分为训练集、验证集或测试集的函数。在机器学习或深度学习项目中,这样的拆分对于开发和评估模型至关重要,同样的原则也适用于这里复制大型语言模型(LLM)的方法:

    # 获取训练、验证或测试批次的函数  
    def get_batches(data, split, batch_size, context_window, config=MASTER_CONFIG):  
        # 将数据集拆分为训练集、验证集和测试集  
        train = data[:int(.8 * len(data))]  
        val = data[int(.8 * len(data)): int(.9 * len(data))]  
        test = data[int(.9 * len(data)):]  

        # 确定使用哪个拆分  
        batch_data = train  
        if split == 'val':  
            batch_data = val  
        if split == 'test':  
            batch_data = test  

        # 在数据中选择随机起始点  
        ix = torch.randint(0, batch_data.size(0) - context_window - 1, (batch_size,))  

        # 创建输入序列(x)和对应的输出序列(y)  
        x = torch.stack([batch_data[i:i+context_window] for i in ix]).long()  
        y = torch.stack([batch_data[i+1:i+context_window+1] for i in ix]).long()  

        return x, y

现在我们的拆分函数已经定义好了,让我们设置两个对于这个过程至关重要的参数:

    # 使用 batch_size 和 context_window 参数更新 MASTER_CONFIG  
    MASTER_CONFIG.update({  
        'batch_size': 8,          # 每次随机拆分时要处理的批次数量  
        'context_window': 16      # 每个批次中每个输入(x)和目标(y)序列的字符数  
    })

batch_size 确定了在每次随机分割时处理的批次数量,而 context_window 则指定了每个批次中每个输入 (x) 和目标 (y) 序列的字符数量。

让我们从数据集中打印出训练集第8个批次和上下文窗口为16的随机样本:

    # 使用指定的批次大小和上下文窗口获取训练批次  
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 解码序列以获取相应的文本表示  
    decoded_samples = [(decode(xs[i].tolist()), decode(ys[i].tolist())) for i in range(len(xs))]  

    # 打印随机样本  
    print(decoded_samples)

策略评估

现在,我们将创建一个专门用于评估我们自创的LLaMA架构的函数。在定义实际模型方法之前进行此操作的原因是为了在训练过程中能够持续进行评估。

    @torch.no_grad()  # 不为此函数计算梯度  
    def evaluate_loss(model, config=MASTER_CONFIG):  
        # 用于存储评估结果的占位符  
        out = {}  

        # 将模型设置为评估模式  
        model.eval()  

        # 遍历训练和验证数据集  
        for split in ["train", "val"]:  
            # 用于存储单个损失的占位符  
            losses = []  

            # 生成10个批次进行评估  
            for _ in range(10):  
                # 获取输入序列(xb)和目标序列(yb)  
                xb, yb = get_batches(dataset, split, config['batch_size'], config['context_window'])  

                # 进行模型推理并计算损失  
                _, loss = model(xb, yb)  

                # 将损失添加到列表中  
                losses.append(loss.item())  

            # 计算该数据集的平均损失并将其存储在输出字典中  
            out[split] = np.mean(losses)  

        # 将模型重新设置为训练模式  
        model.train()  

        return out

我们在训练迭代过程中使用 损失 作为评估模型性能的指标。我们的函数会遍历训练集和验证集,计算每个数据集的10个批次的平均损失,最后返回结果。然后将模型重新设置为训练模式,使用 model.train()

设置基础神经网络模型

我们正在构建一个基本的神经网络,稍后将使用 LLaMA 技术对其进行改进。

    # 定义一个基本的神经网络类  
    class SimpleBrokenModel(nn.Module):  
        def __init__(self, config=MASTER_CONFIG):  
            super().__init__()  
            self.config = config  

            # 嵌入层,将字符索引转换为向量(词汇表大小:65)  
            self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])  

            # 线性层,用于建模特征之间的关系  
            # (将用 LLaMA 中的 SwiGLU 激活函数更新)  
            self.linear = nn.Sequential(  
                nn.Linear(config['d_model'], config['d_model']),  
                nn.ReLU(),  # 当前使用 ReLU,将被 LLaMA 中的 SwiGLU 替换  
                nn.Linear(config['d_model'], config['vocab_size']),  
            )  

            # 打印模型参数总数  
            print("模型参数总数:", sum([m.numel() for m in self.parameters()]))

在当前架构中,嵌入层的词汇量为65,代表了我们数据集中的字符。由于这作为我们的基础模型,我们在线性层中使用了ReLU作为激活函数;然而,这之后将会被替换为SwiGLU,就像在LLaMA中使用的那样。

为了为我们的基础模型创建前向传递,我们必须在我们的NN模型中定义一个前向函数。

    # 定义一个基本的神经网络类  
    class SimpleBrokenModel(nn.Module):  
        def __init__(self, config=MASTER_CONFIG):  

            # 其余代码          
            ...   

            # 基础模型的前向传递函数  
            def forward(self, idx, targets=None):  
                # 嵌入层将字符索引转换为向量  
                x = self.embedding(idx)  

                # 线性层用于建模特征之间的关系  
                a = self.linear(x)  

                # 应用softmax激活以获得概率分布  
                logits = F.softmax(a, dim=-1)  

                # 如果提供了目标,则计算并返回交叉熵损失  
                if targets is not None:  
                    # 重塑logits和目标以进行交叉熵计算  
                    loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))  
                    return logits, loss  

                # 如果没有提供目标,则返回logits  
                else:  
                    return logits  

            # 打印模型参数总数  
            print("模型参数总数:", sum([m.numel() for m in self.parameters()]))

这个前向传播函数接受字符索引(idx)作为输入,应用嵌入层,将结果通过线性层传递,然后应用softmax激活函数以获得概率分布(logits)。如果提供了目标(targets),它会计算交叉熵损失,并返回logits和损失。如果没有提供目标,它只返回logits

为了实例化这个模型,我们可以直接调用该类并打印我们简单神经网络模型中的总参数数量。我们将线性层的维度设置为128,并在配置对象中指定了这个值:

    # 使用128更新MASTER_CONFIG中的线性层维度  
    MASTER_CONFIG.update({  
        'd_model': 128,  
    })  

    # 使用更新后的MASTER_CONFIG实例化SimpleBrokenModel  
    model = SimpleBrokenModel(MASTER_CONFIG)  

    # 打印模型中的总参数数量  
    print("简单神经网络模型中的总参数数量:", sum([m.numel() for m in model.parameters()]))

我们的简单神经网络模型包含大约33,000个参数。

同样地,为了计算logits和损失,我们只需要将分割后的数据集输入到模型中:

    # 使用指定的批次大小和上下文窗口获取训练批次
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])

    # 使用模型计算logits和损失
    logits, loss = model(xs, ys)

为了训练我们的基础模型并记录其性能,我们需要指定一些参数。我们将总共训练1000个周期。将批量大小从8增加到32,并将log_interval设置为10,这意味着代码将在每次训练的每10个批次时打印或记录训练进度的信息。对于优化,我们将使用Adam优化器。

    # 用训练参数更新MASTER_CONFIG  
    MASTER_CONFIG.update({  
        'epochs': 1000,          # 训练周期数  
        'log_interval': 10,      # 训练期间每10个批次记录一次信息  
        'batch_size': 32,        # 将批量大小增加到32  
    })  

    # 使用更新的配置实例化SimpleBrokenModel  
    model = SimpleBrokenModel(MASTER_CONFIG)  

    # 为模型参数定义Adam优化器  
    optimizer = torch.optim.Adam(  
        model.parameters(),      # 将模型参数传递给优化器  
    )

让我们执行训练过程并捕获基础模型的损失,包括参数总数。此外,每行都有注释以增强清晰度

    # 执行训练函数  
    def train(model, optimizer, scheduler=None, config=MASTER_CONFIG, print_logs=False):  
        # 用于存储损失的占位符  
        losses = []  

        # 开始记录时间  
        start_time = time.time()  

        # 遍历每个epoch  
        for epoch in range(config['epochs']):  
            # 将梯度清零  
            optimizer.zero_grad()  

            # 获取训练批次  
            xs, ys = get_batches(dataset, 'train', config['batch_size'], config['context_window'])  

            # 模型前向传播以计算logits和损失  
            logits, loss = model(xs, targets=ys)  

            # 反向传播和优化步骤  
            loss.backward()  
            optimizer.step()  

            # 如果提供了学习率调度器,则调整学习率  
            if scheduler:  
                scheduler.step()  

            # 每隔指定间隔记录进度  
            if epoch % config['log_interval'] == 0:  
                # 计算批次时间  
                batch_time = time.time() - start_time  

                # 在验证集上评估损失  
                x = evaluate_loss(model)  

                # 存储验证损失  
                losses += [x]  

                # 如果指定了打印日志,则打印进度日志  
                if print_logs:  
                    print(f"Epoch {epoch} | val loss {x['val']:.3f} | Time {batch_time:.3f} | ETA in seconds {batch_time * (config['epochs'] - epoch)/config['log_interval'] :.3f}")  

                # 重置计时器  
                start_time = time.time()  

                # 如果提供了调度器,则打印学习率  
                if scheduler:  
                    print("lr: ", scheduler.get_lr())  

        # 打印最终验证损失  
        print("Validation loss: ", losses[-1]['val'])  

        # 绘制训练和验证损失曲线  
        return pd.DataFrame(losses).plot()  

    # 执行训练过程  
    train(model, optimizer)

初始的交叉熵损失在训练前为4.17,经过1000个周期后降至3.93。在这个上下文中,交叉熵反映了选择错误单词的可能性。

我们的模型包含了一个softmax层,它将一组数字转换为概率分布。让我们使用内置的F.cross_entropy函数,这时我们需要直接传入未归一化的logits。因此,我们将相应地修改我们的模型。

    # 修改后的SimpleModel类,移除了softmax层  
    class SimpleModel(nn.Module):  
        def __init__(self, config):  

           # 其余代码  
           ...  

        def forward(self, idx, targets=None):  
            # 嵌入层将字符索引转换为向量  
            x = self.embedding(idx)  

            # 线性层用于建模特征之间的关系  
            logits = self.linear(x)  

            # 如果提供了目标值,则计算并返回交叉熵损失  
            if targets is not None:  

                # 其余代码  
                ...

让我们重新创建更新后的SimpleModel,并训练1000个周期以观察任何变化:

    # 创建更新后的SimpleModel  
    model = SimpleModel(MASTER_CONFIG)  

    # 获取训练批次  
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 使用模型计算logits和损失  
    logits, loss = model(xs, ys)  

    # 为模型参数定义Adam优化器  
    optimizer = torch.optim.Adam(model.parameters())  

    # 训练模型1000个周期  
    train(model, optimizer)

在将损失降低到2.51之后,让我们探索一下我们的语言模型在大约有33,000个参数的情况下,如何在推理过程中生成文本。我们将创建一个‘generate’函数,稍后在复制LLaMA时会使用到:

    # 使用训练好的模型生成文本的函数  
    def generate(model, config=MASTER_CONFIG, max_new_tokens=30):  
        idx = torch.zeros(5, 1).long()  
        for _ in range(max_new_tokens):  
            # 调用模型  
            logits = model(idx[:, -config['context_window']:])  
            last_time_step_logits = logits[  
                :, -1, :  
            ]  # 所有批次(1),最后一个时间步,所有logits  
            p = F.softmax(last_time_step_logits, dim=-1)  # 使用softmax获取概率  
            idx_next = torch.multinomial(  
                p, num_samples=1  
            )  # 从分布中采样以获取下一个token  
            idx = torch.cat([idx, idx_next], dim=-1)  # 将其附加到序列中  
        return [decode(x) for x in idx.tolist()]  

    # 使用训练好的模型生成文本  
    generate(model)

生成的文本使用我们基本的约33K参数的模型看起来并不好。然而,现在我们已经用这个简单的模型打下了基础,我们将继续构建 LLaMA 架构。

复现 LLaMA 架构

在之前的博客部分,我们介绍了基本概念,现在我们将把这些概念整合到我们的基础模型中。LLaMA 对原始的 Transformer 引入了三个架构上的改动:

  1. RMSNorm 用于预归一化
  2. Rotary 嵌入
  3. SwiGLU 激活函数

我们将逐一将这些修改加入到我们的基础模型中,迭代并在此基础上进行构建。

RMSNorm 用于预归一化:

我们定义了一个RMSNorm函数,具有以下功能:

    class RMSNorm(nn.Module):  
        def __init__(self, layer_shape, eps=1e-8, bias=False):  
            super(RMSNorm, self).__init__()  

            # 注册一个可学习的参数'scale'作为模块的参数  
            self.register_parameter("scale", nn.Parameter(torch.ones(layer_shape)))  

        def forward(self, x):  
            """  
            假设形状为 (batch, seq_len, d_model)  
            """  
            # 计算Frobenius范数,RMS = 1/sqrt(N) * Frobenius范数  
            ff_rms = torch.linalg.norm(x, dim=(1,2)) * x[0].numel() ** -.5  

            # 使用RMS对输入张量'x'进行归一化  
            raw = x / ff_rms.unsqueeze(-1).unsqueeze(-1)  

            # 使用可学习参数'scale'对归一化后的张量进行缩放  
            return self.scale[:x.shape[1], :].unsqueeze(0) * raw

我们定义了RMSNorm类。在初始化时,它注册了一个缩放参数。在前向传播过程中,它计算输入张量的Frobenius范数,然后对张量进行归一化。最后,张量通过注册的缩放参数进行缩放。此功能设计用于在LLaMA中替换LayerNorm操作。

现在是时候将 LLaMA 的第一个实现概念 RMSNorm 引入我们的简单 NN 模型了。以下是更新后的代码:

    # 定义带有 RMSNorm 的 SimpleModel_RMS  
    class SimpleModel_RMS(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  

            # 嵌入层将字符索引转换为向量  
            self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])  

            # RMSNorm 层用于预归一化  
            self.rms = RMSNorm((config['context_window'], config['d_model']))  

            # 线性层用于建模特征之间的关系  
            self.linear = nn.Sequential(  
                # 代码其余部分  
                ...  
            )  

            # 打印模型参数总数  
            print("Model parameters:", sum([m.numel() for m in self.parameters()]))  

        def forward(self, idx, targets=None):  
            # 嵌入层将字符索引转换为向量  
            x = self.embedding(idx)  

            # RMSNorm 预归一化  
            x = self.rms(x)  

            # 线性层用于建模特征之间的关系  
            logits = self.linear(x)  

            if targets is not None:  

                # 代码其余部分  
                ...

让我们执行修改后的神经网络模型,该模型使用了RMSNorm,并观察模型中更新的参数数量以及损失:

    # 创建 SimpleModel_RMS 的实例  
    model = SimpleModel_RMS(MASTER_CONFIG)  

    # 获取训练批次  
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 使用模型计算 logits 和损失  
    logits, loss = model(xs, ys)  

    # 为模型参数定义 Adam 优化器  
    optimizer = torch.optim.Adam(model.parameters())  

    # 训练模型  
    train(model, optimizer)

验证损失略有下降,我们现在更新的大型语言模型的参数总数约为55,000。

转换嵌入:

接下来,我们将实现旋转位置嵌入。在RoPE中,作者建议通过旋转嵌入来嵌入序列中每个token的位置,并在每个位置应用不同的旋转。让我们创建一个函数来模仿实际的RoPE实现:

    def get_rotary_matrix(context_window, embedding_dim):  
        # 初始化一个用于旋转矩阵的张量,初始值为零  
        R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)  

        # 遍历上下文窗口中的每个位置  
        for position in range(context_window):  
            # 遍历嵌入中的每个维度  
            for i in range(embedding_dim // 2):  
                # 根据位置和嵌入维度计算旋转角度(theta)  
                theta = 10000. ** (-2. * (i - 1) / embedding_dim)  
                # 使用正弦和余弦函数计算旋转矩阵的元素  
                m_theta = position * theta  
                R[position, 2 * i, 2 * i] = np.cos(m_theta)  
                R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)  
                R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)  
                R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)  
        return R

我们根据指定的上下文窗口和嵌入维度生成旋转矩阵,遵循提出的RoPE实现方法。

你可能熟悉transformer的架构,其中包括注意力头,当我们复制LLaMA时,我们也需要创建注意力头。首先,让我们使用我们之前为RoPE嵌入开发的get_rotary_matrix函数创建一个掩码注意力头此外,每行都有注释以增加清晰度
    class RoPEAttentionHead(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  
            # 查询的线性变换  
            self.w_q = nn.Linear(config['d_model'], config['d_model'], bias=False)  
            # 键的线性变换  
            self.w_k = nn.Linear(config['d_model'], config['d_model'], bias=False)  
            # 值的线性变换  
            self.w_v = nn.Linear(config['d_model'], config['d_model'], bias=False)  
            # 获取RoPE的位置嵌入矩阵  
            self.R = get_rotary_matrix(config['context_window'], config['d_model'])  

        def get_rotary_matrix(context_window, embedding_dim):  
            # 生成RoPE的旋转矩阵  
            R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)  
            for position in range(context_window):  
                for i in range(embedding_dim//2):  

                    # 代码的其余部分  
                    ...  

            return R  

        def forward(self, x, return_attn_weights=False):  
            # x: 形状为 (batch, sequence length, dimension) 的输入张量  

            b, m, d = x.shape  # 批量大小,序列长度,维度  

            # Q、K 和 V 的线性变换  
            q = self.w_q(x)  
            k = self.w_k(x)  
            v = self.w_v(x)  

            # 使用RoPE矩阵旋转Q和K  
            q_rotated = (torch.bmm(q.transpose(0, 1), self.R[:m])).transpose(0, 1)  
            k_rotated = (torch.bmm(k.transpose(0, 1), self.R[:m])).transpose(0, 1)  

            # 执行缩放点积注意力  
            activations = F.scaled_dot_product_attention(  
                q_rotated, k_rotated, v, dropout_p=0.1, is_causal=True  
            )  

            if return_attn_weights:  
                # 创建因果注意力掩码  
                attn_mask = torch.tril(torch.ones((m, m)), diagonal=0)  
                # 计算注意力权重并添加因果掩码  
                attn_weights = torch.bmm(q_rotated, k_rotated.transpose(1, 2)) / np.sqrt(d) + attn_mask  
                attn_weights = F.softmax(attn_weights, dim=-1)  
                return activations, attn_weights  

            return activations

现在我们已经有了一个返回注意力权重的单个掩码注意力头,下一步是创建一个多头注意力机制。

    class RoPEMaskedMultiheadAttention(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  
            # 创建一个 RoPEMaskedAttentionHead 实例的列表作为注意力头  
            self.heads = nn.ModuleList([  
                RoPEMaskedAttentionHead(config) for _ in range(config['n_heads'])  
            ])  
            self.linear = nn.Linear(config['n_heads'] * config['d_model'], config['d_model'])  # 在拼接头部后应用线性层  
            self.dropout = nn.Dropout(.1)  # Dropout 层  

        def forward(self, x):  
            # x: 形状为 (batch, sequence length, dimension) 的输入张量  

            # 处理每个注意力头并将结果拼接起来  
            heads = [h(x) for h in self.heads]  
            x = torch.cat(heads, dim=-1)  

            # 对拼接后的输出应用线性变换  
            x = self.linear(x)  

            # 应用 dropout  
            x = self.dropout(x)  
            return x

原来的论文在其较小的70亿参数的大语言模型变体中使用了32个注意力头,但由于限制,我们将使用8个注意力头来实现我们的方法。

    # 使用注意力头的数量更新主配置
    MASTER_CONFIG.update({  
        'n_heads': 8,  
    })

现在我们已经实现了Rotational Embedding和Multi-head Attention,让我们用更新的代码重写我们的RMSNorm神经网络模型。我们将测试其性能,计算损失,并检查参数数量。我们将这个更新后的模型称为“RopeModel”

     class RopeModel(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  

            # 输入令牌的嵌入层  
            self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])  

            # RMSNorm层用于预归一化  
            self.rms = RMSNorm((config['context_window'], config['d_model']))  

            # RoPEMaskedMultiheadAttention层  
            self.rope_attention = RoPEMaskedMultiheadAttention(config)  

            # 线性层后跟ReLU激活  
            self.linear = nn.Sequential(  
                nn.Linear(config['d_model'], config['d_model']),  
                nn.ReLU(),  
            )  

            # 最终的线性层用于预测  
            self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])  

            print("模型参数数量:", sum([m.numel() for m in self.parameters()]))  

        def forward(self, idx, targets=None):  
            # idx: 输入索引  
            x = self.embedding(idx)  

            # 一个注意力块  
            x = self.rms(x)  # RMS预归一化  
            x = x + self.rope_attention(x)  

            x = self.rms(x)  # RMS预归一化  
            x = x + self.linear(x)  

            logits = self.last_linear(x)  

            if targets is not None:  
                loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))  
                return logits, loss  

            else:  
                return logits

让我们执行修改后的神经网络模型,该模型使用RMSNorm、Rotational Embeddings和Masked Multi Head Attentions,以观察模型中更新的参数数量以及损失:

    # 创建一个RopeModel实例(包含RMSNorm、RoPE、Multi-Head)
    model = RopeModel(MASTER_CONFIG)  

    # 获取训练批次
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 使用模型计算logits和损失
    logits, loss = model(xs, ys)  

    # 定义模型参数的Adam优化器
    optimizer = torch.optim.Adam(model.parameters())  

    # 训练模型
    train(model, optimizer)

验证损失再次略有下降,我们现在更新的大型语言模型的参数总数约为55,000。

让我们训练模型更多轮次,看看我们重新创建的LLaMA大语言模型的损失是否继续减少。

    # 使用更多轮次和日志间隔更新训练配置  
    MASTER_CONFIG.update({  
        "epochs": 5000,  
        "log_interval": 10,  
    })  

    # 使用更新后的配置训练模型  
    train(model, optimizer)

验证损失继续下降,表明继续训练更多轮次可能会进一步减少损失,尽管幅度不大。

SwiGLU 激活函数:

如前所述,LLaMA的创建者使用了SwiGLU而不是ReLU,所以我们将在代码中实现SwiGLU方程。

https://arxiv.org/pdf/2002.05202v1.pdf

    class SwiGLU(nn.Module):  
        """ Paper Link -> https://arxiv.org/pdf/2002.05202v1.pdf """  
        def __init__(self, size):  
            super().__init__()  
            self.config = config  # 配置信息  
            self.linear_gate = nn.Linear(size, size)  # 用于门控机制的线性变换  
            self.linear = nn.Linear(size, size)  # 主分支的线性变换  
            self.beta = torch.randn(1, requires_grad=True)  # 随机初始化beta参数  

            # 使用nn.Parameter来定义beta,确保其作为可学习参数  
            self.beta = nn.Parameter(torch.ones(1))  
            self.register_parameter("beta", self.beta)  

        def forward(self, x):  
            # Swish-Gated Linear Unit的计算  
            swish_gate = self.linear_gate(x) * torch.sigmoid(self.beta * self.linear_gate(x))  
            out = swish_gate * self.linear(x)  # 门控和主分支的逐元素相乘  
            return out

在实现了SwiGLU方程后,我们需要将其集成到我们修改过的LLaMA语言模型(RopeModel)中。

    class RopeModel(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  

            # 输入token的嵌入层  
            self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])  

            # 用于预归一化的RMSNorm层  
            self.rms = RMSNorm((config['context_window'], config['d_model']))  

            # 带有RoPE(旋转位置嵌入)的多头注意力层  
            self.rope_attention = RoPEMaskedMultiheadAttention(config)  

            # 线性层后跟SwiGLU激活函数  
            self.linear = nn.Sequential(  
                nn.Linear(config['d_model'], config['d_model']),  
                SwiGLU(config['d_model']),  # 添加SwiGLU激活函数  
            )  

            # 输出线性层  
            self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])  

            # 打印模型参数总数  
            print("模型参数总数:", sum([m.numel() for m in self.parameters()]))  

        def forward(self, idx, targets=None):  
            x = self.embedding(idx)  

            # 一个注意力块  
            x = self.rms(x)  # RMS预归一化  
            x = x + self.rope_attention(x)  

            x = self.rms(x)  # RMS预归一化  
            x = x + self.linear(x)  # 应用SwiGLU激活函数  

            logits = self.last_linear(x)  

            if targets is not None:  
                # 如果提供了目标,则计算交叉熵损失  
                loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))  
                return logits, loss  

            else:  
                return logits

让我们执行修改后的神经网络模型,该模型使用RMSNorm、Rotational Embeddings、Masked Multi Head Attentions和SwiGLU,以观察模型中更新的参数数量以及损失:

    # 创建RopeModel的实例(RMSNorm、RoPE、Multi-Head、SwiGLU)
    model = RopeModel(MASTER_CONFIG)

    # 获取训练批次
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])

    # 使用模型计算logits和损失
    logits, loss = model(xs, ys)

    # 为模型参数定义Adam优化器
    optimizer = torch.optim.Adam(model.parameters())

    # 训练模型
    train(model, optimizer)

又一次验证损失出现了小幅下降,我们更新后的大型语言模型的参数总数现在约为60,000。

到目前为止,我们已经成功实现了论文中的关键组件,即RMSNorm、RoPE和SwiGLU。我们观察到这些实现导致损失略有下降。

现在我们将向我们的LLaMA添加层,以观察其对损失的影响。原论文对7b版本使用了32层,但我们只使用4层。让我们相应地调整模型设置。

    # 更新模型配置以设置层数  
    MASTER_CONFIG.update({  
        'n_layers': 4,  # 将层数设置为4  
    })

让我们从创建一个单层开始,以理解它的影响。

    # 添加 RMSNorm 和残差连接  
    class LlamaBlock(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  

            # RMSNorm 层  
            self.rms = RMSNorm((config['context_window'], config['d_model']))  

            # RoPE Masked Multihead Attention 层  
            self.attention = RoPEMaskedMultiheadAttention(config)  

            # 使用 SwiGLU 激活的前向层  
            self.feedforward = nn.Sequential(  
                nn.Linear(config['d_model'], config['d_model']),  
                SwiGLU(config['d_model']),  
            )  

        def forward(self, x):  
            # 一个注意力块  
            x = self.rms(x) # RMS 预归一化  
            x = x + self.attention(x)  # 残差连接  

            x = self.rms(x) # RMS 预归一化  
            x = x + self.feedforward(x)  # 残差连接  
            return x

创建 LlamaBlock 类的实例并将其应用于随机张量。

    # 创建 `LlamaBlock` 类的实例,使用提供的配置  
    block = LlamaBlock(MASTER_CONFIG)  

    # 生成指定批次大小、上下文窗口和模型维度的随机张量  
    random_input = torch.randn(MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'], MASTER_CONFIG['d_model'])  

    # 将 `LlamaBlock` 应用于随机输入张量  
    output = block(random_input)

成功创建单层后,我们现在可以使用它来构建多层。此外,我们将把我们的模型类从 “ropemodel” 重命名为 “Llama”,因为我们已经复制了 LLaMA 语言模型的所有组件。

    class Llama(nn.Module):  
        def __init__(self, config):  
            super().__init__()  
            self.config = config  
            # 用于 token 表示的嵌入层  
            self.embeddings = nn.Embedding(config['vocab_size'], config['d_model'])  
            # 基于指定层数的 LlamaBlock 顺序块  
            self.llama_blocks = nn.Sequential(  
                OrderedDict([(f"llama_{i}", LlamaBlock(config)) for i in range(config['n_layers'])])  
            )  
            # 用于最终输出的前馈网络 (FFN)  
            self.ffn = nn.Sequential(  
                nn.Linear(config['d_model'], config['d_model']),  
                SwiGLU(config['d_model']),  
                nn.Linear(config['d_model'], config['vocab_size']),  
            )  

            # 打印模型中的总参数数量  
            print("model params:", sum([m.numel() for m in self.parameters()]))  

        def forward(self, idx, targets=None):  
            # 输入 token 索引通过嵌入层传递  
            x = self.embeddings(idx)  
            # 通过 LlamaBlocks 处理输入  
            x = self.llama_blocks(x)  
            # 通过最终的 FFN 传递处理后的输入,以获得输出 logits  
            logits = self.ffn(x)  

            # 如果没有提供 targets,则仅返回 logits  
            if targets is None:  
                return logits  
            # 如果提供了 targets,则计算并返回交叉熵损失  
            else:  
                loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))  
                return logits, loss

让我们执行修改后的 LLaMA 模型,该模型使用 RMSNorm、Rotary Embeddings、Masked Multi-Head Attention、SwiGLU 和 N_layers,以观察模型中更新的参数数量以及损失:

    # 创建一个 RopeModel 的实例(RMSNorm、RoPE、Multi-Head、SwiGLU、N_layers)  
    llama = Llama(MASTER_CONFIG)  

    # 获取训练批次  
    xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 使用模型计算 logits 和损失  
    logits, loss = llama(xs, ys)  

    # 定义模型参数的 Adam 优化器  
    optimizer = torch.optim.Adam(llama.parameters())  

    # 训练模型  
    train(llama, optimizer)

虽然有可能过拟合,但重要的是探索增加训练轮数是否能进一步减少损失。此外,需要注意的是,我们当前的大型语言模型拥有超过2百万的参数。

让我们用更多的轮次来训练它。

    # 更新配置中的轮次数量  
    MASTER_CONFIG.update({  
        'epochs': 10000,  
    })  
    # 使用指定的轮次数量训练 LLaMA 模型  
    train(llama, optimizer, scheduler=None, config=MASTER_CONFIG)

这里的损失是1.08,我们可以在不遇到显著过拟合的情况下实现更低的损失。这表明模型表现得很好。

让我们再次训练模型,这次加入调度器

    # 再次训练模型,使用调度器进行更好的优化。
    train(llama, optimizer, config=MASTER_CONFIG)

到目前为止,我们已经成功地在自定义数据集上实现了 LLaMA 架构的简化版本。现在,让我们来检查我们这个拥有200万个参数的语言模型生成的输出。

    # 使用训练好的 LLM (llama) 生成最多 500 个 token 的文本  
    generated_text = generate(llama, MASTER_CONFIG, 500)[0]  
    print(generated_text)

即使生成的一些单词可能不是完美的英语,我们仅拥有200万个参数的大型语言模型也展现出了对英语的基本理解。

现在,让我们看看我们的模型在测试集上的表现如何。

    # 从测试集中获取批次  
    xs, ys = get_batches(dataset, 'test', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])  

    # 将测试数据传递给 LLaMA 模型  
    logits, loss = llama(xs, ys)  

    # 打印测试集上的损失  
    print(loss)

测试集上的计算损失约为1.236。

一个简单的检查生成输出变化的方法是运行大量轮次的训练并观察结果。

实验中的超参数

超参数调整是训练神经网络的关键步骤。在原版的Llama论文中,作者使用了余弦退火学习率调度。然而,在我们的实验中,这种方法表现不佳。这里是一个使用不同学习率调度进行超参数实验的例子:

    # 更新配置  
    MASTER_CONFIG.update({  
        "epochs": 1000  
    })  

    # 使用余弦退火学习率调度创建Llama模型  
    llama_with_cosine = Llama(MASTER_CONFIG)  

    # 定义带有特定超参数的Adam优化器  
    llama_optimizer = torch.optim.Adam(  
        llama.parameters(),  
        betas=(.9, .95),  
        weight_decay=.1,  
        eps=1e-9,  
        lr=1e-3  
    )  

    # 定义余弦退火学习率调度器  
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(llama_optimizer, 300, eta_min=1e-5)  

    # 使用指定的优化器和调度器训练Llama模型  
    train(llama_with_cosine, llama_optimizer, scheduler=scheduler)
保存您的语言模型(LLM)

你可以保存整个LLM模型或仅保存参数,使用以下代码:

    # 保存整个模型  
    torch.save(llama, 'llama_model.pth')  

    # 如果你只想保存模型参数  
    torch.save(llama.state_dict(), 'llama_model_params.pth')

为了将你的 PyTorch 模型保存到 Hugging Face 的 Transformers 库中,你可以使用 save_pretrained 方法。以下是一个示例:

    from transformers import GPT2LMHeadModel, GPT2Config  

    # 假设 Llama 是你的 PyTorch 模型  
    llama_config = GPT2Config.from_dict(MASTER_CONFIG)  
    llama_transformers = GPT2LMHeadModel(config=llama_config)  
    llama_transformers.load_state_dict(llama.state_dict())  

    # 指定你想要保存模型的目录  
    output_dir = "llama_model_transformers"  

    # 保存模型和配置  
    llama_transformers.save_pretrained(output_dir)

GPT2Config 用于创建一个与 GPT-2 兼容的配置对象。然后,创建并加载了 GPT2LMHeadModel,使用了你 Llama 模型的权重。最后,调用了 save_pretrained 将模型和配置保存到指定目录中。

你可以使用Transformers库加载模型:

    from transformers import GPT2LMHeadModel, GPT2Config  

    # 指定保存模型的目录  
    output_dir = "llama_model_transformers"  

    # 加载模型和配置  
    llama_transformers = GPT2LMHeadModel.from_pretrained(output_dir)
结论

在这篇博客中,我们详细介绍了如何实现 LLaMA 方法来构建你自己的小型语言模型(LLM)的步骤。建议将你的模型扩展到大约 1500 万个参数,因为在这个范围内的较小模型(10M 到 20M)通常更擅长理解英语。一旦你的 LLM 在语言方面变得熟练,你可以对其进行微调以适应特定的应用场景。

我希望这篇全面的博客能为你提供一些见解,帮助你复制论文来创建你个性化的大型语言模型。

感谢阅读这篇详尽的文章!

0人推荐
随时随地看视频
慕课网APP