PythonでPaxos

toy_paxos.pyの解説

前回と同じくcloudera社に勤務するHenryさんのPaxosの優しい解説。


今回の元記事は以下。なお、今回は全訳はなし。
http://the-paper-trail.org/blog/?p=190


今回はPythonで実装したtoy_paxosを解説する。
toy(おもちゃ)の由来は恐らく、氏がドク論執筆の合間に数時間で実装したもののため謙遜していると思われる。自分としては正直、よくこんな複雑なプログラムを数時間でかけるものだと感心する。
マルチスレッドではあるがマルチプロセスではない。単一PCの上で複数スレッドにて合意を形成する。Henryさんも書いているが、localhostがハードコードしてあるだけであり、マルチプロセスに書き直すのはそんなに難しくはないと思う。
氏が作成したプログラム"toy_paxos.py"は元記事の"A few hours later, I'd written this"のthisにあるリンクからダウンロードできる。
なお、ライセンスはGPLv2である。


プログラムの目的

このプログラムはLamportの"Paxos made simple"の最後に記述された分散環境上での状態機械の実装を行っている。
クライアントはクラスタの"leader"に対し命令の実行を依頼する。
leaderは受け取った命令をacceptorのグループに提案し、過半数のacceptorにより承認された場合、learner(leaderと兼任)に送られ実行(このプログラムでは記録のみ)される。
このプログラムはメインスレッドがクライアントを担当し、複数個の値(命令)をメッセージとしてLeaderに送信する。LeaderはPaxos合意プロトコルインスタンスを開始し、Acceptorのグループに値を提案し合意(過半数からの値の承認)を得る。そしてLearner(Leader)はその値を記録する。
メインスレッドはLeaderを監視し、全ての値が実行(記録)された時点でLeaderとAcceptorを停止し、Learnerが記録した値のヒストリーを表示して終了する。

クラス一覧

Message
メッセージのenum値を持つメッセージ構造体
MessagePump
メッセージの送受信を実際に行うスレッド。leaderとacceptorの両方で使われる。
InstanceRecord
プロトコルインスタンスをリストに保存する。leaderとacceptorの両方で使われる。
PaxosLeader
プロトコルのリーダ(proposer)の実装
PaxosLeaderProtocol
リーダ側のプロトコル進行を管理する状態機械
PaxosAcceptor
acceptorの実装
PaxosAcceptorProtocol
アクセプタ側のプロトコル進行を管理する状態機械

各クラスには内部クラスもあり、クラス数はこれより多い。

プログラムの構成

このプログラムではLeaderとAcceptorのインスタンスはそれぞれメッセージを送受信するためのMessagePump(メッセージポンプ)を持っている。MessagePumpは内部クラスにMPHelperを持ち、MessagePumpとMPHelperのそれぞれがThreadを継承し、両者が別スレッドにて実行される。LeaderとAcceptorを駆動するのはメッセージであり、MPHelperがメッセージを受信するとLeader及びAcceptorのrecvMessageを呼出す。そしてProtocol#doTransitionを呼出しプロトコルを回し新しいメッセージをメッセージポンプを利用して投げる。これによりお互いがメッセージを投げ合うことでプログラム全体が進行していく。


Paxos合意プロトコルの連続するインスタンスはLeaderとAcceptorのそれぞれで同じインスタンスIDにて管理される。Paxos合意プロトコルの個々のインスタンス1つにつき1つInstanceRecordのインスタンスをLeaderとAcceptorが持つ。またLeaderにはLeaderProtocol、AcceptorにはAcceptorProtocolのインスタンスが作成され、プロトコルIDにて管理される。PaxosLeaderとPaxosAcceptorは全てのInstanceRecordとProtocolのインスタンスを管理するマネージャ的存在であり、インスタンスは1つづつしか存在しない。LeaderとAcceptorのProtocolは必ず成功するとは限らないので合意プロトコル1つに対し、LeaderProtocol、AcceptorProtocolは複数存在し得る。


非常に紛らわしいので注意して欲しいが、このプログラムの目的が状態機械であると同時に、LeaderProtocolとAcceptorProtocolの実装は状態機械である。LeaderProtocolとAcceptorProtocolのインスタンスは個々に状態変数を持ち、状態遷移をdoTransitionメソッドにて実行する。

ハートビートと単一リーダの選出

このプログラムではLeaderが2個とAcceptorが5個存在する。
Leaderは2つとも起動時には自身がリーダだと考えている。(setPrimary(True))
Leaderは内部クラスにHeartbeatSenderとHeartbeatListenerを持っている。
そして自身がprimaryである限り、他のリーダ全てにハートビートを送る。
Leaderは自身がprimaryであると考えているのに他のハートビートを受け取る場合がある。この場合、相手のポート番号が自分のポート番号よりも大きい場合に相手にprimaryを譲り、自分はsetPrimary(False)する。
これにより単一のリーダが選出される。
逆にprimaryでないLeaderは常にハートビートを聞いており、ハートビートが途絶えると自らをsetPrimary(True)してprimaryとして行動を開始する。


