Skip to content

Commit 5aca222

Browse files
committed
refactor: replace has_payload with inflight_id presence check
Simplify replication progress tracking by removing the redundant `has_payload` boolean. The presence of `inflight_id` (Some vs None) now indicates whether a response requires inflight state update. `InflightId` identifies whether an AppendEntries RPC carries actual log data that needs acknowledgment. Not all RPCs have a corresponding inflight record on the leader. For example, when synchronizing the committed index, the leader sends an AppendEntries RPC without log payload - such RPCs have no `InflightId`. However, RPCs that replicate actual log entries have a corresponding inflight record on the leader, and these RPCs carry an `InflightId`. This distinction determines whether a response should update the inflight state. Changes: - Remove `has_payload` field from `Notification::ReplicationProgress` - Change `Inflight::ack()` and `Inflight::conflict()` to require matching `InflightId` - Restructure `Data::Logs` to include `inflight_id` field - Move `inflight_id` tracking into `ReplicationCore` - Update `update_conflicting()` and `update_progress()` signatures
1 parent a774c31 commit 5aca222

File tree

15 files changed

+152
-171
lines changed

15 files changed

+152
-171
lines changed

openraft/src/core/heartbeat/worker.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ where
135135
let conflict_log_id = heartbeat.matching.clone().unwrap();
136136

137137
let noti = Notification::ReplicationProgress {
138-
has_payload: false,
139138
progress: Progress {
140139
session_id: heartbeat.session_id.clone(),
141140
target: self.target.clone(),

openraft/src/core/notification.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,17 @@ where C: RaftTypeConfig
5353

5454
/// Result of executing a command sent from network worker.
5555
ReplicationProgress {
56-
/// If this progress from RPC with payload.
57-
///
58-
/// `has_payload`: contain payload and should reset `inflight` state if conflict.
59-
has_payload: bool,
6056
progress: replication::Progress<C>,
57+
58+
/// The `InflightId` of the replication request that produced this response.
59+
///
60+
/// - `Some(id)`: This response corresponds to a replication request that carries log
61+
/// payload. The `id` is used to match the response to the correct inflight state,
62+
/// allowing the leader to update `matching` or handle conflicts properly.
63+
///
64+
/// - `None`: This response is from an RPC without log payload (e.g., a heartbeat to
65+
/// synchronize commit index). Such RPCs don't have corresponding inflight records, so no
66+
/// inflight state update is needed.
6167
inflight_id: Option<InflightId>,
6268
},
6369

@@ -114,13 +120,8 @@ where C: RaftTypeConfig
114120
}
115121
Self::StorageError { error } => write!(f, "StorageError: {}", error),
116122
Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id),
117-
Self::ReplicationProgress {
118-
has_payload,
119-
progress,
120-
inflight_id,
121-
} => {
122-
let payload = if *has_payload { "no-payload" } else { "has-payload" };
123-
write!(f, "{payload}: {}, inflight_id: {}", progress, inflight_id.display())
123+
Self::ReplicationProgress { progress, inflight_id } => {
124+
write!(f, "{}, inflight_id: {}", progress, inflight_id.display())
124125
}
125126
Self::HeartbeatProgress {
126127
session_id: leader_vote,

openraft/src/core/raft_core.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,24 +1566,15 @@ where
15661566
}
15671567
}
15681568

1569-
Notification::ReplicationProgress {
1570-
has_payload,
1571-
progress,
1572-
inflight_id,
1573-
} => {
1569+
Notification::ReplicationProgress { progress, inflight_id } => {
15741570
// If vote or membership changes, ignore the message.
15751571
// There is chance delayed message reports a wrong state.
15761572
if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") {
15771573
tracing::debug!(progress = display(&progress), "recv Notification::ReplicationProgress");
15781574

15791575
// replication_handler() won't panic because:
15801576
// The leader is still valid because progress.session_id.leader_vote does not change.
1581-
self.engine.replication_handler().update_progress(
1582-
progress.target,
1583-
progress.result,
1584-
has_payload,
1585-
inflight_id,
1586-
);
1577+
self.engine.replication_handler().update_progress(progress.target, progress.result, inflight_id);
15871578
}
15881579
}
15891580

