分散合意アルゴリズム Raft を TLA+ で検証する

はじめに

この記事では分散合意アルゴリズム Raft を 形式仕様記述 + モデル検査ツールの TLA+ を用いて実際に検証を行う手順を示します。Raft の作者が公開している TLA+ のコードは Raft のアルゴリズムのみが実装されていて検証の部分のコードがないためこの記事で補います。Raft や TLA+ に関する詳細は解説しませんが、詳細を知るためのリンクを集めたので、詳しく知りたい方はそちらを参照してください。

分散合意アルゴリズム Raft とは

分散合意アルゴリズムとは

分散合意アルゴリズムの合意とはアルゴリズムに参加する複数のサーバーの有する状態が共通の状態に収束することです。分散合意アルゴリズムは参加するサーバーに障害が発生しても一貫した状態を保持します。
この性質は多くの高可用性と高信頼性を必要とする分散アルゴリズムが必要とする性質で、分散合意アルゴリズムは多くの分散アルゴリズムが必要とする共通の機能を抽象化し取り出したアルゴリズムと考えられます。
今後何かしらの分散処理システムを実装する場合は、分散合意アルゴリズムの上に必要とする特有の機能を追加する形で実装できないか検討するといいでしょう。

Raft の特徴

Raft の大雑把な動作イメージを理解するには以下のサイトが役に立ちます。

Raft は次の特徴を持つアルゴリズムとして Diego Ongaro によって考案されました

  • アルゴリズムの理解のしやすさに重点が置かれている
  • サーバー間の協調動作に使用するのは RequestVote RPC と AppendEntries RPC の 2 つの RPC のみ
  • 実装可能な程度にアルゴリズムの詳細が説明されている
  • LogCabin という実装が公開されている
  • TLA+ による検証が行われている

この記事で注目するのはTLA+ による検証が行われている点です。
Diego Ongaro による TLA+ のソースコードは GitHub で公開されているのですが、アルゴリズムのコアの部分についての実装だけで検証に関するコードが抜け落ちています。彼自身は検証に必要なコードも書いて検証したと思いますが、その部分は公開されていないのでこの記事で補います。

Raft についてはすでに日本語で書かれた記事や資料がいくつかあるのでリンクを以下にリストします。

論文の 3 章の内容をほぼすべて日本語翻訳した内容です。英語の論文を読む前にざっと目を通しておくと理解が深まると思います。

  • PFI の勉強会のスライド資料

www.slideshare.net
アルゴリズムのコアの部分だけでなく実システムに必要なメンバーシップ管理や、ログのコンパクション、Client との Interaction の内容まで解説されています。

  • 分散合意アルゴリズム Raft を理解する

qiita.com
Raft は Crash-Recovery 障害に対して耐性を持つことに言及しています。

Raft に関するクイズを記事の最後に載せていますので、理解度の確認に使用してください。

Raft が満たす性質

Raft は以下の 5 つの性質が常に満たされることを保証します。
Raft に参加するサーバーの再起動や、サーバー間メッセージの重複やドロップなどが任意のタイミングで任意の回数発生する状況下であっても、これらの性質が満たされ続けることを TLA+ を用いて検証します。

Election Safety

1 つの term には多くとも 1 つの Leader が選出される。

正確を期すために補足すると、1 つの term に Leader が存在しない場合もあります。また、同時に複数の Leader が存在する場合もあります。複数の Leader が存在する場合は、Leader の term が異なっている必要があります。古い term を持つ Leader が新しい Leader からのメッセージを受け取ると Follower に戻ります。

Leader Append-Only

Leader は自分自身の log に対して追記のみをして、書き換えや削除をしない。

Leader は自身が作成した entry については log に追記しかしません。よって、Leader が Leader になった時点で保持していた log が不変であることを検証します。
Leader である期間を通して、Leader になった時点で保持していた log が不変であることと、Leader が作成した entry は追記のみという 2 つの事実を合わせて Leader Append-Only が成り立つことを検証できます。

Log Matching

2 つの log が同じ index に同じ term の entry を持つなら、その index までの entry は全て一致する。

Raft アルゴリズムの中核の AppendEntries RPC は Leader の log を Follower にコピーする際、 Follower の log とどの index まで一致しているか判断する際にこの性質に大きく依存しています。つまり、この性質が成り立つことは Raft にとって非常に重要です。
具体的には Leader は log の末尾から先頭方向に向かって、Follower の log の entry の term を順に比較します。ある index で、entry の term が一致したら、先頭からその index までの entry は全て一致しているとみなして(Log Matching が成り立つとみなして)entry も転送しないし、本当に一致しているかのチェックもしません。
Log Matching を検証することは難しくありません。一方で、なぜこの性質が成り立つのか?は説明するのがとても難しいです。この性質が成り立つことを検証するのではなく証明をして理解しようと私が奮闘した記録が次の記事にあるので興味のある人は参考にしてください。
www.orecoli.com

Leader Completeness

もしある entry がある term に commit されたなら、その term 以降の全ての Leader はその entry を log の中に含む。

Raft アルゴリズムの生涯を通して、一度 commit された entry は Leader が変わっても存続します。Raft が合意するのは、Leader の log の先頭から commit index までの内容です。

State Machine Safety

ある server の log の特定の index の entry を自身の state machine に適用するとき、他の全ての server の log のその index には同じ entry が存在する。

各 server は Leader から AppendEntires RPC により、自身の log のどこまでが commit されているか通知されます。commit された entry は Leader Completeness より Leader が変わっても log の同じ index に存在し、各 server の log は Leader の log と一致するように AppendEntries RPC により複製されます。
つまり、各 server の commit index までの log はリーダーの log と一致するので、各 server が自身の state machine に適用する entry は必ずこの commit index 以下であるという制約を満たす限り全ての server は同じ entry を同じ順序で自身の state machine に適用します。
今回検証するコードでは state machine に entry を適用する部分は記述されていないので上の制約を満たすのは実装者の責任です。

TLA+ とは

TLA+ は分散合意アルゴリズムの Paxos や LaTeX の開発者として知られる
Leslie Lamport によって開発された形式仕様記述言語 + モデル検査ツールです。数学的な記述を用いてシステムやプログラムの仕様を厳密に記述することにより仕様上の間違いや修正が困難なバグの発見を補助します。特に分散システムの検証に威力を発揮します