現在、このプログラムではprimaryは1秒に1回、ハートビートを送信する。
primaryでないLeaderは2秒間ハートビートが途絶えた場合にprimaryとして行動開始する。

ギャップを埋める

リーダは約15秒に1回記憶のギャップを埋める。


"Paxos made simple"の最後にて解説されているように、複数のPaxosのインスタンスを順に実行すると、実行順にインスタンスが停止するとは限らない。
このため、Paxosで状態機械を実装すると命令が提案された順には合意されず、合意済みの命令と別の合意済み命令の間にまだ選択されていない(終了していない)命令の穴(ギャップ)が存在してしまう。このギャップを埋めなければ既に合意している命令も実行できない。ギャップを埋めるには新しい命令の提案を行っても良いが、"Paxos made simple"では"no-op"の命令を再提案する方法を紹介しており、このプログラムでも"no-op"に相当する0を未終了のインスタンスに対して再提案させている。結果的に既に動いている提案が遅れて選択されるか、再提案された"no-op"が選択される。これにより命令の実行を続けることが可能になる。このプログラムでは命令の実行は省力されているのでギャップを埋めても全体に影響は無い。

記憶

このプログラムはAcceptorが値を承認すると、primaryだけでなく全てのLeaderにACCEPTOR_ACCEPTのメッセージと共に値を送信する。これにより、primaryでないLeader(learner)も全ての値を記憶することができる。
"Paxos made simple"ではこのモデルを一番簡単な物として紹介しているが通信量が増える欠点があると指摘している。"distinguished learner"がleaderより値を受け取り、他のlearnerに配布する効率の良いモデルが記述されている。learner同士が通信を行う場合をcatchupと言う。

プログラムの流れ・序盤

メインスレッドが命令を1個メッセージとして投げた時、以下のようにメッセージがリーダに拾われ、リーダがメッセージをアクセプタに投げ、そして合意プロトコルが停止するまでメッセージの投げ合いが続く

  1. クライアントはleader2にMSG_EXT_PROPOSEを送る
  2. MessagePumpHelper <- leaderのが受け取る
  3. MessagePump
  4. PaxosLeader.recvMessage
  5. PaxosLeader.newProposal 提案の作成:prepareリクエス
  6. PaxosLeaderProtocol.propose 提案の実行
  7. PaxosLeader.sendMessage
  8. MessagePump.sendMessage <- Leaderのhas-a
  9. MessagePumpHelper <- Acceptorのhas-a
  10. MessagePump
  11. PaxosAcceptor.recvMessage
  12. PaxosAcceptorProtocol.recvProposal AGREEを返す
  13. MessagePumpHelper <- Leader
  14. MessagePump
  15. PaxosLeader.recvMessage
  16. PaxosLeaderProtocl.doTransition

以下、停止するまで続く

状態遷移図

PaxosLeaderProtocol
STATE_UNDEFINED -> STATE_PROPOSED -> STATE_AGREED -> STATE_ACCEPTED
               └> STATE_REJECTED                └> STATE_UNACCEPTED
PaxosAcceptorProtocol
STATE_UNDEFINED -> STATE_PROPOSAL_AGREED -> STATE_PROPOSAL_ACCEPTED
               └> STATE_PROPOSAL_REJECTED

以下の物は未使用

  • STATE_PROPOSAL_RECIEVED
  • STATE_PROPOSAL_UNACCEPTED

ソースコード解析

量が半端無いのと面倒になったのでgetterとかsetterとかはちょっとロジックが有っても解説無し。:-)