openraft/src/engine/handler/leader_handler/append_entries_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
151151
},
152152
Command::Replicate {
153153
target: 2,
154-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), Some(InflightId::new(1))),
154+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)),
155155
},
156156
Command::Replicate {
157157
target: 3,
158-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), Some(InflightId::new(2))),
158+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(2)),
159159
},
160160
],
161161
eng.output.take_commands()
@@ -280,7 +280,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
280280
},
281281
Command::Replicate {
282282
target: 2,
283-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), Some(InflightId::new(1)))
283+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1))
284284
},
285285
],
286286
eng.output.take_commands()

openraft/src/engine/handler/replication_handler/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ where C: RaftTypeConfig
217217
&mut self,
218218
target: C::NodeId,
219219
conflict: LogIdOf<C>,
220-
has_payload: bool,
221220
inflight_id: Option<InflightId>,
222221
) {
223222
// TODO(2): test it?
@@ -226,7 +225,7 @@ where C: RaftTypeConfig
226225

227226
let mut updater = progress::entry::update::Updater::new(self.config, prog_entry);
228227

229-
updater.update_conflicting(conflict.index(), has_payload, inflight_id);
228+
updater.update_conflicting(conflict.index(), inflight_id);
230229
}
231230

232231
/// Enable one-time replication reset for a specific node upon log reversion detection.
@@ -263,13 +262,13 @@ where C: RaftTypeConfig
263262
&mut self,
264263
target: C::NodeId,
265264
repl_res: Result<ReplicationResult<C>, String>,
266-
has_payload: bool,
267265
inflight_id: Option<InflightId>,
268266
) {
269267
tracing::debug!(
270-
"{}: target={target}, result={}, has_payload={has_payload}, current progresses={}",
268+
"{}: target={target}, result={}, inflight_id={}, current progresses={}",
271269
func_name!(),
272270
repl_res.display(),
271+
inflight_id.display(),
273272
self.leader.progress
274273
);
275274

@@ -279,7 +278,7 @@ where C: RaftTypeConfig
279278
self.update_matching(target, matching, inflight_id);
280279
}
281280
Err(conflict) => {
282-
self.update_conflicting(target, conflict, has_payload, inflight_id);
281+
self.update_conflicting(target, conflict, inflight_id);
283282
}
284283
},
285284
Err(err_str) => {
@@ -349,7 +348,7 @@ where C: RaftTypeConfig
349348
log_id_range,
350349
inflight_id,
351350
} => {
352-
let req = Replicate::logs(log_id_range.clone(), Some(*inflight_id));
351+
let req = Replicate::logs(log_id_range.clone(), *inflight_id);
353352
output.push_command(Command::Replicate {
354353
target: target.clone(),
355354
req,
@@ -433,7 +432,7 @@ where C: RaftTypeConfig
433432
// TODO: It should be self.state.last_log_id() but None is ok.
434433
prog_entry.inflight = Inflight::logs(None, upto.clone(), InflightId::new(0));
435434

436-
self.update_matching(id, upto, None);
435+
self.update_matching(id, upto, Some(InflightId::new(0)));
437436
}
438437
}
439438

openraft/src/engine/handler/vote_handler/become_leader_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ fn test_become_leader() -> anyhow::Result<()> {
7373
},
7474
Command::Replicate {
7575
target: 0,
76-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), Some(InflightId::new(1)))
76+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), InflightId::new(1))
7777
}
7878
]);
7979

openraft/src/engine/tests/handle_vote_resp_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> {
219219
},
220220
Command::Replicate {
221221
target: 2,
222-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), Some(InflightId::new(1)))
222+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), InflightId::new(1))
223223
},
224224
],
225225
eng.output.take_commands()