以下の記事では TLA+ のような形式手法が必要とされる動機について非常によく説明されています。
orgachem.hatenablog.com

TLA+ は Amazon が自社のクラウドで提供されているサービスの検証に用いたことで話題になりました。
masateruk.hatenablog.com
sakaia.hatenadiary.org

国内企業でも TLA+ について調査したり、実際に使用している例が出てきています。
swet.dena.com
dev.classmethod.jp
tkh86.hateblo.jp
FINAL FANTASY XV POCKET EDITION を支える AWS サーバレス技術

TLA+ による Raft の形式的仕様

以下のコードが Raft の検証コードです。Diego Ongaro によるオリジナルのコードのコメントを日本語に翻訳しています。また検証に必要な変数やコードを追加しています。
コードの最後に上で挙げた Raft が満たす性質を新たに追加した変数等を使用して論理式として表現しています。それぞれの性質の説明と論理式の対応を確認してください。

-------------------------------- MODULE main --------------------------------
EXTENDS Naturals, FiniteSets, Sequences, TLC

\* Server ID の集合
CONSTANTS Server

\* Client のリクエストに含まれる値の集合
CONSTANTS Value

\* Server の取りうる状態
CONSTANTS Follower, Candidate, Leader

\* 予約された定数
CONSTANTS Nil

\* メッセージタイプ:
CONSTANTS RequestVoteRequest, RequestVoteResponse,
          AppendEntriesRequest, AppendEntriesResponse

----
\* 大域変数

\* Server 間のリクエストとリスポンスを記録する重複を許す集合(Bag)
\* TLA+ は重複を許す集合をサポートしないので、Message から Nat への集合で代替する
VARIABLE messages

\* 証明で使用する記録用の変数で実装には現れない
\* leader の最初の log と投票者の log を含む、成功した選挙を記録する
\* 成功した選挙に関する様々な情報を含む関数の集合(BecomeLeader を参照)
\* 本記事の検証では使用しない
VARIABLE elections

\* 証明で使用する記録用の変数で実装には現れない
\* system 内のすべての log を記録する
\* 本記事の検証では使用しないためコメントアウト
\* VARIABLE allLogs

----
\* 以下の変数は Server ごとに個別の変数 (ドメインが Server の関数)

\* 各 Server の term
VARIABLE currentTerm

\* 各 Server の state で次のいずれか (Follower, Candidate, or Leader)
VARIABLE state

\* 各 Server が現在の term で投票した Candidate か
\* もしくは誰にも投票していない場合は Nil
VARIABLE votedFor
serverVars == <<currentTerm, state, votedFor>>

\* log の entry の Sequence。
VARIABLE log

\* state machine が適用する可能性のある log の最新 entry の index
VARIABLE commitIndex
logVars == <<log, commitIndex>>

\* 以下の変数は Candidate の場合にのみ使用される:
\* 現在の term で RequestVote に対する返信を受け取った Server の集合
VARIABLE votesResponded

\* 現在の term で Candidate に対する投票を受け取った Server の集合
VARIABLE votesGranted

\* 証明で使用する記録用の変数で実装には現れない
\* Candidate に対して現在の term で投票した Server の log を記録する
\* 本記事の検証では使用しない
VARIABLE voterLog
candidateVars == <<votesResponded, votesGranted, voterLog>>

\* 以下の変数は Leader の場合にのみ使用される:
\* それぞれの Follower に次に送る entry の index を記録する
VARIABLE nextIndex

\* それぞれの Follower の log と Leader の log が一致することが確認されている index を記録する
\* この変数は Leader が commitIndex を計算するのに使用する
VARIABLE matchIndex

\* 証明で使用する記録用の変数で実装には現れない
\* リーダーに選出された時点での log を保持する
\* LeaderAppendOnly の検証に使用する
VARIABLE leaderStartLog

\* 証明で使用する記録用の変数で実装には現れない
\* リーダーが commit したエントリーのリスト
\* Leader Completeness の検証に使用する
VARIABLE commitedEntries
leaderVars == <<nextIndex, matchIndex, elections, leaderStartLog, commitedEntries>>

\* Server ごとの変数は以上
----

\* 全ての変数。stuttering のために使用
vars == <<messages, serverVars, candidateVars, leaderVars, logVars>>
----
\* Helpers

\* 定足数以上の Server が含まれる集合の集合
\* この集合の要素は過半数以上の Server を含むが、唯一重要な性質は、
\* この集合の要素は他のどの要素との間にも共通の Server を含むことである
Quorum == {i \in SUBSET(Server) : Cardinality(i) * 2 > Cardinality(Server)}

\* log の最後の entry の term。log が空の場合は 0 を返す
LastTerm(xlog) == IF Len(xlog) = 0 THEN 0 ELSE xlog[Len(xlog)].term

\* Send と Reply オペレーターのための Helper
\* message m と message の Bag を渡すと m を追加した新しい Bag を返す
WithMessage(m, msgs) ==
    IF m \in DOMAIN msgs THEN
        [msgs EXCEPT ![m] = msgs[m] + 1]
    ELSE
        msgs @@ (m :> 1)

\* Discard と Reply オペレーターのための Helper
\* message m と message の Bag を渡すと m を削除した新しい Bag を返す
\* msgs[m] が 0 になった後に WithMessages で再度 msgs[m] = 1 となるのを防ぐため、
\* オリジナルの実装から修正済み
WithoutMessage(m, msgs) ==
    IF m \in DOMAIN msgs THEN
      IF msgs[m] = 1 THEN
        [d \in DOMAIN msgs \ {m} |-> msgs[d]]
      ELSE
        [msgs EXCEPT ![m] = msgs[m] - 1]
    ELSE
      msgs

\* message m を messages に追加する
Send(m) == messages' = WithMessage(m, messages)

\* message m を messages から削除する
Discard(m) == messages' = WithoutMessage(m, messages)

\* Send と Discard の組み合わせ
Reply(response, request) ==
    messages' = WithoutMessage(request, WithMessage(response, messages))

