toy_paxos.pyの解説
前回と同じくcloudera社に勤務するHenryさんのPaxosの優しい解説。
今回の元記事は以下。なお、今回は全訳はなし。
http://the-paper-trail.org/blog/?p=190
今回はPythonで実装したtoy_paxosを解説する。
toy(おもちゃ)の由来は恐らく、氏がドク論執筆の合間に数時間で実装したもののため謙遜していると思われる。自分としては正直、よくこんな複雑なプログラムを数時間でかけるものだと感心する。
マルチスレッドではあるがマルチプロセスではない。単一PCの上で複数スレッドにて合意を形成する。Henryさんも書いているが、localhostがハードコードしてあるだけであり、マルチプロセスに書き直すのはそんなに難しくはないと思う。
氏が作成したプログラム"toy_paxos.py"は元記事の"A few hours later, I'd written this"のthisにあるリンクからダウンロードできる。
なお、ライセンスはGPLv2である。
プログラムの目的
このプログラムはLamportの"Paxos made simple"の最後に記述された分散環境上での状態機械の実装を行っている。
クライアントはクラスタの"leader"に対し命令の実行を依頼する。
leaderは受け取った命令をacceptorのグループに提案し、過半数のacceptorにより承認された場合、learner(leaderと兼任)に送られ実行(このプログラムでは記録のみ)される。
このプログラムはメインスレッドがクライアントを担当し、複数個の値(命令)をメッセージとしてLeaderに送信する。LeaderはPaxos合意プロトコルのインスタンスを開始し、Acceptorのグループに値を提案し合意(過半数からの値の承認)を得る。そしてLearner(Leader)はその値を記録する。
メインスレッドはLeaderを監視し、全ての値が実行(記録)された時点でLeaderとAcceptorを停止し、Learnerが記録した値のヒストリーを表示して終了する。
クラス一覧
- Message
- メッセージのenum値を持つメッセージ構造体
- MessagePump
- メッセージの送受信を実際に行うスレッド。leaderとacceptorの両方で使われる。
- InstanceRecord
- プロトコルのインスタンスをリストに保存する。leaderとacceptorの両方で使われる。
- PaxosLeader
- プロトコルのリーダ(proposer)の実装
- PaxosLeaderProtocol
- リーダ側のプロトコル進行を管理する状態機械
- PaxosAcceptor
- acceptorの実装
- PaxosAcceptorProtocol
- アクセプタ側のプロトコル進行を管理する状態機械
各クラスには内部クラスもあり、クラス数はこれより多い。
プログラムの構成
このプログラムではLeaderとAcceptorのインスタンスはそれぞれメッセージを送受信するためのMessagePump(メッセージポンプ)を持っている。MessagePumpは内部クラスにMPHelperを持ち、MessagePumpとMPHelperのそれぞれがThreadを継承し、両者が別スレッドにて実行される。LeaderとAcceptorを駆動するのはメッセージであり、MPHelperがメッセージを受信するとLeader及びAcceptorのrecvMessageを呼出す。そしてProtocol#doTransitionを呼出しプロトコルを回し新しいメッセージをメッセージポンプを利用して投げる。これによりお互いがメッセージを投げ合うことでプログラム全体が進行していく。
Paxos合意プロトコルの連続するインスタンスはLeaderとAcceptorのそれぞれで同じインスタンスIDにて管理される。Paxos合意プロトコルの個々のインスタンス1つにつき1つInstanceRecordのインスタンスをLeaderとAcceptorが持つ。またLeaderにはLeaderProtocol、AcceptorにはAcceptorProtocolのインスタンスが作成され、プロトコルIDにて管理される。PaxosLeaderとPaxosAcceptorは全てのInstanceRecordとProtocolのインスタンスを管理するマネージャ的存在であり、インスタンスは1つづつしか存在しない。LeaderとAcceptorのProtocolは必ず成功するとは限らないので合意プロトコル1つに対し、LeaderProtocol、AcceptorProtocolは複数存在し得る。
非常に紛らわしいので注意して欲しいが、このプログラムの目的が状態機械であると同時に、LeaderProtocolとAcceptorProtocolの実装は状態機械である。LeaderProtocolとAcceptorProtocolのインスタンスは個々に状態変数を持ち、状態遷移をdoTransitionメソッドにて実行する。
ハートビートと単一リーダの選出
このプログラムではLeaderが2個とAcceptorが5個存在する。
Leaderは2つとも起動時には自身がリーダだと考えている。(setPrimary(True))
Leaderは内部クラスにHeartbeatSenderとHeartbeatListenerを持っている。
そして自身がprimaryである限り、他のリーダ全てにハートビートを送る。
Leaderは自身がprimaryであると考えているのに他のハートビートを受け取る場合がある。この場合、相手のポート番号が自分のポート番号よりも大きい場合に相手にprimaryを譲り、自分はsetPrimary(False)する。
これにより単一のリーダが選出される。
逆にprimaryでないLeaderは常にハートビートを聞いており、ハートビートが途絶えると自らをsetPrimary(True)してprimaryとして行動を開始する。
現在、このプログラムではprimaryは1秒に1回、ハートビートを送信する。
primaryでないLeaderは2秒間ハートビートが途絶えた場合にprimaryとして行動開始する。
ギャップを埋める
リーダは約15秒に1回記憶のギャップを埋める。
"Paxos made simple"の最後にて解説されているように、複数のPaxosのインスタンスを順に実行すると、実行順にインスタンスが停止するとは限らない。
このため、Paxosで状態機械を実装すると命令が提案された順には合意されず、合意済みの命令と別の合意済み命令の間にまだ選択されていない(終了していない)命令の穴(ギャップ)が存在してしまう。このギャップを埋めなければ既に合意している命令も実行できない。ギャップを埋めるには新しい命令の提案を行っても良いが、"Paxos made simple"では"no-op"の命令を再提案する方法を紹介しており、このプログラムでも"no-op"に相当する0を未終了のインスタンスに対して再提案させている。結果的に既に動いている提案が遅れて選択されるか、再提案された"no-op"が選択される。これにより命令の実行を続けることが可能になる。このプログラムでは命令の実行は省力されているのでギャップを埋めても全体に影響は無い。
記憶
このプログラムはAcceptorが値を承認すると、primaryだけでなく全てのLeaderにACCEPTOR_ACCEPTのメッセージと共に値を送信する。これにより、primaryでないLeader(learner)も全ての値を記憶することができる。
"Paxos made simple"ではこのモデルを一番簡単な物として紹介しているが通信量が増える欠点があると指摘している。"distinguished learner"がleaderより値を受け取り、他のlearnerに配布する効率の良いモデルが記述されている。learner同士が通信を行う場合をcatchupと言う。
プログラムの流れ・序盤
メインスレッドが命令を1個メッセージとして投げた時、以下のようにメッセージがリーダに拾われ、リーダがメッセージをアクセプタに投げ、そして合意プロトコルが停止するまでメッセージの投げ合いが続く
- クライアントはleader2にMSG_EXT_PROPOSEを送る
- MessagePumpHelper <- leaderのが受け取る
- MessagePump
- PaxosLeader.recvMessage
- PaxosLeader.newProposal 提案の作成:prepareリクエスト
- PaxosLeaderProtocol.propose 提案の実行
- PaxosLeader.sendMessage
- MessagePump.sendMessage <- Leaderのhas-a
- MessagePumpHelper <- Acceptorのhas-a
- MessagePump
- PaxosAcceptor.recvMessage
- PaxosAcceptorProtocol.recvProposal AGREEを返す
- MessagePumpHelper <- Leader
- MessagePump
- PaxosLeader.recvMessage
- PaxosLeaderProtocl.doTransition
以下、停止するまで続く
状態遷移図
PaxosLeaderProtocol
STATE_UNDEFINED -> STATE_PROPOSED -> STATE_AGREED -> STATE_ACCEPTED └> STATE_REJECTED └> STATE_UNACCEPTED
PaxosAcceptorProtocol
STATE_UNDEFINED -> STATE_PROPOSAL_AGREED -> STATE_PROPOSAL_ACCEPTED └> STATE_PROPOSAL_REJECTED
以下の物は未使用
- STATE_PROPOSAL_RECIEVED
- STATE_PROPOSAL_UNACCEPTED
ソースコード解析
量が半端無いのと面倒になったのでgetterとかsetterとかはちょっとロジックが有っても解説無し。:-)
Java屋に微妙にわかりにくいのはLeaderとAcceptorの間には共通の親クラスもインターフェースもないのに、MessagePumpから同じ名前であるrecvMessageというメソッドが呼ばれる点。またインスタンス変数も動的に(JavaScriptみたく)追加できるので同じ変数のさす複数のインスタンスが別々の構造を持つことも可能だ。Pythonは柔軟な分、読むのが大変だなぁと言いたいだけ。(^^;
トップのコメント
# Naive implementation of the Paxos protocol. # Henry Robinson, 2009 # Licensed under GPL v2
main
import time if __name__ == '__main__': numclients = 5 clients = [ PaxosAcceptor( port, [54321,54322] ) for port in xrange( 64320, 64320+numclients ) ] leader = PaxosLeader( 54321, [54322], [c.port for c in clients] ) leader2 = PaxosLeader( 54322, [54321], [c.port for c in clients] ) leader.start( ) leader.setPrimary( True ) leader2.setPrimary( True ) leader2.start( ) for c in clients: c.start( ) clients[0].fail( ) clients[1].fail( ) # clients[2].fail( ) # いくらかの提案を発行する s = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) start = time.time( ) for i in xrange(100): m = Message( Message.MSG_EXT_PROPOSE ) m.value = 0 + i m.to = 54322 bytes = pickle.dumps( m ) s.sendto( bytes, ("localhost", m.to) ) while leader2.getNumAccepted( ) < 99: print "Sleeping for 1s -- accepted:", leader2.getNumAccepted( ) time.sleep( 1 ) end = time.time( ) # print "Sleeping for 10s" # time.sleep( 10 ) print "Stopping leaders" leader.stop( ) leader2.stop( ) print "Stopping clients" for c in clients: c.stop( ) print "Leader 1 history: ", leader.getHistory( ) print "Leader 2 history: ", leader2.getHistory( ) print end - start
- クライアント数を5に設定
- クライアント数の数だけPaxosAcceptorのインスタンスを作成してArrayにしている。コンストラクタの第1引数が自身のポート、第2引数は複数のリーダーのポート
- 一つ目のリーダーの作成。第1引数が自分のポート。第2引数が他のリーダーのポート。第3引数がクライアントのポート。次に同様に2つ目のリーダーの作成
- 第一、第二リーダーをプライマリに設定。後でポート番号により優先順位で調整される。両方のスレッドをスタート
- クライアント全てをスタート
- クライアントに擬似的に障害を発生させる。障害が過半数を越えるとうまく動かなくなるはず。
- UDPソケットを用意
- 現在時刻をstartに保存
- インデックスiにて100回ループ
- leader2にて合意済みの値が100個になるまで時間稼ぎ
- leaderを停止
- clientを停止
- leader1,2の履歴を表示。目視で比較
- 実行時間の表示(秒)
PaxosLeader
PaxosLeaderはleaderのクラス。MessagePumpを持つことでメッセージをクライアントとacceptorとの間でやり取りする。
また内部クラスとしてHeartbeatSenderとHeartbeatListenerを持ち他のleader候補とハートビートを交換し、生存確認を行っている。
コンストラクタ(ポート番号、他のleader、acceptor)
class PaxosLeader( object ): def __init__(self, port, leaders=None, acceptors=None): self.port = port if leaders == None: self.leaders = [] else: self.leaders = leaders if acceptors == None: self.acceptors = [] else: self.acceptors = acceptors self.group = self.leaders + self.acceptors self.isPrimary = False self.proposalCount = 0 self.msgPump = MessagePump( self, port ) self.instances = {} self.hbListener = PaxosLeader.HeartbeatListener( self ) self.hbSender = PaxosLeader.HeartbeatSender( self ) self.highestInstance = -1 self.stopped = True # The last time we tried to fix up any gaps self.lasttime = time.time( )
- 引数をインスタンス変数に代入
- インスタンス変数groupはleaderとacceptorの和集合。使用されていない。
- primaryフラグはデフォルトではfalse。後からsetPrimary
- MessagePumpに自身の参照とポート番号を渡してインスタンス化
- instancesにPaxos合意アルゴリズムのインスタンスの履歴を入れる。ここで空で初期化。
- HeartbeatListenerとHeartbeatSenderを自身の参照を引数にインスタンス化
- 既知のインスタンス番号で最も大きい値を-1で初期化
- 停止フラグ:デフォルトがTrue
- lastime:最後にギャップを埋めた時間。現在値で初期化。
内部クラス:HeartbeatListner
#------------------------------------------------------ # These two classes listen for heartbeats from other leaders # and, if none appear, tell this leader that it should # be the primary class HeartbeatListener( threading.Thread ): def __init__( self, leader ): self.leader = leader self.queue = Queue.Queue( ) self.abort = False threading.Thread.__init__( self ) def newHB( self, message ): self.queue.put( message ) def doAbort( self ): self.abort = True def run( self ): elapsed = 0 while not self.abort: s = time.time( ) try: hb = self.queue.get( True, 2 ) # Easy way to settle conflicts - if your port number is bigger than mine, # you get to be the leader if hb.source > self.leader.port: self.leader.setPrimary( False ) except: # Nothing was got self.leader.setPrimary( True )
HeartbeatListnerとHeartbeatSenderの2つが他のleaderとハートビートを交換する。他のleaderのハートビートが聞こえなくなった時(Noneが帰ったとき)このleaderがprimaryとして立候補する。
HeartbeatListnerはThreadを継承しており、ずっとrun()が実行される。
実際にハートビートを聞くのはMessagePumpであり、メッセージの種類からハートビートだと確認される。
内部クラスHeartbeatSender
class HeartbeatSender( threading.Thread ): def __init__( self, leader ): self.leader = leader self.abort = False threading.Thread.__init__( self ) def doAbort( self ): self.abort = True def run( self ): while not self.abort: time.sleep( 1 ) if self.leader.isPrimary: msg = Message( Message.MSG_HEARTBEAT ) msg.source = self.leader.port for l in self.leader.leaders: msg.to = l self.leader.sendMessage( msg )
HeartbeatSenderもThreadを継承している。オーナであるリーダがprimaryである場合のみ、1秒に1回他のleaderに対しハートビートを送る。
- コンストラクタ
- 他のリーダーのリストを受け取り保存。スレッドを実行。以降、run()が実行される
- run()
- 本体
- abortされない間ループ
- 1秒間寝るよ
- オーナであるleaderがprimaryなら実行
- MSG_HEARTBEATのメッセージを作成
- 他のリーダー全てに対し、
- 自身のオーナであるリーダからメッセージを送信する
PaxosLeader#sendMessage()
#------------------------------------------------------ def sendMessage( self, message ): self.msgPump.sendMessage( message )
MessagePumpのsendMessageに委譲するのみ
PaxosLeader#recvMessage(メッセージ)
def recvMessage( self, message ): """メッセージポンプがこれを定期的に呼出す。メッセージが無い場合にも同様""" if self.stopped: return if message == None: # 15秒毎のみ実行。でないと実行中の良いプロトコルも切ってしまう:( if self.isPrimary and time.time( ) - self.lasttime > 15.0: self.findAndFillGaps( ) self.garbageCollect( ) return if message.command == Message.MSG_HEARTBEAT: self.hbListener.newHB( message ) return True if message.command == Message.MSG_EXT_PROPOSE: print "External proposal received at", self.port, self.highestInstance if self.isPrimary: self.newProposal( message.value ) # primeでなければ無視する。primaryでなくともproposalは受け取る場合有り # 親切にするならleaderが代わったと返信するだろう # そして新しいleaderのアドレスを教える。しかしそうしたって失敗する可能性はある return True if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT: self.instances[ message.instanceID ].getProtocol(message.proposalID).doTransition( message ) # Acceptorは値を承認するとプライマリでないリーダ全てにも # 同じメッセージを流すようにプログラムされている。 # その場合、InstanceRecordもProtocolも無い場合が有る。 if message.command == Message.MSG_ACCEPTOR_ACCEPT: if message.instanceID not in self.instances: self.instances[ message.instanceID ] = InstanceRecord( ) record = self.instances[ message.instanceID ] if message.proposalID not in record.protocols: protocol = PaxosLeaderProtocol( self ) # 状態をAccept待ちにする protocol.state = PaxosLeaderProtocol.STATE_AGREED protocol.proposalID = message.proposalID protocol.instanceID = message.instanceID protocol.value = message.value record.addProtocol( protocol ) else: protocol = record.getProtocol( message.proposalID ) # ここで初期化した場合も次に落ちる protocol.doTransition( message ) return True
recvMessageはMessagePumpがメッセージを受け取った場合、またはタイムアウトした場合に呼出される。
Leaderは外部クライアントとacceptorと他のリーダ全てからメッセージをここで受け取るのでロジックが長い。Pythonにはswitch-case文が無いのでこのような形に。else文ほとんどを使っていないのでreturnが無い場合、意図して続行される点に注意。
- メッセージが無い場合
- 15秒に一回、ギャップを埋めてGCを実行し終了
- MSG_HEARTBEATの場合
- メッセージをHeartbeatListnerのキューに叩き込んで終了
- MSG_EXT_PROPOSE(クライアントからの提案)の場合
- 自分がprimaryならnewPropossal(message.value)を呼んで終了
- 自分がprimaryでないならただ終了
- 自分がprimaryでかつ、MSG_ACCEPTOR_ACCEPTではない場合
- InstanceRecordからLeaderProtocolを取り出し、doTransitionを実行することで状態機械を進める。次のif文は絶対に真にならないのでここでreturnすると良いと思った
- メッセージがMSG_ACCEPTORの場合
PaxosLeader#newProposal(値, インスタンスID)
def newProposal( self, value, instance = None ): protocol = PaxosLeaderProtocol( self ) if instance == None: self.highestInstance += 1 instanceID = self.highestInstance else: instanceID = instance self.proposalCount += 1 id = (self.port, self.proposalCount ) if instanceID in self.instances: record = self.instances[ instanceID ] else: record = InstanceRecord( ) self.instances[ instanceID ] = record protocol.propose( value, id, instanceID ) record.addProtocol( protocol )
PaxosLeaderがクライアントからproposalを受け取った時に呼出され、Paxos合意プロトコルのインスタンスを作成し、プロトコルを開始する
- PaxosLeaderProtocolのコンストラクタを自身の参照を渡して呼出し
- インスタンスIDが指定されていない場合
- インスタンスIDが指定されている場合
- インスタンスIDに指定値を代入
- 提案カウンタを+1
- Paxos合意プロトコルの順序番号を作成。このプログラムではleader固有のポート番号と提案番号
- インスタンスIDが既知ならばInstanceRecordをインスタンス辞書から取り出す。そうでなければInstanceRecordをインスタンスIDから新規に作成し、インスタンス辞書にも登録
- PaxosLeaderProtocolのproposeに委譲。値と順序番号とインスタンスIDを引数として渡す
- InstanceRecordにprotocolの参照を引渡し登録
PaxosLeader#notifyLeader(protocol, message)
def notifyLeader( self, protocol, message ): # Protocols call this when they're done if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED: print "Protocol instance %s accepted with value %s" % (message.instanceID, message.value) self.instances[ message.instanceID ].accepted = True self.instances[ message.instanceID ].value = message.value self.highestInstance = max( message.instanceID, self.highestInstance ) return if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # メッセージから値を確認してリトライする # いずれアクセプター達は何らかの値をこのインスタンスに対し # 承認するだろう。そしてプロトコルは完了する。 self.proposalCount = max( self.proposalCount, message.highestPID[1] ) self.newProposal( message.value ) return True if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED: pass
このメソッドは状態遷移機械であるPaxosLeaderProtocolが停止した場合に呼ばれる。
PaxosLeader#findAndFillGaps
def findAndFillGaps( self ): # if no message is received, we take the chance to do a little cleanup for i in xrange(1,self.highestInstance): if self.getInstanceValue( i ) == None: print "Filling in gap", i self.newProposal( 0, i ) # This will either eventually commit an already accepted value, or fill in the gap with 0 or no-op self.lasttime = time.time( ) def garbageCollect( self ): for i in self.instances: self.instances[i].cleanProtocols( )
"Paxos made simple"にて解説されていたギャップの整理。
連続したPaxosのインスタンス列を実行するとPaxosのインスタンスの実行終了は実行順になるとは限らず選択された値(命令)列には穴(ギャップ)ができる。これは現実の実行環境は常に非同期的な特性を持つため。
findAndFillGapsとgarbageCollectは常に同時に約15秒に一回、リーダのrecvMessageより呼ばれる。
PaxosLeaderProtocol
Paxos合意プロトコルの状態機械。プロトコルインスタンスの状態をインスタンス変数stateとして保持する。
初期化時STATE_UNDEFINED
アクセプターへの提案を終えるとSTATE_PROPOSED
コンストラクタ
class PaxosLeaderProtocol( object ): # State variables STATE_UNDEFINED = -1 STATE_PROPOSED = 0 STATE_AGREED = 1 STATE_REJECTED = 2 STATE_ACCEPTED = 3 STATE_UNACCEPTED = 4 def __init__( self, leader ): self.leader = leader self.state = PaxosLeaderProtocol.STATE_UNDEFINED self.proposalID = (-1,-1) self.agreecount, self.acceptcount = (0,0) self.rejectcount, self.unacceptcount = (0,0) self.instanceID = -1 self.highestseen = (0,0)
- リーダの参照をもらい相互参照にしておく。
- その他、初期設定
PaxosLeaderProtocol#propose(値, 提案順序番号, インスタンスID)
def propose( self, value, pID, instanceID ): self.proposalID = pID self.value = value self.instanceID = instanceID message = Message( Message.MSG_PROPOSE ) message.proposalID = pID message.instanceID = instanceID message.value = value for server in self.leader.getAcceptors( ): message.to = server self.leader.sendMessage( message ) self.state = PaxosLeaderProtocol.STATE_PROPOSED return self.proposalID
Leaderが実際に値を過半数のacceptorに提案する。外部クライアントから値の提案の要請をrecvMessageが受けてPaxosLeader#newProposalを経由してこれを呼び出す。各種の値はそちらで決定しており引数で渡される。
PaxosLeaderProtocol#doTransition(Message)
def doTransition( self, message ): """プロトコルをシンプルな状態機械として実行する。 期待しない入力に対してエラーとするのは常に正しくはない しかしメッセージの遅延の理由で、静かに入力を無視する""" if self.state == PaxosLeaderProtocol.STATE_PROPOSED: if message.command == Message.MSG_ACCEPTOR_AGREE: self.agreecount += 1 if self.agreecount >= self.leader.getQuorumSize( ): # print "合意の定足数に逹っした。最後に返信された値:", message.value if message.value != None: # 値がNoneでなく順序番号が最も大きければ更新 if message.sequence[0] > self.highestseen[0] or (message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[1]): self.value = message.value self.highestseen = message.sequence self.state = PaxosLeaderProtocol.STATE_AGREED # 'accept'メッセージをグループに送る msg = Message( Message.MSG_ACCEPT ) msg.copyAsReply( message ) msg.value = self.value msg.leaderID = msg.to for s in self.leader.getAcceptors( ): msg.to = s self.leader.sendMessage( msg ) self.leader.notifyLeader( self, message ) return True if message.command == Message.MSG_ACCEPTOR_REJECT: self.rejectcount += 1 if self.rejectcount >= self.leader.getQuorumSize( ): self.state = PaxosLeaderProtocol.STATE_REJECTED self.leader.notifyLeader( self, message ) return True if self.state == PaxosLeaderProtocol.STATE_AGREED: if message.command == Message.MSG_ACCEPTOR_ACCEPT: self.acceptcount += 1 if self.acceptcount >= self.leader.getQuorumSize( ): self.state = PaxosLeaderProtocol.STATE_ACCEPTED self.leader.notifyLeader( self, message ) if message.command == Message.MSG_ACCEPTOR_UNACCEPT: self.unacceptcount += 1 if self.unacceptcount >= self.leader.getQuorumSize( ): self.state = PaxosLeaderProtocol.STATE_UNACCEPTED self.leader.notifyLeader( self, message ) pass
状態機械を遷移させるメソッド。
自分の現在状態値と受け取ったメッセージの組で実行を選択し、状態を更新する。
先にproposeメソッドが呼ばれているのてここに最初に来る場合、状態はSTATE_PROPOSEDになっている。
ここでは主にacceptorからのメッセージを処理する。個々のacceptorから別々にメッセージが届くのでこの中でメッセージのカウントを行い過半数判定を行っている。
基本的にこのロジックに合わない状態とメッセージの組は最後のpassにて全部無視される。
ここでagreecountが過半数を越えた場合に、既知の順序番号よりメッセージの順序番号が大きい場合に、提案する値を(つまり他のリーダによる新しい提案の値に)更新することが肝である。
PaxosAcceptor
class PaxosAcceptor( object ): def __init__(self, port,leaders ): self.port = port self.leaders = leaders self.instances = {} self.msgPump = MessagePump( self, self.port ) self.failed = False def start( self ): self.msgPump.start( ) def stop( self ): self.msgPump.doAbort( ) def fail( self ): self.failed = True def recover( self ): self.failed = False def sendMessage( self, message ): self.msgPump.sendMessage( message )
recvMessage(self, message)
このメソッドはMessagePumpのインスタンスがメッセージをleaderより受け取った時にそのメッセージを引数に呼出される。
- messageがNoneなら何もせずに終了
- フラッグfailedがTrueなら何もせず終了。failedはsetFailedメソッドで設定され、acceptor上に障害が発生していることをシミュレートする。障害発生中はメッセージを受け取れず、記憶もできない
- messageのcommandがMSG_PROPOSEの場合。これはleaderから
PaxosAcceptor#recvMessage(メッセージ)
def recvMessage( self, message ): if message == None: return if self.failed: return # Failure means ignored and lost messages if message.command == Message.MSG_PROPOSE: if message.instanceID not in self.instances: record = InstanceRecord( ) self.instances[ message.instanceID ] = record protocol = PaxosAcceptorProtocol( self ) protocol.recvProposal( message ) self.instances[ message.instanceID ].addProtocol( protocol ) else: self.instances[ message.instanceID ].getProtocol( message.proposalID ).doTransition( message )
PaxosAcceptor#notifyClient(protocol, message)
def notifyClient( self, protocol, message ): if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: self.instances[ protocol.instanceID ].value = message.value # print "Proposal accepted at client: ", message.value def getHighestAgreedProposal( self, instance ): return self.instances[ instance ].highestID def getInstanceValue( self, instance ): return self.instances[ instance ].value
PaxosAcceptorProtocol
Acceptorのプロトコルを実行する状態機械。
Acceptorのプロトコルが開始される場合に常に新しいインスタンスが1つ作成される。
コンストラクタ(クライアント)
class PaxosAcceptorProtocol( object ): # State variables STATE_UNDEFINED = -1 STATE_PROPOSAL_RECEIVED = 0 STATE_PROPOSAL_REJECTED = 1 STATE_PROPOSAL_AGREED = 2 STATE_PROPOSAL_ACCEPTED = 3 STATE_PROPOSAL_UNACCEPTED = 4 def __init__( self, client ): self.client = client self.state = PaxosAcceptorProtocol.STATE_UNDEFINED
引数clientはAcceptorであるので注意。
状態をSTATE_UNDEFINEDで初期化。
PaxosAcceptorProtocol#recvProposal(message)
新しい提案を受け取った時に呼ばれる。
状態機械が新規に作成された直後。
Paxos合意アルゴリズムのインスタンス1つにつきPaxosAcceptorProtocolは複数のインスタンスを作り得る。
def recvProposal( self, message ): if message.command == Message.MSG_PROPOSE: self.proposalID = message.proposalID self.instanceID = message.instanceID # このインスタンスの最大の既に同意した提案は? (port, count) = self.client.getHighestAgreedProposal( message.instanceID ) # この提案が最大の番号かチェックする if count < self.proposalID[0] or (count == self.proposalID[0] and port < self.proposalID[1]): # 同意したメッセージを返信する。もし存在するなら最大番号の承認済みの値を用いる self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # print "Agreeing to proposal: ", message.instanceID, message.value value = self.client.getInstanceValue( message.instanceID ) msg = Message( Message.MSG_ACCEPTOR_AGREE ) msg.copyAsReply( message ) msg.value = value msg.sequence = (port, count) self.client.sendMessage( msg ) else: # 遅すぎ。既に他のに約束してしまった。 # リジェクトメッセージを送る。最大番号とその値と一緒に self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED return self.proposalID else: # エラー。提案じゃないものを受け取ってる? pass
- メッセージから順序番号とインスタンスIDを自身にコピー
- オーナであるPaxosAcceptorからインスタンスIDに対応する既に同意した最も大きな順序番号を得る。タプルからポート番号とプロトコルのカウント値(id)を得る。順序番号はPaxosLeader#newProposalにて(リーダのポート番号, PaxosLeaderProtocolのカウント値)にて作成されている。
- 既知の最大番号と今回の番号を比較する。恐らくバグっていてポート番号とカウントを比較している。ポート番号は非常に大きいのでだいたい成功する。コメント欄にて既に通知されているが修正されなかったみたい。
- 状態をSTATE_PROPOSAL_AGREEDに更新
- MSG_ACCEPTOR_AGREEでメッセージを作成し、Leaderに返す。copyAsReplyで宛先と差し出しが引っくり返されている。
PaxosAcceptorProtocol#doTransition(メッセージ)
def doTransition( self, message ): if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT: self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # Could check on the value here, if we don't trust leaders to honour what we tell them # send reply to leader acknowledging msg = Message( Message.MSG_ACCEPTOR_ACCEPT ) msg.copyAsReply( message ) for l in self.client.leaders: msg.to = l self.client.sendMessage( msg ) self.notifyClient( message ) return True raise Exception( "Unexpected state / command combination!" ) def notifyClient( self, message ): self.client.notifyClient( self, message )
リーダに比べると比較的単純
- 状態がSTATE_PROPOSAL_AGREEDであれば
- 状態をSTATE_PROPOSAL_ACCEPTEDに更新
- MSG_ACCEPTOR_ACCEPTのメッセージを作成
- 全てのLeaderに送信する
- オーナであるAcceptorに終了をnotifyClientで通知
- else
- 例外を投げる (ここに来るのはAGREEDの時だけ)
Message
class Message( object ): MSG_ACCEPTOR_AGREE = 0 MSG_ACCEPTOR_ACCEPT = 1 MSG_ACCEPTOR_REJECT = 2 MSG_ACCEPTOR_UNACCEPT = 3 MSG_ACCEPT = 4 MSG_PROPOSE = 5 MSG_EXT_PROPOSE = 6 MSG_HEARTBEAT = 7 def __init__( self, command = None ): self.command = command def copyAsReply( self, message ): self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to self.value = message.value
Messageクラスはメッセージの種別をenumのように定数で定義している。
コンストラクタはメッセージの種別を引数として受け取る。
メソッドcopyAsReplyは自身のインスタンス変数として提案のID、インスタンスID、宛先、発信元、値を保存する。
注意:copyAsReplyはメッセージの送信元と送信先を引っくり返す。宛先を上書きしている場合はそうでないが、そうしていない場合、新しいメッセージは元の送り主に送られる。
class MessagePump( threading.Thread ): """メッセージポンプはソケット接続をカプセル化する。メッセージをオーナに渡す責任を持つ""" class MPHelper( threading.Thread ): """このヘルパークラスの理由はソケットからできるだけ速く中身を取り出すこと。バッファがいっぱいになるのを防ぐため。 今さらだけどTCPを使ったほうが簡単だったと思う。:)""" def __init__( self, owner ): self.owner = owner threading.Thread.__init__( self ) def run( self ): while not self.owner.abort: try: (bytes, addr) = self.owner.socket.recvfrom( 2048 ) msg = pickle.loads( bytes ) msg.source = addr[1] self.owner.queue.put( msg ) except: pass def __init__( self, owner, port, timeout=2 ): self.owner = owner threading.Thread.__init__( self ) self.abort = False self.timeout = 2 self.port = port self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_RCVBUF, 200000 ) self.socket.bind( ("localhost", port) ) self.socket.settimeout( timeout ) self.queue = Queue.Queue( ) self.helper = MessagePump.MPHelper( self ) def run( self ): self.helper.start( ) while not self.abort: message = self.waitForMessage( ) # これはブロッキングを必要とします。 # でなければマルチスレッドの痛みを知るでしょう self.owner.recvMessage( message ) def waitForMessage( self ): try: msg = self.queue.get( True, 3 ) return msg except: # ugh, specialise the exception! return None def sendMessage( self, message ): bytes = pickle.dumps( message ) address = ("localhost", message.to) self.socket.sendto( bytes, address ) return True def doAbort( self ): self.abort = True
MessagePumpはThreadを継承する。このプログラムではleaderもacceptorも全てこのメッセージポンプを利用してメッセージを汲み上げる。
- 内部クラスMPHelperは別スレッドでひたすら受信したUDPメッセージをオーナのキューに突っ込む
MessagePump#コンストラクタ(オーナ、ポート番号、タイムアウト)
初期設定。スレッドがstart()にて開始されると以下実行はrun()になる。
ソケットまわりの初期設定とキューの作成、MPHelperの実行。
MessagePump#run()
- MPHelperをstart()
- abortしない限りwaitForMessageを実行しオーナのrecvMessageへ投げ続ける。
終わりに
このプログラムは非常に優秀でPaxosの挙動を障害を発生させることにより確認することが可能である。
メインスレッドにあるAcceptor#failの呼出の数を変えれば、障害がAcceptorの過半数を越えた時に合意が取れないことが実際に確認できる。
また単一のLeaderが選出されないようsetPrimary(false)を取り除けば、これもまた合意が取れなくなる。
非同期環境におけるメッセージの遅延をシミュレートするためにAdversarialMessagePumpも付いている。これは通常は使用されないが、MessagePumpをこれで置き換えるとメッセージの遅延が発生する。
このプログラムに実際に手を入れ、様々な変更を施すことによりPaxosの理解が深まるだろう。
素晴しい記事とプログラムを提供してくれたHenry氏に感謝したい。