Open
@xirc
Description
Situation
- A leader received a `Replicate` message from an entity and will replicate an entry for that message on behalf of the entity.
  - The leader registers this replication (`ClientContext` with `LogEntryIndex`) into `LeaderData.clients`.
- The leader becomes a follower for some reason before it completes the replication.
  - There is another leader at this point.
- The new leader completes the replication.
- The old leader (a follower now) sends `Replica` to the entity.
  - The entry (`ClientContext` with `LogEntryIndex`) of `LeaderData.clients` is not removed.
Related source code
akka-entity-replication/src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Lines 240 to 265 in abe3af9
```scala
case FollowedLeaderCommit(leaderMember, leaderCommit) =>
  currentData
    .detectLeaderMember(leaderMember)
    .followLeaderCommit(leaderCommit)
    .applyCommittedLogEntries { logEntries =>
      logEntries.foreach { logEntry =>
        applyToReplicationActor(logEntry)
      }
    }
case Committed(logEntryIndex) =>
  currentData
    .commit(logEntryIndex)
    .handleCommittedLogEntriesAndClients { entries =>
      entries.foreach {
        case (logEntry, Some(client)) =>
          if (log.isDebugEnabled)
            log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
          client.ref.tell(
            ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId),
            client.originSender.getOrElse(ActorRef.noSender),
          )
        case (logEntry, None) =>
          // A commit during recovery, or a commit of log entries that were still uncommitted when this member was promoted to leader
          applyToReplicationActor(logEntry)
      }
    }
```
akka-entity-replication/src/main/scala/lerna/akka/entityreplication/raft/RaftMemberData.scala
Lines 226 to 231 in abe3af9
```scala
def handleCommittedLogEntriesAndClients(handler: Seq[(LogEntry, Option[ClientContext])] => Unit): RaftMemberData = {
  val applicableLogEntries = selectApplicableLogEntries
  handler(applicableLogEntries.map(e => (e, clients.get(e.index))))
  updateVolatileState(lastApplied = applicableLogEntries.lastOption.map(_.index).getOrElse(lastApplied))
    .updateLeaderVolatileState(clients = clients -- applicableLogEntries.map(_.index)) // Remove the notified clients to save memory
}
```
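The two excerpts above suggest why the entry survives: only the leader path (`handleCommittedLogEntriesAndClients`) removes entries from `clients`, while the follower path (`applyCommittedLogEntries`) appears to leave `clients` untouched. The following is a minimal sketch of that behavior, not the library's code. `MemberData`, `replicate`, and the simplified `LogEntry` and `ClientContext` are hypothetical stand-ins, and the body of `applyCommittedLogEntries` is an assumption based on the situation described in this issue.

```scala
// Simplified, hypothetical model of the two commit paths; not the real RaftMemberData.
object ClientLeakSketch {
  final case class LogEntry(index: Int, event: String)
  final case class ClientContext(name: String)

  final case class MemberData(
      log: Vector[LogEntry] = Vector.empty,
      commitIndex: Int = 0,
      lastApplied: Int = 0,
      clients: Map[Int, ClientContext] = Map.empty,
  ) {
    // Entries that are committed but not yet applied.
    private def applicable: Vector[LogEntry] =
      log.filter(e => e.index > lastApplied && e.index <= commitIndex)

    // Leader receives Replicate: append the entry and remember the client by log index.
    def replicate(event: String, client: ClientContext): MemberData = {
      val entry = LogEntry(log.size + 1, event)
      copy(log = log :+ entry, clients = clients + (entry.index -> client))
    }

    def commit(index: Int): MemberData =
      copy(commitIndex = math.max(commitIndex, index))

    // Leader path: clients that were notified are removed from the map.
    def handleCommittedLogEntriesAndClients(
        handler: Seq[(LogEntry, Option[ClientContext])] => Unit,
    ): MemberData = {
      val entries = applicable
      handler(entries.map(e => (e, clients.get(e.index))))
      copy(
        lastApplied = entries.lastOption.map(_.index).getOrElse(lastApplied),
        clients = clients -- entries.map(_.index),
      )
    }

    // Follower path (assumed): entries are applied, but clients is never touched.
    def applyCommittedLogEntries(handler: Seq[LogEntry] => Unit): MemberData = {
      val entries = applicable
      handler(entries)
      copy(lastApplied = entries.lastOption.map(_.index).getOrElse(lastApplied))
    }
  }

  // Scenario from this issue: register as leader, step down, then apply the commit as a follower.
  def leakedAfterStepDown: MemberData =
    MemberData()
      .replicate("event-1", ClientContext("entity-1"))
      .commit(1) // stands in for followLeaderCommit once the new leader has committed
      .applyCommittedLogEntries(_ => ())

  // Baseline: the same entry committed while the member is still the leader.
  def cleanedAsLeader: MemberData =
    MemberData()
      .replicate("event-1", ClientContext("entity-1"))
      .commit(1)
      .handleCommittedLogEntriesAndClients(_ => ())
}
```

In this model, `ClientLeakSketch.leakedAfterStepDown.clients` still holds the entry for index 1, whereas `ClientLeakSketch.cleanedAsLeader.clients` is empty. That matches the situation above, in which the stale `ClientContext` stays in `LeaderData.clients`.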