\* 集合から最小の値を返す。集合が空集合の場合は undefined を返す
Min(s) == CHOOSE x \in s : \A y \in s : x <= y

\* 集合から最大の値を返す。集合が空集合の場合は undefined を返す
Max(s) == CHOOSE x \in s : \A y \in s : x >= y

\* 2 つの log の term が一致する最大の index を返す
MaxMatchTermIndex(log1, log2) == 
  LET indexes == {i \in 1..Min({Len(log1), Len(log2)}) : log1[i].term = log2[i].term}
  IN IF indexes = {} THEN 0 ELSE Max(indexes)

----
\* 全ての変数の初期値を定義する

\* Leader は自分自身に message を送信しないため、nextIndex[i][i] と matchIndex[i][i] の値は
\* 参照されることはないが、定義が簡単になるので含んだ状態で定義する
InitHistoryVars   == /\ elections       = {}
                     /\ voterLog        = [i \in Server |-> [j \in {} |-> <<>>]]
InitServerVars    == /\ currentTerm     = [i \in Server |-> 1]
                     /\ state           = [i \in Server |-> Follower]
                     /\ votedFor        = [i \in Server |-> Nil]
InitCandidateVars == /\ votesResponded  = [i \in Server |-> {}]
                     /\ votesGranted    = [i \in Server |-> {}]
InitLeaderVars    == /\ nextIndex       = [i \in Server |-> [j \in Server |-> 1]]
                     /\ matchIndex      = [i \in Server |-> [j \in Server |-> 0]]
                     /\ leaderStartLog  = [i \in Server |-> <<>>]
                     /\ commitedEntries = <<>>
InitLogVars       == /\ log             = [i \in Server |-> << >>]
                     /\ commitIndex     = [i \in Server |-> 0]
Init              == /\ messages        = [m \in {} |-> 0]
                     /\ InitHistoryVars
                     /\ InitServerVars
                     /\ InitCandidateVars
                     /\ InitLeaderVars
                     /\ InitLogVars

----
\* 状態遷移の定義

\* Server i は永続化ストレージに保存した情報(currentTerm, votedFor, log)を使用してリスタートする
\* currentTerm, votedFor, log 以外の情報は全て失う
Restart(i) ==
    /\ state'          = [state EXCEPT ![i] = Follower]
    /\ votesResponded' = [votesResponded EXCEPT ![i] = {}]
    /\ votesGranted'   = [votesGranted EXCEPT ![i] = {}]
    /\ voterLog'       = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
    /\ nextIndex'      = [nextIndex EXCEPT ![i] = [j \in Server |-> 1]]
    /\ matchIndex'     = [matchIndex EXCEPT ![i] = [j \in Server |-> 0]]
    /\ commitIndex'    = [commitIndex EXCEPT ![i] = 0]
    /\ UNCHANGED <<messages, currentTerm, votedFor, log, elections, leaderStartLog, commitedEntries>>

\* Server i は Timeout して新しい選挙を開始する
Timeout(i) == /\ state[i] \in {Follower, Candidate}
              /\ state'          = [state EXCEPT ![i] = Candidate]
              /\ currentTerm'    = [currentTerm EXCEPT ![i] = currentTerm[i] + 1]
              \* ほとんどの実装では自分自身への投票を atomic な操作として実装するだろうが、
              \* ここではより弱い localhost へのメッセージとして形式化する
              /\ votedFor'       = [votedFor EXCEPT ![i] = Nil]
              /\ votesResponded' = [votesResponded EXCEPT ![i] = {}]
              /\ votesGranted'   = [votesGranted EXCEPT ![i] = {}]
              /\ voterLog'       = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
              /\ UNCHANGED <<messages, leaderVars, logVars>>

\* Candidate i は Server j に対して RequestVote request を送信する
RequestVote(i, j) ==
    /\ state[i] = Candidate
    /\ j \notin votesResponded[i]
    /\ Send([mtype         |-> RequestVoteRequest,
             mterm         |-> currentTerm[i],
             mlastLogTerm  |-> LastTerm(log[i]),
             mlastLogIndex |-> Len(log[i]),
             msource       |-> i,
             mdest         |-> j])
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

\* Leader i は Server j に対して最大で 1 つの entry を含む AppendEntries request を送信する
\* 実際の実装では複数の entry を一度に送信したいと思うが、一般性を失うことなく atomic な領域を最小化するために
\* ここでは 1 つの entry に限定する
AppendEntries(i, j) ==
    /\ i /= j
    /\ state[i] = Leader
    /\ LET prevLogIndex == nextIndex[i][j] - 1
           prevLogTerm  == IF prevLogIndex > 0 THEN
                               log[i][prevLogIndex].term
                           ELSE
                               0
           \* log の最後に到達するまで最大で 1 つの entry を送信する
           lastEntry    == Min({Len(log[i]), nextIndex[i][j]})
           entries      == SubSeq(log[i], nextIndex[i][j], lastEntry)
       IN Send([mtype          |-> AppendEntriesRequest,
                mterm          |-> currentTerm[i],
                mprevLogIndex  |-> prevLogIndex,
                mprevLogTerm   |-> prevLogTerm,
                mentries       |-> entries,
                \* mlog は証明で使用する記録用の変数で実装には現れない
                mlog           |-> log[i],
                mcommitIndex   |-> Min({commitIndex[i], lastEntry}),
                msource        |-> i,
                mdest          |-> j])
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

\* Candidate i が Leader に遷移する
BecomeLeader(i) ==
    /\ state[i]        = Candidate
    /\ votesGranted[i] \in Quorum
    /\ state'          = [state EXCEPT ![i] = Leader]
    /\ nextIndex'      = [nextIndex EXCEPT ![i] =
                             [j \in Server |-> Len(log[i]) + 1]]
    /\ matchIndex'     = [matchIndex EXCEPT ![i] =
                             [j \in Server |-> 0]]
    /\ elections'      = elections \cup
                             {[eterm     |-> currentTerm[i],
                               eleader   |-> i,
                               elog      |-> log[i],
                               evotes    |-> votesGranted[i],
                               evoterLog |-> voterLog[i]]}
    /\ leaderStartLog' = [leaderStartLog EXCEPT ![i] = log[i]]
    /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars, commitedEntries>>