Java屋に微妙にわかりにくいのはLeaderとAcceptorの間には共通の親クラスもインターフェースもないのに、MessagePumpから同じ名前であるrecvMessageというメソッドが呼ばれる点。またインスタンス変数も動的に(JavaScriptみたく)追加できるので同じ変数のさす複数のインスタンスが別々の構造を持つことも可能だ。Pythonは柔軟な分、読むのが大変だなぁと言いたいだけ。(^^;

トップのコメント

# Naive implementation of the Paxos protocol.
# Henry Robinson, 2009
# Licensed under GPL v2

main

import time
if __name__ == '__main__':
    numclients = 5
    clients = [ PaxosAcceptor( port, [54321,54322] ) for port in xrange( 64320, 64320+numclients ) ]
    leader = PaxosLeader( 54321, [54322], [c.port for c in clients] )
    leader2 = PaxosLeader( 54322, [54321], [c.port for c in clients] )
    leader.start( )
    leader.setPrimary( True )
    leader2.setPrimary( True )
    leader2.start( )
    for c in clients:
        c.start( )

    clients[0].fail( )
    clients[1].fail( )
    # clients[2].fail( )
    
    # いくらかの提案を発行する
    s = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
    start = time.time( )
    for i in xrange(100):
        m = Message( Message.MSG_EXT_PROPOSE )
        m.value = 0 + i
        m.to = 54322
        bytes = pickle.dumps( m )
        s.sendto( bytes, ("localhost", m.to) )

    while leader2.getNumAccepted( ) < 99:
        print "Sleeping for 1s -- accepted:", leader2.getNumAccepted( )
        time.sleep( 1 )
    end = time.time( )    
        
    # print "Sleeping for 10s"
    # time.sleep( 10 )
    print "Stopping leaders"
    leader.stop( )    
    leader2.stop( )
    print "Stopping clients"
    for c in clients:
        c.stop( )

    print "Leader 1 history: ", leader.getHistory( )
    print "Leader 2 history: ", leader2.getHistory( )
    print end - start
  • クライアント数を5に設定
  • クライアント数の数だけPaxosAcceptorのインスタンスを作成してArrayにしている。コンストラクタの第1引数が自身のポート、第2引数は複数のリーダーのポート
  • 一つ目のリーダーの作成。第1引数が自分のポート。第2引数が他のリーダーのポート。第3引数がクライアントのポート。次に同様に2つ目のリーダーの作成
  • 第一、第二リーダーをプライマリに設定。後でポート番号により優先順位で調整される。両方のスレッドをスタート
  • クライアント全てをスタート
  • クライアントに擬似的に障害を発生させる。障害が過半数を越えるとうまく動かなくなるはず。
  • UDPソケットを用意
  • 現在時刻をstartに保存
  • インデックスiにて100回ループ
    • mに新しいメッセージ(EXT:external?)
    • m.valueにiを代入。iが順に入るがこれはストーリー上は状態機械に対する命令
    • m.toに54322。これはleader2
    • mをシリアライズしてbytesに代入
    • localhostのポートm.toにbytesを送る
  • leader2にて合意済みの値が100個になるまで時間稼ぎ
  • leaderを停止
  • clientを停止
  • leader1,2の履歴を表示。目視で比較
  • 実行時間の表示(秒)

PaxosLeader

PaxosLeaderはleaderのクラス。MessagePumpを持つことでメッセージをクライアントとacceptorとの間でやり取りする。
また内部クラスとしてHeartbeatSenderとHeartbeatListenerを持ち他のleader候補とハートビートを交換し、生存確認を行っている。

コンストラクタ(ポート番号、他のleader、acceptor)
class PaxosLeader( object ):
    def __init__(self, port, leaders=None, acceptors=None):
        self.port = port
        if leaders == None:
            self.leaders = []
        else:
            self.leaders = leaders
        if acceptors == None:
            self.acceptors = []
        else:
            self.acceptors = acceptors
        self.group = self.leaders + self.acceptors        
        self.isPrimary = False
        self.proposalCount = 0
        self.msgPump = MessagePump( self, port )
        self.instances = {}
        self.hbListener = PaxosLeader.HeartbeatListener( self )
        self.hbSender = PaxosLeader.HeartbeatSender( self )
        self.highestInstance = -1
        self.stopped = True
        # The last time we tried to fix up any gaps
        self.lasttime = time.time( )
  • 引数をインスタンス変数に代入
  • インスタンス変数groupはleaderとacceptorの和集合。使用されていない。
  • primaryフラグはデフォルトではfalse。後からsetPrimary
  • MessagePumpに自身の参照とポート番号を渡してインスタンス
  • instancesにPaxos合意アルゴリズムインスタンスの履歴を入れる。ここで空で初期化。
  • HeartbeatListenerとHeartbeatSenderを自身の参照を引数にインスタンス
  • 既知のインスタンス番号で最も大きい値を-1で初期化
  • 停止フラグ:デフォルトがTrue
  • lastime:最後にギャップを埋めた時間。現在値で初期化。
内部クラス:HeartbeatListner
    #------------------------------------------------------
    # These two classes listen for heartbeats from other leaders
    # and, if none appear, tell this leader that it should
    # be the primary
        
    class HeartbeatListener( threading.Thread ):
        def __init__( self, leader ):
            self.leader = leader
            self.queue = Queue.Queue( )
            self.abort = False
            threading.Thread.__init__( self )

        def newHB( self, message ):
            self.queue.put( message )

        def doAbort( self ): self.abort = True
            
        def run( self ):
            elapsed = 0
            while not self.abort:
                s = time.time( )
                try:
                    hb = self.queue.get( True, 2 )
                    # Easy way to settle conflicts - if your port number is bigger than mine,
                    # you get to be the leader
                    if hb.source > self.leader.port:
                        self.leader.setPrimary( False )
                except: # Nothing was got
                    self.leader.setPrimary( True )

HeartbeatListnerとHeartbeatSenderの2つが他のleaderとハートビートを交換する。他のleaderのハートビートが聞こえなくなった時(Noneが帰ったとき)このleaderがprimaryとして立候補する。

HeartbeatListnerはThreadを継承しており、ずっとrun()が実行される。
実際にハートビートを聞くのはMessagePumpであり、メッセージの種類からハートビートだと確認される。

コンストラク
他のleaderのリストを受けとりインスタンス変数にセット。スレッドを実行。FIFOのQueueを用意。以降、このクラスのrun()が実行される。
newHB(message)
MessagePumpがハートビートを受け取るとこれを呼出し、Queueにメッセージが積まれる
run
実際の本体。elapsedとsは使っていない(?)。
  • abortされるまでループ。
  • queue.getで第一引数Trueによりブロックウェイトをかけている。タイムアウトが2秒
  • ハートビートが届いた他のリーダーのポート番号が自分のポート番号より大きい場合、相手をprimaryと認め自分は引っ込む。
  • タイムアウトした場合には再び自らがprimaryを名乗る
内部クラスHeartbeatSender
    class HeartbeatSender( threading.Thread ):
        def __init__( self, leader ):
            self.leader = leader
            self.abort = False
            threading.Thread.__init__( self )            

        def doAbort( self ): self.abort = True
            
        def run( self ):
            while not self.abort:
                time.sleep( 1 )
                if self.leader.isPrimary:
                    msg = Message( Message.MSG_HEARTBEAT )
                    msg.source = self.leader.port
                    for l in self.leader.leaders:
                        msg.to = l
                        self.leader.sendMessage( msg )

HeartbeatSenderもThreadを継承している。オーナであるリーダがprimaryである場合のみ、1秒に1回他のleaderに対しハートビートを送る。

コンストラク
他のリーダーのリストを受け取り保存。スレッドを実行。以降、run()が実行される
run()
本体
  • abortされない間ループ
    • 1秒間寝るよ
    • オーナであるleaderがprimaryなら実行
      • MSG_HEARTBEATのメッセージを作成
      • 他のリーダー全てに対し、
        • 自身のオーナであるリーダからメッセージを送信する
PaxosLeader#sendMessage()
    #------------------------------------------------------
    def sendMessage( self, message ):
        self.msgPump.sendMessage( message )

MessagePumpのsendMessageに委譲するのみ

PaxosLeader#recvMessage(メッセージ)
    def recvMessage( self, message ):
        """メッセージポンプがこれを定期的に呼出す。メッセージが無い場合にも同様"""
        if self.stopped: return
        if message == None:
            # 15秒毎のみ実行。でないと実行中の良いプロトコルも切ってしまう:(      
            if self.isPrimary and time.time( ) - self.lasttime > 15.0: 
                self.findAndFillGaps( )
                self.garbageCollect( )
            return
        if message.command == Message.MSG_HEARTBEAT:
            self.hbListener.newHB( message )
            return True
        if message.command == Message.MSG_EXT_PROPOSE:
            print "External proposal received at", self.port, self.highestInstance
            if self.isPrimary:
                self.newProposal( message.value )
            # primeでなければ無視する。primaryでなくともproposalは受け取る場合有り
            # 親切にするならleaderが代わったと返信するだろう
            # そして新しいleaderのアドレスを教える。しかしそうしたって失敗する可能性はある
            return True
        if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
            self.instances[ message.instanceID ].getProtocol(message.proposalID).doTransition( message )        
        
        # Acceptorは値を承認するとプライマリでないリーダ全てにも
        # 同じメッセージを流すようにプログラムされている。
        # その場合、InstanceRecordもProtocolも無い場合が有る。

        if message.command == Message.MSG_ACCEPTOR_ACCEPT:
            if message.instanceID not in self.instances:
                self.instances[ message.instanceID ] = InstanceRecord( )
            record = self.instances[ message.instanceID ]
            if message.proposalID not in record.protocols:
                protocol = PaxosLeaderProtocol( self )
                # 状態をAccept待ちにする
                protocol.state = PaxosLeaderProtocol.STATE_AGREED
                protocol.proposalID = message.proposalID
                protocol.instanceID = message.instanceID
                protocol.value = message.value
                record.addProtocol( protocol )
            else:
                protocol = record.getProtocol( message.proposalID )
            # ここで初期化した場合も次に落ちる
            protocol.doTransition( message )
        return True

recvMessageはMessagePumpがメッセージを受け取った場合、またはタイムアウトした場合に呼出される。
Leaderは外部クライアントとacceptorと他のリーダ全てからメッセージをここで受け取るのでロジックが長い。Pythonにはswitch-case文が無いのでこのような形に。else文ほとんどを使っていないのでreturnが無い場合、意図して続行される点に注意。

  1. メッセージが無い場合
    • 15秒に一回、ギャップを埋めてGCを実行し終了
  2. MSG_HEARTBEATの場合
    • メッセージをHeartbeatListnerのキューに叩き込んで終了
  3. MSG_EXT_PROPOSE(クライアントからの提案)の場合
    • 自分がprimaryならnewPropossal(message.value)を呼んで終了
    • 自分がprimaryでないならただ終了
  4. 自分がprimaryでかつ、MSG_ACCEPTOR_ACCEPTではない場合
    • InstanceRecordからLeaderProtocolを取り出し、doTransitionを実行することで状態機械を進める。次のif文は絶対に真にならないのでここでreturnすると良いと思った
  5. メッセージがMSG_ACCEPTORの場合
    • (ここはleaderが提案した後、再起動やleaderを委譲しleaderでなくなった場合にも実行される)
    • InstanceRecordが無ければ作成して登録
    • proposalID(提案の順序番号)がない場合、つまりProtocolインスタンスが無い場合
    • Protocol#doTransitionを実行して状態機械を回す
PaxosLeader#newProposal(値, インスタンスID)
    def newProposal( self, value, instance = None ):
        protocol = PaxosLeaderProtocol( self )
        if instance == None:
            self.highestInstance += 1
            instanceID = self.highestInstance
        else:
            instanceID = instance
        self.proposalCount += 1
        id = (self.port, self.proposalCount )
        if instanceID in self.instances:
            record = self.instances[ instanceID ]
        else:
            record = InstanceRecord( )
            self.instances[ instanceID ] = record
        protocol.propose( value, id, instanceID )
        record.addProtocol( protocol )

PaxosLeaderがクライアントからproposalを受け取った時に呼出され、Paxos合意プロトコルインスタンスを作成し、プロトコルを開始する

PaxosLeader#notifyLeader(protocol, message)
    def notifyLeader( self, protocol, message ):
        # Protocols call this when they're done
        if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
           print "Protocol instance %s accepted with value %s" % (message.instanceID, message.value)
           self.instances[ message.instanceID ].accepted = True
           self.instances[ message.instanceID ].value = message.value
           self.highestInstance = max( message.instanceID, self.highestInstance )
           return
        if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
            # メッセージから値を確認してリトライする
            # いずれアクセプター達は何らかの値をこのインスタンスに対し
            # 承認するだろう。そしてプロトコルは完了する。
            self.proposalCount = max( self.proposalCount, message.highestPID[1] )
            self.newProposal( message.value )
            return True
        if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
            pass            

このメソッドは状態遷移機械であるPaxosLeaderProtocolが停止した場合に呼ばれる。

  1. STATE_ACCEPTEDの場合
  2. STATE_REJECTEDの場合
    • プロトコルの数を更新(highestPIDなど存在しない)
    • 値を再提案
  3. STATE_UNACCEPTEDの場合
    • 何もしない!
PaxosLeader#findAndFillGaps
    def findAndFillGaps( self ):
        # if no message is received, we take the chance to do a little cleanup
        for i in xrange(1,self.highestInstance):
            if self.getInstanceValue( i ) == None:
                print "Filling in gap", i
                self.newProposal( 0, i ) # This will either eventually commit an already accepted value, or fill in the gap with 0 or no-op
        self.lasttime = time.time( )

    def garbageCollect( self ):
        for i in self.instances:
            self.instances[i].cleanProtocols( )

"Paxos made simple"にて解説されていたギャップの整理。
連続したPaxosのインスタンス列を実行するとPaxosのインスタンスの実行終了は実行順になるとは限らず選択された値(命令)列には穴(ギャップ)ができる。これは現実の実行環境は常に非同期的な特性を持つため。
findAndFillGapsとgarbageCollectは常に同時に約15秒に一回、リーダのrecvMessageより呼ばれる。

  • 1からインスタンスIDの最大値までインデックスiにてループ
    • インスタンスiにて値が選択されていない時、0(no-op)にて再提案。この場合、既に提案済みの値か、このno-opが結果的に選択される。
  • 最後にギャップを埋めた時間を更新

PaxosLeaderProtocol

Paxos合意プロトコルの状態機械。プロトコルインスタンスの状態をインスタンス変数stateとして保持する。

初期化時STATE_UNDEFINED
アクセプターへの提案を終えるとSTATE_PROPOSED

コンストラク
class PaxosLeaderProtocol( object ):
    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSED = 0
    STATE_AGREED = 1
    STATE_REJECTED = 2
    STATE_ACCEPTED = 3
    STATE_UNACCEPTED = 4
    
    def __init__( self, leader ):
        self.leader = leader
        self.state = PaxosLeaderProtocol.STATE_UNDEFINED
        self.proposalID = (-1,-1)
        self.agreecount, self.acceptcount = (0,0)
        self.rejectcount, self.unacceptcount = (0,0)
        self.instanceID = -1
        self.highestseen = (0,0)
  • リーダの参照をもらい相互参照にしておく。
  • その他、初期設定
PaxosLeaderProtocol#propose(値, 提案順序番号, インスタンスID)
    def propose( self, value, pID, instanceID ):
        self.proposalID = pID
        self.value = value
        self.instanceID = instanceID
        message = Message( Message.MSG_PROPOSE )
        message.proposalID = pID
        message.instanceID = instanceID
        message.value = value 
        for server in self.leader.getAcceptors( ):
            message.to = server
            self.leader.sendMessage( message )
        self.state = PaxosLeaderProtocol.STATE_PROPOSED
        return self.proposalID

Leaderが実際に値を過半数のacceptorに提案する。外部クライアントから値の提案の要請をrecvMessageが受けてPaxosLeader#newProposalを経由してこれを呼び出す。各種の値はそちらで決定しており引数で渡される。

  • 引数を自分のインスタンス変数に保存する。
  • 提案用のメッセージ(MSG_PROPOSE)を作成
  • 全てのacceptorに対し
    • メッセージをオーナであるリーダのsendMessageにてUDP送信する
  • 状態機械の状態をSTATE_PROPOSED(提案済み)に更新
  • 提案順序番号を返す
PaxosLeaderProtocol#doTransition(Message)
    def doTransition( self, message ):
        """プロトコルをシンプルな状態機械として実行する。
        期待しない入力に対してエラーとするのは常に正しくはない
        しかしメッセージの遅延の理由で、静かに入力を無視する"""
        if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
            if message.command == Message.MSG_ACCEPTOR_AGREE:
                self.agreecount += 1
                if self.agreecount >= self.leader.getQuorumSize( ):
#                    print "合意の定足数に逹っした。最後に返信された値:", message.value
                    if message.value != None:
                        # 値がNoneでなく順序番号が最も大きければ更新
                        if message.sequence[0] > self.highestseen[0] or (message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[1]):
                            self.value = message.value
                            self.highestseen = message.sequence
                    self.state = PaxosLeaderProtocol.STATE_AGREED
                    # 'accept'メッセージをグループに送る                   
                    msg = Message( Message.MSG_ACCEPT )
                    msg.copyAsReply( message )
                    msg.value = self.value
                    msg.leaderID = msg.to
                    for s in self.leader.getAcceptors( ):
                        msg.to = s
                        self.leader.sendMessage( msg )
                    self.leader.notifyLeader( self, message )
                return True
            if message.command == Message.MSG_ACCEPTOR_REJECT:
                self.rejectcount += 1
                if self.rejectcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_REJECTED                    
                    self.leader.notifyLeader( self, message )
                return True
        if self.state == PaxosLeaderProtocol.STATE_AGREED:
            if message.command == Message.MSG_ACCEPTOR_ACCEPT:
                self.acceptcount += 1
                if self.acceptcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_ACCEPTED
                    self.leader.notifyLeader( self, message )
            if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
                self.unacceptcount += 1
                if self.unacceptcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
                    self.leader.notifyLeader( self, message )
        pass

状態機械を遷移させるメソッド。
自分の現在状態値と受け取ったメッセージの組で実行を選択し、状態を更新する。
先にproposeメソッドが呼ばれているのてここに最初に来る場合、状態はSTATE_PROPOSEDになっている。
ここでは主にacceptorからのメッセージを処理する。個々のacceptorから別々にメッセージが届くのでこの中でメッセージのカウントを行い過半数判定を行っている。
基本的にこのロジックに合わない状態とメッセージの組は最後のpassにて全部無視される。
ここでagreecountが過半数を越えた場合に、既知の順序番号よりメッセージの順序番号が大きい場合に、提案する値を(つまり他のリーダによる新しい提案の値に)更新することが肝である。

  1. 現在の状態がSTATE_PROPOSEDの場合
    1. メッセージがMSG_ACCEPTOR_AGREEの場合
      • agreecountを1増やす
      • agreecountがacceptorの過半数を越えた場合のみ以下を実行
        • メッセージの提案順序番号が既知の番号より大きい場合
          • 提案する値をメッセージの持つ値に更新する
          • 既知の最大順序番号を更新する
          • acceptリクエストのメッセージを作成する
          • 全てのacceptorに対し
            • acceptリクエストを送信する
          • オーナであるleaderに対しnotifyLeaderを呼出す
    2. メッセージがMSG_ACCEPTOR_REJECTの場合
      • rejectcountを1増やす
      • rejectcountが過半数を越える場合
        • 状態遷移機械の状態をSTATE_REJECTEDに更新し、
        • オーナであるleaderのnotifyLeaderを呼出す
  2. 現在の状態がSTATE_AGREEDの場合
    1. MSG_ACCEPTOR_ACCEPTの場合
      • acceptcountを1増やす
      • acceptcountが過半数を越えたら
        • 状態をSTATE_ACCEPTEDに更新し、
        • オーナであるleaderのnotifyLeaderを呼出す
    2. MSG_ACCEPTOR_UNACCEPTの場合
      • unacceptcountを1増やす
      • unacceptcountが過半数を越えたら
        • 状態をSTATE_UNACCEPTEDに更新し、
        • オーナであるleaderのnotifyLeaderを呼出す

PaxosAcceptor

class PaxosAcceptor( object ):            
    def __init__(self, port,leaders ):
        self.port = port
        self.leaders = leaders
        self.instances = {}
        self.msgPump = MessagePump( self, self.port )
        self.failed = False

    def start( self ):
        self.msgPump.start( )

    def stop( self ):
        self.msgPump.doAbort( )

    def fail( self ):
        self.failed = True
    def recover( self ):
        self.failed = False

    def sendMessage( self, message ):
        self.msgPump.sendMessage( message )
コンストラク
MessagePumpに自身の参照とポート番号を渡して作成している
sendMessage
自身が持つMessagePumpインスタンスに委譲するのみ
recvMessage(self, message)

このメソッドはMessagePumpのインスタンスがメッセージをleaderより受け取った時にそのメッセージを引数に呼出される。

  1. messageがNoneなら何もせずに終了
  2. フラッグfailedがTrueなら何もせず終了。failedはsetFailedメソッドで設定され、acceptor上に障害が発生していることをシミュレートする。障害発生中はメッセージを受け取れず、記憶もできない
  3. messageのcommandがMSG_PROPOSEの場合。これはleaderから
PaxosAcceptor#recvMessage(メッセージ)
    def recvMessage( self, message ):
        if message == None: return
        if self.failed:
            return # Failure means ignored and lost messages
        if message.command == Message.MSG_PROPOSE: 
            if message.instanceID not in self.instances:
                record = InstanceRecord( )                
                self.instances[ message.instanceID ] = record
            protocol = PaxosAcceptorProtocol( self )
            protocol.recvProposal( message )
            self.instances[ message.instanceID ].addProtocol( protocol )
        else:
            self.instances[ message.instanceID ].getProtocol( message.proposalID ).doTransition( message )
PaxosAcceptor#notifyClient(protocol, message)
    def notifyClient( self, protocol, message ):
        if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
            self.instances[ protocol.instanceID ].value = message.value
#            print "Proposal accepted at client: ", message.value


    def getHighestAgreedProposal( self, instance ):
        return self.instances[ instance ].highestID

    def getInstanceValue( self, instance ):
        return self.instances[ instance ].value

PaxosAcceptorProtocol

Acceptorのプロトコルを実行する状態機械。
Acceptorのプロトコルが開始される場合に常に新しいインスタンスが1つ作成される。

コンストラクタ(クライアント)
class PaxosAcceptorProtocol( object ):
    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSAL_RECEIVED = 0
    STATE_PROPOSAL_REJECTED = 1
    STATE_PROPOSAL_AGREED = 2
    STATE_PROPOSAL_ACCEPTED = 3
    STATE_PROPOSAL_UNACCEPTED = 4

    def __init__( self, client ):
        self.client = client
        self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

引数clientはAcceptorであるので注意。
状態をSTATE_UNDEFINEDで初期化。

PaxosAcceptorProtocol#recvProposal(message)

新しい提案を受け取った時に呼ばれる。
状態機械が新規に作成された直後。
Paxos合意アルゴリズムインスタンス1つにつきPaxosAcceptorProtocolは複数のインスタンスを作り得る。

    def recvProposal( self, message ):
        if message.command == Message.MSG_PROPOSE:
            self.proposalID = message.proposalID
            self.instanceID = message.instanceID
            # このインスタンスの最大の既に同意した提案は?
            (port, count) = self.client.getHighestAgreedProposal( message.instanceID )
            # この提案が最大の番号かチェックする
            if count < self.proposalID[0] or (count == self.proposalID[0] and port < self.proposalID[1]):
                # 同意したメッセージを返信する。もし存在するなら最大番号の承認済みの値を用いる
                self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
#                print "Agreeing to proposal: ", message.instanceID, message.value
                value = self.client.getInstanceValue( message.instanceID )
                msg = Message( Message.MSG_ACCEPTOR_AGREE )
                msg.copyAsReply( message )
                msg.value = value
                msg.sequence = (port, count)
                self.client.sendMessage( msg )
            else:
                # 遅すぎ。既に他のに約束してしまった。
                # リジェクトメッセージを送る。最大番号とその値と一緒に
                self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
            return self.proposalID
        else:
            # エラー。提案じゃないものを受け取ってる?
            pass
  • メッセージから順序番号とインスタンスIDを自身にコピー
  • オーナであるPaxosAcceptorからインスタンスIDに対応する既に同意した最も大きな順序番号を得る。タプルからポート番号とプロトコルのカウント値(id)を得る。順序番号はPaxosLeader#newProposalにて(リーダのポート番号, PaxosLeaderProtocolのカウント値)にて作成されている。
  • 既知の最大番号と今回の番号を比較する。恐らくバグっていてポート番号とカウントを比較している。ポート番号は非常に大きいのでだいたい成功する。コメント欄にて既に通知されているが修正されなかったみたい。
  • 状態をSTATE_PROPOSAL_AGREEDに更新
  • MSG_ACCEPTOR_AGREEでメッセージを作成し、Leaderに返す。copyAsReplyで宛先と差し出しが引っくり返されている。
PaxosAcceptorProtocol#doTransition(メッセージ)
    def doTransition( self, message ):
        if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
            # Could check on the value here, if we don't trust leaders to honour what we tell them
            # send reply to leader acknowledging
            msg = Message( Message.MSG_ACCEPTOR_ACCEPT )
            msg.copyAsReply( message )
            for l in self.client.leaders:
                msg.to = l
                self.client.sendMessage( msg )
            self.notifyClient( message )            
            return True
        
        raise Exception( "Unexpected state / command combination!" )

    def notifyClient( self, message ):
        self.client.notifyClient( self, message )

リーダに比べると比較的単純

  1. 状態がSTATE_PROPOSAL_AGREEDであれば
    • 状態をSTATE_PROPOSAL_ACCEPTEDに更新
    • MSG_ACCEPTOR_ACCEPTのメッセージを作成
    • 全てのLeaderに送信する
    • オーナであるAcceptorに終了をnotifyClientで通知
  2. else
    • 例外を投げる (ここに来るのはAGREEDの時だけ)

Message

class Message( object ):
    MSG_ACCEPTOR_AGREE = 0
    MSG_ACCEPTOR_ACCEPT = 1
    MSG_ACCEPTOR_REJECT = 2
    MSG_ACCEPTOR_UNACCEPT = 3
    MSG_ACCEPT = 4
    MSG_PROPOSE = 5
    MSG_EXT_PROPOSE = 6
    MSG_HEARTBEAT = 7
    def __init__( self, command = None ):
        self.command = command

    def copyAsReply( self, message ):
        self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
        self.value = message.value

Messageクラスはメッセージの種別をenumのように定数で定義している。
コンストラクタはメッセージの種別を引数として受け取る。

メソッドcopyAsReplyは自身のインスタンス変数として提案のID、インスタンスID、宛先、発信元、値を保存する。
注意:copyAsReplyはメッセージの送信元と送信先を引っくり返す。宛先を上書きしている場合はそうでないが、そうしていない場合、新しいメッセージは元の送り主に送られる。

class MessagePump( threading.Thread ):
    """メッセージポンプはソケット接続をカプセル化する。メッセージをオーナに渡す責任を持つ"""
    class MPHelper( threading.Thread ):
        """このヘルパークラスの理由はソケットからできるだけ速く中身を取り出すこと。バッファがいっぱいになるのを防ぐため。
        今さらだけどTCPを使ったほうが簡単だったと思う。:)"""
        def __init__( self, owner ):
            self.owner = owner
            threading.Thread.__init__( self )
        def run( self ):
            while not self.owner.abort:
                try:
                    (bytes, addr) = self.owner.socket.recvfrom( 2048 )
                    msg = pickle.loads( bytes )
                    msg.source = addr[1]
                    self.owner.queue.put( msg )
                except:
                    pass
    
    def __init__( self, owner, port, timeout=2 ):
        self.owner = owner
        threading.Thread.__init__( self )
        self.abort = False
        self.timeout = 2
        self.port = port
        self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
        self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_RCVBUF, 200000 )        
        self.socket.bind( ("localhost", port) )
        self.socket.settimeout( timeout )
        self.queue = Queue.Queue( )
        self.helper = MessagePump.MPHelper( self )
        
    def run( self ):
        self.helper.start( )
        while not self.abort:
            message = self.waitForMessage( )
            # これはブロッキングを必要とします。
            # でなければマルチスレッドの痛みを知るでしょう            
            self.owner.recvMessage( message )

    def waitForMessage( self ):
        try:
            msg = self.queue.get( True, 3 )
            return msg
        except: # ugh, specialise the exception!
            return None

    def sendMessage( self, message ):
        bytes = pickle.dumps( message )
        address = ("localhost", message.to)
        self.socket.sendto( bytes, address )
        return True
    
    def doAbort( self ):
        self.abort = True

