一、背景
书接上文6.824 raft Lab 2C 持久化与恢复,本文继续往下讲解日志压缩。
raft通过日志来实现多副本的数据一致,但是日志会不断膨胀,带来两个缺点:数据量大、恢复时间长,因此需要定期压缩一下,生成snapshot。本文实现的源码:6.824 raft Lab 2D 日志压缩
1 何时压缩?
触发压缩的时机一般是以下两种:
- 日志的数据量达到阈值(推荐)。
- 日志的数量达到阈值。
测试用例是根据日志数量来触发压缩的。
2 谁触发压缩?
snapshot是状态机某一时刻的副本,具体格式依赖存储引擎的实现,比如说:B+树、LSM、哈希表等,6.824是实现一个键值数据库,所以我们采用的是哈希表,在Lab 3可以看到实现。
3 snapshot是否要一致?
目前有两种方案:
- Follower的snapshot需要和Leader保持一致,优点是简单,完全向Leader看齐,但是会传输更多的数据,受Leader网络带宽限制。
- Follower独立压缩日志,各个peer的checkpoint可能不一样,但Follower数据传输量小,除非很落后才有可能需要拉取Leader的snapshot。
本文选择第二种,Follower只对已经达成一致、已经提交的的日志进行压缩,并不会影响数据一致性,只是达成一致的数据在不同peer上的呈现形式有所不同,由snapshot+log组成,比例可能不一样。
二、设计思想
1 接口
日志压缩的实现涉及到三个接口:
- Snapshot:raft提供给应用层调用的接口,用于生成snapshot,并截断自己的log,每个peer都可以调用。
- InstallSnapshot:当Follower太过落后而Leader已经将这个Follower需要的日志压缩了,此时就需要传输snapshot,本接口接收Leader的snapshot。
- CondInstallSnapshot:Follower接收到snapshot后不能够立刻应用并截断日志,raft和状态机都需要应用snapshot,这需要考虑原子性。如果raft应用成功但状态机应用snapshot失败,那么在接下来的时间里客户端读到的数据是不完整的。如果状态机应用snapshot成功但raft应用失败,那么raft会要求重传,状态机应用成功也没啥意义。因此CondInstallSnapshot是异步于raft的,并由应用层调用。
注:raft是被应用层调用的,不能反过来调用应用层,raft可以单独作为一个库支持各种应用层。
2 交互流程
任意peer的应用层都可以独立压缩已经提交的日志,这个操作不涉及到其他peer,需要持久化。
如果某个peer的日志太落后,缺少的日志已经被Leader压缩了,此时Leader就需要通过InstallSnapshot接口将snapshot直接传输给这个peer,而这个peer会将snapshot通过applyCh上传给自己的应用层,此时不能截断日志,否则万一应用snapshot失败就出事了。最后由应用层先给状态机先snapshot,然后调用CondInstallSnapshot来给raft应用。
三、代码实现
1 接口定义
<code style="margin-left:0">type Raft struct { ... //提交情况 //log *RaftLog logs []*LogEntry nextIndex []int matchIndex []int commitIndex int lastApplied int lastIncludeIndex int //快照的最大logIndex lastIncludeTerm int //最后一条压缩日志的term,不是压缩时peer's term snapshotOffset int //快照可能分批次传输 snapshot []byte //状态机 applyCond *sync.Cond applyChan chan ApplyMsg } type InstallSnapshotArgs struct { Term int //leader's term LeaderId int LastIncludeIndex int //snapshot中最后一条日志的index LastIncludeTerm int Data []byte //快照 //Offset int //此次传输chunk在快照文件的偏移量,快照文件可能很大,因此需要分chunk,此次不分片 //Done bool //是否最后一块 } type InstallSnapshotReply struct { Term int }</code>
分片有点麻烦,下次优化?。
使用snapshot后,logs的下标就有问题了,需要修改,幸好一开始封装了方法,切勿直接使用rf.logsi来操作。另外,为了减少判断,rf.logs0是一个dummy log,起到哨兵的作用,并且rf.logs0.LogTerm是snapshot的最后一条日志的LogTerm,千万别设置成压缩时的peer's term,否则日志匹配时会出问题。
<code style="margin-left:0">func (rf *Raft) lastLogTermAndLastLogIndex() (int, int) { logIndex := rf.lastLogIndex() logTerm := rf.logs[logIndex-rf.lastIncludeIndex].LogTerm return logTerm, logIndex } func (rf *Raft) lastLogIndex() int { return len(rf.logs) - 1 + rf.lastIncludeIndex } func (rf *Raft) logTerm(logIndex int) int { return rf.logs[logIndex].LogTerm } func (rf *Raft) logEntry(logIndex int) *LogEntry { //越界 if logIndex > rf.lastLogIndex() { return rf.logs[0] } //该日志已被压缩 logIndex = logIndex - rf.lastIncludeIndex if logIndex <= 0 { return rf.logs[0] } return rf.logs[logIndex] }</code>
2 InstallSnapshot
<code style="margin-left:0">//接收来自leader的快照,并上传给应用层,通过applyCh写入 //这个函数只是follower为了赶上leader状态的, func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) { rf.mu.Lock() defer rf.mu.Unlock() defer func() { DPrintf("process InstallSnapshot node[%v] term[%v] lastLogIndex[%d] lastLogTerm[%d] lastIncludeIndex[%d] commitIndex[%d] received InstallSnapshot, args: %v, reply: %v", rf.me, rf.term, rf.lastLogIndex(), rf.logTerm(rf.lastLogIndex()-rf.lastIncludeIndex), rf.lastIncludeIndex, rf.commitIndex, mr.Any2String(args), mr.Any2String(reply)) }() reply.Term = rf.term if rf.term > args.Term || args.Data == nil { DPrintf("InstallSnapshot node[%d] term[%d] from node[%d] term[%d], rf.term > args.Term delined, args: %v, reply: %v ", rf.me, rf.term, args.LeaderId, args.Term, mr.Any2String(args), mr.Any2String(reply)) return } if rf.term < args.Term { rf.role = Follower rf.term = args.Term rf.votedFor = RoleNone rf.leaderId = RoleNone rf.persist() } rf.lastActiveTime = time.Now() //只有缺少的数据在快照点之前时才需要快照 if rf.commitIndex >= args.LastIncludeIndex { DPrintf("InstallSnapshot node[%d] term[%d] from node[%d] term[%d], commitIndex[%d], rf.commitIndex >= args.LastIncludeIndex delined, args: %v, reply: %v ", rf.me, rf.term, args.LeaderId, args.Term, rf.commitIndex, mr.Any2String(args), mr.Any2String(reply)) return } //接收快照并持久化,至于应用到状态机可以异步做,就算意外下线,也是从日志和快照恢复 // //不能立刻应用快照,需要保证raft和状态机都应用快照成功,放到CondInstallSnapshot中应用 applyMsg := ApplyMsg{ SnapshotValid: true, Snapshot: args.Data, SnapshotTerm: args.LastIncludeTerm, SnapshotIndex: args.LastIncludeIndex, } //异步做,及早返回,就算失败raft也会回退,凡是和应用层交互并有可能阻塞的地方都异步做。 go func() { rf.applyChan <- applyMsg }() }</code>
3 Snapshot
<code style="margin-left:0">//snapshot是应用层状态机的序列化,index表示checkpoint //peer独立压缩日志 func (rf *Raft) Snapshot(index int, snapshot []byte) { rf.mu.Lock() defer rf.mu.Unlock() //lastApplied是已经应用到状态机的最后一条日志,也是压缩点。 if index <= rf.lastIncludeIndex || index != rf.lastApplied || index > rf.lastLogIndex() { return } DPrintf("node[%d] role[%d] term[%d] snapshoting, index[%d] commitIndex[%d] lastApplied[%d]", rf.me, rf.role, rf.term, index, rf.commitIndex, rf.lastApplied) logs := rf.logs[0:1] //dummy log的LogTerm一定要是压缩的最后一条日志的Term logs[0].LogTerm = rf.logs[index-rf.lastIncludeIndex].LogTerm //本结点最后一条日志在快照点之前,太落后,清空,应用快照,否则截断 logs = append(logs, rf.logs[index-rf.lastIncludeIndex+1:]...) rf.logs = logs rf.snapshot = snapshot rf.lastIncludeIndex = index rf.lastIncludeTerm = logs[0].LogTerm rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), snapshot) }</code>
独立压缩时checkpoint是lastApplied。从Leader同步snapshot时,snapshot的LastIncludedIndex是commit Index。
4 CondInstallSnapshot
<code style="margin-left:0">//应用层调用,询问raft是否需要安装这个snapshot,在InstallSnapshot时并不会安装 func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool { rf.mu.Lock() defer rf.mu.Unlock() //异步应用快照,如果此时commitIndex已经追上来了,就不需要再应用快照了 if rf.commitIndex > lastIncludedIndex { return false } logs := rf.logs[0:1] logs[0].LogTerm = lastIncludedTerm //本结点最后一条日志在快照点之前,太落后,清空,应用快照,否则截断 if rf.lastLogIndex() > lastIncludedIndex { logs = append(logs, rf.logs[lastIncludedIndex-rf.lastIncludeIndex+1:]...) } rf.logs = logs rf.snapshot = snapshot rf.lastIncludeIndex = lastIncludedIndex rf.lastIncludeTerm = lastIncludedTerm //给raft安装snapshot,需要更新新的rf.lastApplied、rf.commitIndex rf.lastApplied = lastIncludedIndex rf.commitIndex = lastIncludedIndex rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), snapshot) return true }</code>
四、测试和总结
1 测试
2 小结
本文简要介绍了日志压缩的实现,减少了数据量和恢复时间。日志压缩由应用层根据日志数量触发,生成snapshot并截断日志,每个peer都可以独立进行。如果某个peer的日志太过于落后就需要复制Leader的snapshot,而peer接收到snapshot后不能立刻应用和截断日志,应该由状态机先应用,raft后应用snapshot。倘若先应用到raft成功了,后应用到状态机失败了,就会导致状态机数据不完整,最好是两者保持原子性。
未经允许不得转载:木盒主机 » 6.824 raft Lab 2D 日志压缩