\* Leader i は値 v を log に追加する client の要求を受け付ける
ClientRequest(i, v) ==
    /\ state[i] = Leader
    /\ LET entry == [term  |-> currentTerm[i],
                     value |-> v]
           newLog == Append(log[i], entry)
       IN  log' = [log EXCEPT ![i] = newLog]
    /\ UNCHANGED <<messages, serverVars, candidateVars,
                   leaderVars, commitIndex>>

\* Leader i は commitIndex を進める
\* この処理は AppendEntries response の処理とは別のステップとして行われます。
\* 理由は、atomic な領域を最小化するためと、
\* 単一サーバのクラスタであっても Leader が entry を commit できるようにするためです。
AdvanceCommitIndex(i) ==
    /\ state[i] = Leader
    /\ LET \* index まで同じ log を保持している Server の集合
           Agree(index)   == {i} \cup {k \in Server :
                                          matchIndex[i][k] >= index}
           \* 過半数の Server が一致した entry を持っている index の集合
           agreeIndexes   == {index \in 1..Len(log[i]) :
                                 Agree(index) \in Quorum}
           \* commitIndex'[i] の新しい値
           \* log の Max(agreeIndexes) の entry の term が Leader の currentTerm と一致する場合にのみ
           \* commitIndex が更新されることに注意
           newCommitIndex ==
               IF /\ agreeIndexes /= {}
                  /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
               THEN
                   Max(agreeIndexes)
               ELSE
                   commitIndex[i]
       IN
         /\ commitIndex'     = [commitIndex EXCEPT ![i] = newCommitIndex]
         /\ commitedEntries' = commitedEntries \o SubSeq(log[i], Len(commitedEntries)+1, newCommitIndex)
    /\ UNCHANGED <<messages, serverVars, candidateVars, nextIndex, matchIndex, elections, leaderStartLog, log>>

----
\* Message handlers
\* i = 受信 Server, j = 送信 Server, m = message

\* m.mterm <= currentTerm[i] が成り立つ場合に
\* Server i は Server j からの RequestVote request を受信する
HandleRequestVoteRequest(i, j, m) ==
    LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
                 \/ /\ m.mlastLogTerm = LastTerm(log[i])
                    /\ m.mlastLogIndex >= Len(log[i])
        grant == /\ m.mterm = currentTerm[i]
                 /\ logOk
                 /\ votedFor[i] \in {Nil, j}
    IN /\ m.mterm <= currentTerm[i]
       /\ \/ grant  /\ votedFor' = [votedFor EXCEPT ![i] = j]
          \/ ~grant /\ UNCHANGED votedFor
       /\ Reply([mtype        |-> RequestVoteResponse,
                 mterm        |-> currentTerm[i],
                 mvoteGranted |-> grant,
                 \* mlog は証明で使用する記録用の変数で実装には現れない
                 mlog         |-> log[i],
                 msource      |-> i,
                 mdest        |-> j],
                 m)
       /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>

\* m.mterm = currentTerm[i] が成り立つ場合に
\* Server i は Server j からの RequestVote response を受信する
HandleRequestVoteResponse(i, j, m) ==
    \* 現在の状態が Candidate でない場合でも票を集計しますが、単に無視されるので問題ありません。
    /\ m.mterm = currentTerm[i]
    /\ votesResponded' = [votesResponded EXCEPT ![i] =
                              votesResponded[i] \cup {j}]
    /\ \/ /\ m.mvoteGranted
          /\ votesGranted' = [votesGranted EXCEPT ![i] =
                                  votesGranted[i] \cup {j}]
          /\ voterLog' = [voterLog EXCEPT ![i] =
                              voterLog[i] @@ (j :> m.mlog)]
       \/ /\ ~m.mvoteGranted
          /\ UNCHANGED <<votesGranted, voterLog>>
    /\ Discard(m)
    /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>

\* m.mterm <= currentTerm[i] が成り立つ場合に
\* Server i は Server j から AppendEntries request を受信する
\* ここでは、長さが 0 か 1 の m.entries のみを処理するが、複数回の独立した 1 entry の request
\* を処理する場合と同様に扱うことで実際の実装では複数の entry を含む m.entries を受信して処理してよい
HandleAppendEntriesRequest(i, j, m) ==
    LET logOk == \/ m.mprevLogIndex = 0
                 \/ /\ m.mprevLogIndex > 0
                    /\ m.mprevLogIndex <= Len(log[i])
                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
    IN /\ m.mterm <= currentTerm[i]
       /\ \/ /\ \* request を拒否する
                \/ m.mterm < currentTerm[i]
                \/ /\ m.mterm = currentTerm[i]
                   /\ state[i] = Follower
                   /\ \lnot logOk
             /\ Reply([mtype           |-> AppendEntriesResponse,
                       mterm           |-> currentTerm[i],
                       msuccess        |-> FALSE,
                       mmatchIndex     |-> 0,
                       msource         |-> i,
                       mdest           |-> j],
                       m)
             /\ UNCHANGED <<serverVars, logVars>>
          \/ \* Follower 状態に戻る
             /\ m.mterm = currentTerm[i]
             /\ state[i] = Candidate
             /\ state' = [state EXCEPT ![i] = Follower]
             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
          \/ \* request を受け入れる
             /\ m.mterm = currentTerm[i]
             /\ state[i] = Follower
             /\ logOk
             /\ LET index == m.mprevLogIndex + 1
                IN \/ \* すでに request の内容を処理済み
                       /\ \/ m.mentries = << >>
                          \/ /\ Len(log[i]) >= index
                             /\ log[i][index].term = m.mentries[1].term
                       \* 次の処理は古い request や重複した request を受信した場合
                       \* commitIndex を減らす可能性があるが実際にはほとんど何の影響も及ぼさない 
                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
                                              m.mcommitIndex]
                       /\ Reply([mtype           |-> AppendEntriesResponse,
                                 mterm           |-> currentTerm[i],
                                 msuccess        |-> TRUE,
                                 mmatchIndex     |-> m.mprevLogIndex +
                                                     Len(m.mentries),
                                 msource         |-> i,
                                 mdest           |-> j],
                                 m)
                       /\ UNCHANGED <<serverVars, logVars>>
                   \/ \* conflict: 1 entry を削除
                       /\ m.mentries /= << >>
                       /\ Len(log[i]) >= index
                       /\ log[i][index].term /= m.mentries[1].term
                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
                                          log[i][index2]]
                          IN log' = [log EXCEPT ![i] = new]
                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
                   \/ \* no conflict: entry を追加
                       /\ m.mentries /= << >>
                       /\ Len(log[i]) = m.mprevLogIndex
                       /\ log' = [log EXCEPT ![i] =
                                      Append(log[i], m.mentries[1])]
                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
       /\ UNCHANGED <<candidateVars, leaderVars>>