MessagePumpはThreadを継承する。このプログラムではleaderもacceptorも全てこのメッセージポンプを利用してメッセージを汲み上げる。

  • 内部クラスMPHelperは別スレッドでひたすら受信したUDPメッセージをオーナのキューに突っ込む
MessagePump#コンストラクタ(オーナ、ポート番号、タイムアウト)

初期設定。スレッドがstart()にて開始されると以下実行はrun()になる。
ソケットまわりの初期設定とキューの作成、MPHelperの実行。

MessagePump#run()
  • MPHelperをstart()
  • abortしない限りwaitForMessageを実行しオーナのrecvMessageへ投げ続ける。
MessagePump#waitForMessage()
  • キューからメッセージを取り出す。この時キューにメッセージが存在しなければタイムアウト3秒まで待つ
  • タイムアウトした場合、Noneを返す

ここで大切なのが、メッセージ送信先が障害でメッセージを投げなくなった場合にタイムアウトによりオーナにはNoneがメッセージとして伝わること。
障害検知になっている。

終わりに

このプログラムは非常に優秀でPaxosの挙動を障害を発生させることにより確認することが可能である。
メインスレッドにあるAcceptor#failの呼出の数を変えれば、障害がAcceptorの過半数を越えた時に合意が取れないことが実際に確認できる。
また単一のLeaderが選出されないようsetPrimary(false)を取り除けば、これもまた合意が取れなくなる。


非同期環境におけるメッセージの遅延をシミュレートするためにAdversarialMessagePumpも付いている。これは通常は使用されないが、MessagePumpをこれで置き換えるとメッセージの遅延が発生する。


このプログラムに実際に手を入れ、様々な変更を施すことによりPaxosの理解が深まるだろう。
素晴しい記事とプログラムを提供してくれたHenry氏に感謝したい。