手记

TiKV 源码解析系列文章(十八)Raft Propose 的 Commit 和 Apply 情景分析

在学习了[[前的文章](https://pingcap.com/blog-cn/tikv-source-code-reading-2/)之后,相信大家已经对TiKV使用的Raft核心库** raft-rs * 有基本的了解。 * raft-rs **实现了Raft Leader选举和日志复制等核心功能,而消息的发送,接收,应用到状态机等操作则需要使用者自行实现,此处将要介绍的就是TiKV中这些部分的处理过程。

筏准备好了

在开始正题之前,我们先简单回顾一下**筏-RS **与外部代码的交互接口。立即立即结构的[定义](https://github.com/tikv/raft-rs/blob/afabefa23196a4a23832add0087f5a522d8ccc3b / src / raw_node.rs#L86-L110)如下:

锈酒吧结构适当{///如果没有更新,SoftState将为零。///不需要使用或存储SoftState。ss:Option <SoftState>,///在发送消息hs:Option <HardState>,/// ///如果没有更新,则HardState将等于空状态。////当其应用的索引大于ReadState中的索引时。///请注意,当筏收到MsgReadIndex时,将返回read_state。///返回读取状态:Vec <ReadState>,////指定要在发送消息之前////保存到稳定存储的快照。Vec<必须>,///快照指定要保存到稳定存储的快照。快照:// /以前。酒馆commit_entries:选项<VEC <进入>>,///要AFTER的必须是发送的消息指定出站邮件/////进行稳定的存储。///如果包含MsgSnap消息,则当收到消息或通过ReportSnapshot故障快照时,应用程序必须向筏报告///。。消息>,must_sync:bool,} `

Ready结构包括了一些系列Raft状态的更新,在这里中我们需要关注的是:

  • ** HS :筏相关的元信息更新,如当前的任期,投票结果,致力于指数等等* committed_entries :最新被提交的日志,需要应用到状态机中 消息:需要发送给其他对等体的日志 条目:需要保存的日志。

##开始的接收和在筏中的复制

TiKV 3.0中个月了类似[Actor](https://en.wikipedia.org/wiki/Actor_model )的并发模型,Actor被视为并发运算的基本单元:当一个Actor接收到一则消息,它可以做出一些决策,创建更多的演员,发送更多的消息,决定要如何回答接下来的消息。每个TiKV上的筏同行都对应两个演员,我们把它们分别称为PeerFsmApplyFsmPeerFsm用于接收和处理其他筏同伴发送过来的筏消息,而ApplyFsm用于将已提交日志应用到状态机。

TiKV中实现的Actor系统被称为BatchSystem,它使用几个Poll线程从多个邮箱上拉取一个Batch的消息,再分别交由各个Actor来执行。为了保证[ 线性一致性 ](https:// pingcap.com/blog-cn/linearizability-and-raft/),一个演员同时只会在一个投票线程上接收消息并顺序执行。由于篇幅所限,这一部分的实现在这里不做任何事情,有所作为的同学可以在raftstore / FSM / batch.rs查看详细代码。

上面谈到,PeerFsm用于接收和处理筏消息它接收的消息为。PeerMsg,根据消息类型的不同会有不同的处理:

/// Message that can be sent to a peer.
pub enum PeerMsg {
    /// Raft message is the message sent between raft nodes in the same
    /// raft group. Messages need to be redirected to raftstore if target
    /// peer doesn't exist.
    RaftMessage(RaftMessage),
    /// Raft command is the command that is expected to be proposed by the
    /// leader of the target raft group. If it's failed to be sent, callback
    /// usually needs to be called before dropping in case of resource leak.
    RaftCommand(RaftCommand),
    /// Result of applying committed entries. The message can't be lost.
    ApplyRes { res: ApplyTaskRes },
    ...
}

...

impl PeerFsmDelegate {
    pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg>) {
        for m in msgs.drain(..) {
            match m {
                PeerMsg::RaftMessage(msg) => {
                    self.on_raft_message(msg);
                }
                PeerMsg::RaftCommand(cmd) => {
                    self.propose_raft_command(cmd.request, cmd.callback)
                }
                PeerMsg::ApplyRes { res } => {
                    self.on_apply_res(res);
                }
                ...
            }
        }
    }
}

这里只列出了我们需要关注的几种消息类型:

  • RaftMessage:其他对等发送过来Raft消息,包括心跳,日志,投票消息等。

  • RaftCommand:上层提出的提案,其中包含了需要通过Raft同步的操作,以及操作成功之后需要调用的callback函数。

  • ApplyRes:ApplyFsm在将日志应用到状态机之后发送给PeerFsm的消息,用于在进行操作之后更新某些内存状态。

我们主要关注的是PeerFsm如何处理的提案,也就是RaftCommand的处理过程。在进入到PeerFsmDelegate :: propose_raft_command后,首先会调用PeerFsmDelegate :: pre_propose_raft_command对对等体ID,对等术语,区域划时代(区域的版本,区域拆分,合并和添加/删除对等体等操作会改变区域的时代)是否匹配,对等体或领导者等条件进行这种检查,并根据请求的类型(是读请求还是写请求),选择不同的提出策略见(同行::检查):

锈让政策= 自我。检查(& req ); 让 RES = 匹配政策{ 好( RequestPolicy :: ReadIndex )= > 回归自我。read_index ( CTX , REQ , err_resp , CB ),确定( RequestPolicy :: ProposeNormal )= > 自我。

  proposal_normal ( ctx , req ),... } ;```

对于读请求,我们只需要确认此时Leader是否真的是Leader即可,一个多个轻量的方法是发送一次心跳,再检查是否收到了过半的响应,这在raft-rs中被称为ReadIndex(关于ReadIndex的介绍可以参考[ 这篇文章 ](https://pingcap.com/blog-cn/lease-read/) )。对于写请求,则需要提出一条筏日志,这是在propose_normal函数中调用筏::建议接口完成的。在提出了一条日志之后,同伴会将建议保存在一个名为apply_proposalsVEC中。随后一个批次(包含了多个对等)内的提案会被投票线程统一收集起来,放入一个名为pending_proposalsVEC中待后续处理。

在一个批次的消息都经PeerDelegate :: handle_msgs处理完毕之后,民意调查对批次内的每一个同伴调用同行:: handle_raft_ready_append

  1. 用记录的last_applied_index获取一个准备。2. 在得到一个准备好之后,PeerFsm就会像我们前面所描述的那样,调用PeerStorage :: handle_raft_ready更新状态(术语,最后日志索引等)和日志。3. 这里的状态更新分为持久化状态和内存状态,持久化状态的更新被写入到一个WriteBatch中,内存状态的更新则会构造一个InvokeContext,这些更新都会被一个PollContext暂存起来。

于是我们得到了Batch内所有Peer的状态更新,以及最近提出的提案,随后Poll线程会做以下几件事情:

  1. 将建议发送给ApplyFsm暂存,以便在建议写入成功之后调用回调返回响应。2. 将之前从各个就绪中得到的需要发送的日志发送给GRPC线程,随后发送给其他TiKV节点。3. 持久化已保存在WriteBatch中需要更新的状态。4. 根据InvokeContext更新PeerFsm中的内存状态。5. 将已提交日志发送给ApplyFsm进行应用(见同伴:: handle_raft_ready_apply)。

提案在Raft中的确认

上面我们阐述了地区的负责人在收到建议之后,是调用了哪些接口将提案放到筏状态机中的。在这之后,这个建议虽然被发往了ApplyFsm中暂存,但是ApplyFsm目前还不能适用于它并调用关联的’回调’函数,因为这个建议还没被筏中的过半节点确认。那么,领导节点上的raftstore模块是如何处理收到的其他副本的筏消息,并完成日志的确认的呢?

答案就在PeerFsmDelegate :: on_raft_message函数中。在一个对等收到筏消息之后,会进入这个函数中进行处理,内部调用筏::一步函数更新筏状态机的内存状态。之后,调用RawNode ::准备函数获取committed_entries,最终作为ApplyMsg ::应用任务发送给ApplyFsm,由ApplyFsm执行指令,如果提案是由本节点发出,还会调用’回调’函数(之前通过ApplyMsg ::建议任务暂存在ApplyFsm中),以向客户端返回响应。

提案的应用

在上一部分我们提到,PeerFsm会将提案以及已提交日志发送给对应的ApplyFsm,它们对应的消息类型分别是ApplyMsg ::建议ApplyMsg ::应用',下面将会讲述 ApplyFsm `是如何处理这两种类型的消息的。

对于ApplyMsg ::提案的处理非常简单(见ApplyFsm :: handle_proposal),ApplyFsm会把衔接加入ApplyDelegate :: pending_cmds中暂存起来,后续在应用对应的日志时会从这里进行相应的回调进行调用。

ApplyMsg:应用中包含的是实际需要应用的日志,ApplyFsm会针对这些日志进行(见ApplyFsm :: handle_apply):

  1. 修改内存状态,将更改的状态(施加的最后索引等),数据持久化。2。引发初步对应的预期回复响应。3。PeerFsm发送ApplyRes,其中包含了applied_termapplied_index '等状态(用于更新PeerFsm`中的内存状态)。

在** raft-rs **的实现中,当选举出新的Leader时,新Leader会广播一条“空日志”,以提交前面term中的这时,可能还有一些在前面术语中提出的预期仍处于待决阶段,而因为有新的领导者产生,这些始终永远不会被确认了,因此我们需要对它们进行清理,以免关联的'替换'无法调用导致一些资源无法释放。清理的逻辑参照ApplyFsm :: handle_entries_normal函数。

总结

这里用一个流程图总结一下TiKV处理逐步的大致流程,如下:

简言之,TiKV使用了两个线程池来处理的建议,并且将一个堆栈同行分开了两部分:PeerFsmApplyFsm。在处理建议书的过程中,首先由PeerFsm获取日志并驱动浮动内部的状态机,由ApplyFsm根据已提交日志修改对应数据的状态机(区域信息和用户数据)。

由于这部分代码涉及到各种极端情况,因此逻辑上比较复杂,希望读者的读者可以进一步从二进制中获取更多细节。

![](

0人推荐
随时随地看视频
慕课网APP