\* m.mterm = currentTerm[i] が成り立つ場合に
\* Server i は Server j からの AppendEntries response を受信する
HandleAppendEntriesResponse(i, j, m) ==
    /\ m.mterm = currentTerm[i]
    /\ \/ /\ m.msuccess \* 成功した場合
          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
       \/ /\ \lnot m.msuccess \* 失敗した場合
          /\ nextIndex'  = [nextIndex EXCEPT ![i][j] =
                               Max({nextIndex[i][j] - 1, 1})]
          /\ UNCHANGED <<matchIndex>>
    /\ Discard(m)
    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections, leaderStartLog, commitedEntries>>

\* 受信者の currentTerm よりも新しい term を持つ任意の RPC message は受信者の currentTerm を初めに更新する
UpdateTerm(i, j, m) ==
    /\ m.mterm > currentTerm[i]
    /\ currentTerm'    = [currentTerm EXCEPT ![i] = m.mterm]
    /\ state'          = [state       EXCEPT ![i] = Follower]
    /\ votedFor'       = [votedFor    EXCEPT ![i] = Nil]
    \* message m はさらに処理する必要があるので messages に変更はない
    /\ UNCHANGED <<messages, candidateVars, leaderVars, logVars>>

\* 古い term を持つ reponse は破棄する
DropStaleResponse(i, j, m) ==
    /\ m.mterm < currentTerm[i]
    /\ Discard(m)
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

\* message を受信する
Receive(m) ==
    LET i == m.mdest
        j == m.msource
    IN \* 任意の新しい term を持つ RPC は受信者の term を先に更新する
       \* 古い term を持つ response は破棄される
       \/ UpdateTerm(i, j, m)
       \/ /\ m.mtype = RequestVoteRequest
          /\ HandleRequestVoteRequest(i, j, m)
       \/ /\ m.mtype = RequestVoteResponse
          /\ \/ DropStaleResponse(i, j, m)
             \/ HandleRequestVoteResponse(i, j, m)
       \/ /\ m.mtype = AppendEntriesRequest
          /\ HandleAppendEntriesRequest(i, j, m)
       \/ /\ m.mtype = AppendEntriesResponse
          /\ \/ DropStaleResponse(i, j, m)
             \/ HandleAppendEntriesResponse(i, j, m)

\* End of message handlers.
----
\* Network 状態の遷移

\* network は message を複製する
DuplicateMessage(m) ==
    /\ Send(m)
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

\* network は message を落とす
DropMessage(m) ==
    /\ Discard(m)
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

----
\* 変数の遷移の仕方の定義
Next ==  \/ \E i \in Server : Restart(i)
         \/ \E i \in Server : Timeout(i)
         \/ \E i,j \in Server : RequestVote(i, j)
         \/ \E i \in Server : BecomeLeader(i)
         \/ \E i \in Server, v \in Value : ClientRequest(i, v)
         \/ \E i \in Server : AdvanceCommitIndex(i)
         \/ \E i,j \in Server : AppendEntries(i, j)
         \/ \E m \in DOMAIN messages : Receive(m)
         \/ \E m \in DOMAIN messages : DuplicateMessage(m)
         \/ \E m \in DOMAIN messages : DropMessage(m)
         \* 全ての log を追跡する History 変数:
         \* 今回の検証では使用しないのでコメントアウト
\*       /\ allLogs' = allLogs \cup {log[i] : i \in Server}

\* Raft の状態遷移は初期状態から始まって Next の定義にしたがって遷移する
Spec == Init /\ [][Next]_vars

----
\* Raft は常に以下の性質が成り立つことを保証します
\* 以下の Operator を Invariant として TLA model checker に設定してください

\* 1 つの term には多くとも 1 つの Leader が存在する
ElectionSafety == \A s1,s2 \in Server: s1 /= s2 /\ currentTerm[s1] = currentTerm[s2] => state[s1] /= Leader \/ state[s2] /= Leader

\* Leader は自分自身の log に対して追記のみをして、書き換えや削除をしない
LeaderAppendOnly == \A i \in Server : state[i] = Leader => SubSeq(log[i], 1, Len(leaderStartLog[i])) = leaderStartLog[i]

\* 2 つの log が同じ index に同じ term の entry を持つなら、その index までの entry は全て一致する
LogMatching == \A i,j \in Server: i /= j => LET k == MaxMatchTermIndex(log[i], log[j]) IN SubSeq(log[i], 1, k) = SubSeq(log[j], 1, k)

\* もしある entry がある term に commit されたなら、その term 以降の全ての Leader はその entry を log の中に含む
LeaderCompleteness == \A i \in Server : state[i] = Leader => commitedEntries = SubSeq(log[i], 1, Len(commitedEntries))

\* ある Server の log の特定の index の entry を自身の state machine に適用するとき
\* 他の全ての Server の log のその index には同じ entry が存在する
\* 各 Server が自身の state machine に適用する entry は必ず commitIndex 以下である必要がある
StateMachineSafety == LET
                        minCommitIndex == Min({ commitIndex[i] : i \in Server})
                      IN
                        \A i,j \in Server: SubSeq(log[i], 1, minCommitIndex) = SubSeq(log[j], 1, minCommitIndex)

=============================================================================

TLA+ による Raft の検証方法

TLA+ Toolbox のインストール

以下の GitHub のページにアクセスして、自身の環境に応じた TLA+ Toolbox の zip ファイルをダウンロードしてインストールします。
github.com

TLA+ Toolbox のインストール
TLA+ Toolbox のインストール

新規 Spec の作成