openraft/src/engine/tests/startup_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
8989
},
9090
Command::Replicate {
9191
target: 3,
92-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), Some(InflightId::new(1))),
92+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), InflightId::new(1)),
9393
}
9494
],
9595
eng.output.take_commands()
@@ -137,7 +137,7 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
137137
},
138138
Command::Replicate {
139139
target: 3,
140-
req: Replicate::logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), Some(InflightId::new(1)))
140+
req: Replicate::logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), InflightId::new(1))
141141
}
142142
],
143143
eng.output.take_commands()

openraft/src/progress/entry/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ fn test_update_conflicting() -> anyhow::Result<()> {
8282
pe.inflight = inflight_logs(5, 10);
8383

8484
let engine_config = EngineConfig::new_default(1);
85-
pe.new_updater(&engine_config).update_conflicting(5, true, Some(InflightId::new(0)));
85+
pe.new_updater(&engine_config).update_conflicting(5, Some(InflightId::new(0)));
8686

8787
assert_eq!(Inflight::None, pe.inflight);
8888
assert_eq!(&Some(log_id(3)), pe.borrow());

openraft/src/progress/entry/update.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ where C: RaftTypeConfig
2626
/// The conflicting log index is the last log index found on a follower that does not match
2727
/// the leader's log at that position.
2828
///
29-
/// If `has_payload` is true, the `inflight` state is reset because AppendEntries RPC
30-
/// manages the inflight state.
29+
/// If `inflight_id` is `Some`, the inflight state is reset because the response corresponds
30+
/// to a replication request with log payload. If `None`, the response is from an RPC without
31+
/// payload (e.g., heartbeat), and inflight state is not modified.
3132
///
3233
/// Normally, the `conflict` index should be greater than or equal to the `matching` index
3334
/// when follower data is intact. However, for testing purposes, a follower may clean its
@@ -36,15 +37,15 @@ where C: RaftTypeConfig
3637
/// To allow follower log reversion, enable [`Config::allow_log_reversion`].
3738
///
3839
/// [`Config::allow_log_reversion`]: `crate::config::Config::allow_log_reversion`
39-
pub(crate) fn update_conflicting(&mut self, conflict: u64, has_payload: bool, inflight_id: Option<InflightId>) {
40+
pub(crate) fn update_conflicting(&mut self, conflict: u64, inflight_id: Option<InflightId>) {
4041
tracing::debug!(
4142
"update_conflict: current progress_entry: {}; conflict: {}",
4243
self.entry,
4344
conflict
4445
);
4546

4647
// The inflight may be None if the conflict is caused by a heartbeat response.
47-
if has_payload {
48+
if let Some(inflight_id) = inflight_id {
4849
self.entry.inflight.conflict(conflict, inflight_id);
4950
}
5051

@@ -91,14 +92,21 @@ where C: RaftTypeConfig
9192
}
9293
}
9394

95+
/// Update the matching log id for this follower when replication succeeds.
96+
///
97+
/// If `inflight_id` is `Some`, the inflight state is acknowledged because the response
98+
/// corresponds to a replication request with log payload. If `None`, the response is from
99+
/// an RPC without payload (e.g., heartbeat), and inflight state is not modified.
94100
pub(crate) fn update_matching(&mut self, matching: Option<LogIdOf<C>>, inflight_id: Option<InflightId>) {
95101
tracing::debug!(
96102
"update_matching: current progress_entry: {}; matching: {}",
97103
self.entry,
98104
matching.display()
99105
);
100106

101-
self.entry.inflight.ack(matching.clone(), inflight_id);
107+
if let Some(inflight_id) = inflight_id {
108+
self.entry.inflight.ack(matching.clone(), inflight_id);
109+
}
102110

103111
debug_assert!(matching.as_ref() >= self.entry.matching());
104112
self.entry.matching = matching;

0 commit comments

Comments
 (0)