TLA+ Toolbox を起動したら、メニューの 「File」->「Open Spec」->「Add New Spec」を選択します。適当なディレクトリに main.tla という名前でファイルを作成します。

新規 Spec の作成
新規 Spec の作成

ファイルを作成したら上のコードを貼り付けます。

検証用コードの貼り付け
検証用コードの貼り付け

検証コードの 1 行目の MODULE 名とファイル名は一致する必要があります。もし、上の操作で別の名前をつけた場合は MODULE main の箇所を MODULE *fileName* に修正してください。

Model の作成と実行

次に Model を作成します。メニューの 「TLC Model Checker」->「New Model」を選択します。モデルの名前はデフォルトの Model_1 で構いません。

新規モデルを作成
新規モデルを作成

すると 10 errors detected というエラーが表示されます。これは main.tla で使用している定数に値が割り当てられていないという意味なので値を割り当てていきます。

モデル作成時のエラー
モデル作成時のエラー

画面中央右の What is the model? という欄の AppendEntriesResponse の行をダブルクリックするとダイアログが表示されます。

定数への値の割り当て
定数への値の割り当て

AppendEntriesResponse に対しては Model value を選択して Finish ボタンをクリックします。同様に Value と Server 以外の定数に Model Value を割り当てます。

残りの定数への値の割り当て
残りの定数への値の割り当て

Value は Client が Raft に対して書き込む内容です。アプリケーションによって幾つの値を取りうるかは変化します。一方で、Value の種類を増やしても Raft の性質の検証にはあまり役に立たない上に、探索する必要のある状態空間が増えるのでここでは 2 つの値を割り当てます。
Value に集合を割り当てるには Set of model value を選択して値をフォームに入力します。ここでは {op1, op2} を Value に割り当てます。

定数 Value に値を割り当て
定数 Value に値を割り当て

最後に Server に値を割り当てます。Raft において各 Server の役割は対称的です。つまりそれぞれに区別をつける必要がありません。例えば 3 つの Server {s1, s2, s3} がある場合に s1 がはじめに Leader になるパターンと s2, s3 がそれぞれはじめに Leader になるパターン全部を検証する必要はありません。s1 が Leader になるパターンだけを検証すれば十分です。このように区別をつけないようにすることで探索する必要のある空間を大幅に減らすことができます。
Server に区別をつけない値の集合を割り当てるには Set of model value を選択した後に、Symmetry set にチェックを入れます。ここでは Server に {s1, s2, s3} を割り当てます。

定数 Server に値を割り当て
定数 Server に値を割り当て

ここでは Server が 3 台の場合を検証する手順を示しましたが、1 台や 2 台 の場合も同様です(1 台の場合は Symmetry set のチェックボックスを外す)。4 台以上でも構いません。次に、下の What to check? の Invariant をクリックしてフォームを開きます。ここに検証する必要のある性質を指定していきます。

Invariant を指定するフォーム
Invariant を指定するフォーム

右の Add をクリックするとダイアログが表示されるので、フォームに ElectionSafety と入力して Finish をクリックします。

Invariant に ElectionSafety を指定
Invariant に ElectionSafety を指定

同様に LeaderAppendOnly、LogMatching、LeaderCompleteness、StateMachineSafety を追加します。

全ての Invariant を指定
全ての Invariant を指定

Invariant に指定することで、Raft が探索して発見した全ての状態において、Invariant に指定した論理式が True になるかチェックされます。もし、False になるような状態が見つかればエラーと共に、その状態に至る過程と全ての変数の値が表示されるので原因を特定します。

検証を始めるには、左上の緑の Run TLC ボタンをクリックします。

モデルチェックの開始ボタン
モデルチェックの開始ボタン

自動的に Model Check Results ページがオープンして Model Check の進行状況を表示します。

Model Check Results ページ
Model Check Results ページ

ちなみに、この Raft の検証コードは問題がないはずなのでモデルチェックはいつまでたっても終わりません。Leader の log は延々と伸び続けるだけだからです。
モデルチェックに対して何かしらの制約をかけて終わらせたい場合は、Additional Spec Options のページの State Constraint に適切な制約を記述することで終わらせることもできます。

Model Check を実行し続けるとすぐにチェックした状態数が数千万単位になります。それだけの状態数において上の Invariant が満たされ続けるということが確認できたので、Raft のアルゴリズムの健全性を高い確度で確信できます。この網羅性の高さと、チェックする状態数の多さがテストと比較した場合の形式仕様記述 + モデル検査ツールの優位性です。

補足: コマンドラインでの検証

手元のマシンではなく、クラウド上により強力なマシンを用意して、より高速に検証をしたいという方向けに TLC Model Checker をコマンドラインで動かす方法もあります。
medium.com

Java は OpenJDK ではなくて OracleJDK を使用します。OpenJDK ではエラーがでます。OracleJDK をインストールしたら以下のツールを git clone してインストールします。
github.com

コマンドラインでの検証用に上の main.tla と同じディレクトリに以下の 2 つのファイルを MC.tla、MC.cfg という名前で保存します。このファイルの内容は、main.tla を保存したディレクトリに TLC Model Checker によって生成される main.toolbox 内からコピーしたものです。

---- MODULE MC ----
EXTENDS main, TLC

\* MV CONSTANT declarations@modelParameterConstants
CONSTANTS
op1, op2
----

\* MV CONSTANT declarations@modelParameterConstants
CONSTANTS
s1, s2, s3
----

\* MV CONSTANT definitions Value
const_1593089061951878000 ==
{op1, op2}
----

\* MV CONSTANT definitions Server
const_1593089061951879000 ==
{s1, s2, s3}
----

\* SYMMETRY definition
symm_1593089061951880000 ==
Permutations(const_1593089061951879000)
----

=============================================================================
\* MV CONSTANT declarations
CONSTANTS
op1 = op1
op2 = op2
\* MV CONSTANT declarations
CONSTANTS
s1 = s1
s2 = s2
s3 = s3
\* CONSTANT declarations
CONSTANT AppendEntriesResponse = AppendEntriesResponse
\* CONSTANT declarations
CONSTANT Follower = Follower
\* CONSTANT declarations
CONSTANT Leader = Leader
\* CONSTANT declarations
CONSTANT Nil = Nil
\* CONSTANT declarations
CONSTANT RequestVoteResponse = RequestVoteResponse
\* CONSTANT declarations
CONSTANT Candidate = Candidate
\* CONSTANT declarations
CONSTANT RequestVoteRequest = RequestVoteRequest
\* CONSTANT declarations
CONSTANT AppendEntriesRequest = AppendEntriesRequest
\* MV CONSTANT definitions
CONSTANT
Value <- const_1593089061951878000
\* MV CONSTANT definitions
CONSTANT
Server <- const_1593089061951879000
\* SYMMETRY definition
SYMMETRY symm_1593089061951880000
\* SPECIFICATION definition
SPECIFICATION
Spec
\* INVARIANT definition
INVARIANT
ElectionSafety
LeaderAppendOnly
LogMatching
LeaderCompleteness
StateMachineSafety

最後に以下のコマンドを実行します。

tlc MC.tla -config MC.cfg -workers auto

Raft の拡張について

上で検証した TLA+ のコードは Raft のコアアルゴリズムに関する記述です。一方で、実際に Raft を運用する場合はさらにいくつかの機能を追加する必要があります。論文では以下の 4 つの拡張機能について検討されています。

  • Leadership Transfer (Chapter 3)
  • Membership Change (Chapter 4)
  • Log Compaction (Chapter 5)
  • Client Interaction (Chapter 6)

各機能の詳細は論文を参照してください。ここでは、拡張機能の検証コードを追加する視点でこれらの機能を見ていきます。今後コードを書いて検証する予定のない方は飛ばしてください。私はこれらの機能を全て追加した検証コードを実装し検証済みです。公開はしないので興味のある方はご連絡ください。

Leadership Transfer

Leadership Transfer は Leader が意図的に Leadership を他の Server に移譲する機能です。例えば、メンテナンスのために Leader の Server をクラスターから外す場合や、Leader が動く Server が高負荷状態にある場合やネットワークレイテンシーが大きい場合に意図的に他の Server に Leadership を移譲したい場合に使用します。
Leader を単に停止すれば他の Server が Election Timeout 後に自動的に選挙を開始しますが、Leadership Transfer を使用すれば Election Timeout を待たずに選挙のプロセスをすぐに開始でき、ダウンタイムを最小にできます。

Leadership Transfer を検証コードに追加するのは簡単です。新しい Message Type として TimeoutNow を追加します。Leader は自身が保有する log と十分に多くの entry を共有している Server に対して TimeoutNow メッセージを送信します。TimeoutNow メッセージを受信した Follower はすぐに Candidate に遷移し新しい選挙を開始します。

Leadership Transfer は拡張機能の中で検証コードの実装が一番簡単なので練習問題として実装して検証してみるといいでしょう。

Membership Change

実運用環境では Raft クラスターを運用しながら落ちた Server をクラスターから外したり、replication 数を増やすために Server を追加する必要があります。Raft ではクラスターに 1 台ずつ Server を追加または削除するという操作を繰り返すことで任意回数のクラスターメンバーの入れ替えを実現します。ここまでは定数 Server に指定した Server の集合は全て Raft のクラスターに参加していると想定していました。この拡張機能では Server の集合のうち Raft のクラスターに参加している Server が動的に変化する状況で Invariant が成り立つことを検証します。

追加する RPC は AddServer、RemoveServer の 2 つです。さらに Server の状態として NonVoting、Waiting を追加します。この 2 つの状態はクラスターに参加していない状態を表現します。さらに Configuration(クラスター構成情報) を log の entry として共有するので、entry にも ETValue、ETConfig の 2 つの型を導入します。

Waiting 状態にある Server をすぐに Follower としてクラスターに参加させると、新しい Follower の log が Leader の log に追いつくまで entry が中々 commit できずに可用性が低下する場合があります。その状況を避けるために、クラスターに参加させる前に log が追いつくまで待機するのが NonVoting 状態です。NonVoting 状態の Server は Leader から AppendEntriesRequest を受信しますが、選挙には関わりません。また、NonVoting 状態の Server が存在する状態で Leader の変更があった場合にどうするか、など取り扱いが複雑です。
Configuration についても、ETConfig 型の entry はcommit されていなくても有効なものと判断して参照しますが、新しい AddServer RPC、RemoveServer RPC を受け付けるのは最新の ETConfig 型の entry が commit 済みの場合のみでタイミングにズレがあります。

Log Compaction

Raft の log は動作期間を通して伸びる一方です。log が大きくなると、より多くのディスクを占有し、再起動時の state machine の再構築により多くの時間がかかるようになります。そして、最終的には可用性の問題が発生します。従って、実用的なシステムでは何らかの形での log を圧縮する必要があります。

TLA+ で検証する際には Memory Base か Disk Base かは関係ありません。Incremental Approach は Raft の検証よりも log-structured merge trees のアルゴリズムの検証になりそうなので私は検証していません。Leader Base Snapshot も実装はしていませんがそれほど複雑にはならないと思います。ここでは Memory Base Snapshot を実装する場合を想定します。
state machine が Snapshot を取得すると、Snapshot を作成した時点で state machine に適用している log の index までは破棄することができます。そこで、破棄した index を記憶するために prevIndex、prevTerm を新たに Server ごとの変数として追加します。さらに、Invariant を検証するために snapshot という変数を追加して、そこに削除した log を記録しておくといいでしょう。
log を削除すると log の長さが変わるので、今まで Len(log[i]) のように log の長さを直接使用していた箇所を全て書き換える必要があります。さらに、Follower が必要とする log の entry をすでに削除していた場合に Snapshot を転送する InstallSnapshot RPC を実装します。InstallSnapshot RPC も 1 回の実行で Snapshot を全て転送するのではなく徐々に転送するようにすると実際のシステムの検証により近づくでしょう。

Client Interaction

論文では、Client Interaction は Membership Change や Log Compaction と比較すると簡潔に述べられていますが実装は一番大変です。Client Interaction を考慮する必要がある理由としては Raft をベースに構築した分散ロックシステムを利用する、論文の Figure 6.2 の状況が一番わかりやすいと思います。

f:id:hitotakuchan:20200627213119p:plain
Figure 6.2: 重複したコマンドによる誤った結果の例。クライアントがロックを取得するためにコマンドを送信したとします。クライアントの最初のコマンドはロックを取得しましたが、クライアントは確認応答を受け取りませんでした。クライアントがリクエストを再試行すると、ロックは獲得済みのためリクエストは失敗します。

Figure 6.2 の状況でも正しく動作するために、各 Server は Client のセッション情報と Client の request を処理した結果の Cache をそれぞれ保持する必要があります。Leader は Client からの request を管理する Queue と処理結果の Cache を使用して Client の request を適切に処理します。Client は自身に割り当てられた Client ID とメッセージの index を管理してメッセージにユニークな ID を割り当てる必要があります。また、論文では ClientRequest RPC と ClientQuery RPC を別の処理として説明していますが、entry の型だけが違う同じ処理としていいと思います。

Client Interaction では Raft の 5 つの性質に加えて「同じ ID を持つ Client の request が Raft によって 2 度処理されることはない」という Invariant を加えて検証するといいでしょう。

おわりに

形式仕様記述の道を極めていきたいと思った人はまずこの記事のコードに上で説明した拡張機能を実装して検証するといいでしょう。プログラミングの経験がある方は TLA+ の記述法にはじめは違和感を感じるかもしれませんがすぐに意図したアルゴリズムを実装できるようになると思います。
次の段階として、この記事では Raft の満たす 5 つの性質を私が論理式に直しましたが、自分自身で論理式として表現できるか確認してください。あるいは、他に満たすべき性質はないか?拡張機能について満たす必要のある性質は何か?を考えてそれを論理式として追加し検証してください。ただし、多くの人はこの段階で躓くと思います。

上で引用した記事にも同様のことに言及している箇所があるので引用します。

このような事情も相まって、形式手法を上手に扱うのにはかなりの知識が必要です。実際に取り組んでみるとわかるのですが、まず人間は意図通りの論理式やモデルを書けません(必要な条件を忘れていたりそもそも意図が間違ってたりする)。当たり前の話ですが、従来のコードでさえ正しく書けないのに、より抽象度の高い論理式やモデルを最初から正しく書けるはずがないのです

正確に意図を論理式として表現する能力はプログラミングというよりも数学の領域の能力だと思います。この能力は、大学で数学を先行する初学者も身につけるのに大いに苦労する能力だと思います。この能力をどうやって身につけるか?という問いに明確な回答を私は持ち合わせてはいませんが、場数を踏む必要があるのは間違いありません。今後このブログで TLA+ を用いてプログラムの検証を行う内容の記事を書く予定です。

最後に、Raft を TLA+ を用いて検証する手順を通して形式仕様記述やモデル検査とはどういうものなのか?検証とテストの違いは何か?といったことを大雑把に把握して今後の学習のためのエントリーポイントとしてこの記事を利用してもらえると嬉しいです。

Raft 理解度を調べるクイズ

  • Client からの接続が Timeout までに commit が成功しなかった entry はその後 commit されるか?
  • Leader は Client からの書き込み entry を log に保存すると同時に state machine を更新してもよいか?
  • Leader は Leader になった時点で自身の log に存在する entry を全て commit してもよいか?
  • commit index は Leader だけが保持しているのか?それとも全ての server がそれぞれ保持しているか?
  • 各 server が保持する currentTerm を更新するのは正確にいつといつか?

参考資料

Raft に関する資料

TAL+ の仕様を理解する上では 3 章と 8 章を読むといいです。
4,5,6 章は Raft をより実用的なシステムにする上で必要な追加機能の解説です。論文中でも、このブログでも検証しているのは Raft のコアアルゴリズムの部分だけで、これらの機能を追加した場合の検証はされていません。Raft を実装しプロダクション用途で使用する場合にはこれらの機能も合わせて検証する必要があるでしょう。
9, 10 章は Raft の実装の性能評価などが書かれているので自身で Raft を実装する人は読むといいと思います。また、Raft に依存した分散システムを運用している人が、適切な設定値を決める際の根拠として参照するといいでしょう。

  • Raft の考案者 Diego Ongaro による TLA+ 実装

github.com

  • LogCabin – Diego Ongaro による Raft の実装

github.com

  • Raft の動作する様子

Raft
Raft Consensus Algorithm

  • Raft に関して日本語で読める資料

PFI の勉強会のスライド資料
Raft(分散合意アルゴリズム)について · GitHub
分散合意アルゴリズム Raft を理解する - Qiita

TLA+ に関する資料

  • TLA+ のホームページ

lamport.azurewebsites.net

  • 形式手法についての記事

自動テストに限界を感じた私がなぜ形式手法に魅了されたのか - 若くない何かの悩み
AWSにおける形式手法 - masateruk’s blog
形式手法 (Formal Methods) についての調査 - つれづれ日記
ゼロから学んだ形式手法 - DeNA Testing Blog
TLA+の社内勉強会スライド公開します - takaha.siの技術メモ
[TLA+] TLA+と形式仕様言語 [目的と準備] | Developers.IO
FINAL FANTASY XV POCKET EDITION を支える AWS サーバレス技術

  • TLA+ の設計者 Leslie Lamport による TLA+ を用いた形式検証の教科書

これから TLA+ を使用して形式検証を行いたいと思う人はこの本を読みましょう。

  • TLA+ をプログラマフレンドリーな構文にした PlusCal を用いた形式検証の教科書

Practical TLA+: Planning Driven Development

Practical TLA+: Planning Driven Development

  • 作者:Wayne, Hillel
  • 発売日: 2018/10/12
  • メディア: ペーパーバック
PlusCal を用いて仕様を記述すると TLA+ と比較して簡略になる部分はあります。
一方で、私はこの記事を書くにあたって Raft の仕様を PlusCal で書き換えるという作業を行いました。その経験で言うと、簡単な小さい仕様であれば PlusCal で記述する方が簡潔になりますが、Raft のようにアルゴリズムが複雑で大きい仕様になると PlusCal の表現力の弱さが足かせになります。
この本を読むこと自体は有益なのでお勧めしますが、PlusCal を実際に仕事で使うかと言われれば、私は使いません。