Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions openraft/src/raft/api/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::sync::Arc;
use std::time::Duration;

use futures::Stream;
use openraft_macros::since;

use crate::LogIdOptionExt;
use crate::LogIndexOptionExt;
use crate::OptionalSend;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::Snapshot;
Expand All @@ -20,6 +23,8 @@ use crate::raft::TransferLeaderRequest;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::raft_inner::RaftInner;
use crate::raft::stream_append;
use crate::raft::stream_append::StreamAppendResult;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::VoteOf;
Expand Down Expand Up @@ -57,16 +62,16 @@ use crate::vote::raft_vote::RaftVoteExt;
/// Remote Leader Node Local Follower Node
/// ```
#[since(version = "0.10.0")]
pub(crate) struct ProtocolApi<'a, C>
pub(crate) struct ProtocolApi<C>
where C: RaftTypeConfig
{
inner: &'a RaftInner<C>,
inner: Arc<RaftInner<C>>,
}

impl<'a, C> ProtocolApi<'a, C>
impl<C> ProtocolApi<C>
where C: RaftTypeConfig
{
pub(in crate::raft) fn new(inner: &'a RaftInner<C>) -> Self {
pub(in crate::raft) fn new(inner: Arc<RaftInner<C>>) -> Self {
Self { inner }
}

Expand All @@ -91,6 +96,17 @@ where C: RaftTypeConfig
self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await
}

#[since(version = "0.10.0")]
pub(crate) fn stream_append<S>(
self,
stream: S,
) -> impl Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
where
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
{
stream_append::stream_append(self.inner, stream)
}

#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, Fatal<C>> {
Expand Down
20 changes: 20 additions & 0 deletions openraft/src/raft/message/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::fmt;
use crate::RaftTypeConfig;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::raft::StreamAppendError;
use crate::raft::stream_append::StreamAppendResult;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::VoteOf;

Expand Down Expand Up @@ -115,6 +117,24 @@ where C: RaftTypeConfig
pub fn is_conflict(&self) -> bool {
matches!(*self, AppendEntriesResponse::Conflict)
}

/// Convert this response to a stream append result.
///
/// Arguments:
/// - `prev_log_id`: The prev_log_id from the request, used for Conflict errors.
/// - `last_log_id`: The last_log_id of the sent entries, used for Success.
pub fn into_stream_result(
self,
prev_log_id: Option<LogIdOf<C>>,
last_log_id: Option<LogIdOf<C>>,
) -> StreamAppendResult<C> {
match self {
AppendEntriesResponse::Success => Ok(last_log_id),
AppendEntriesResponse::PartialSuccess(log_id) => Ok(log_id),
AppendEntriesResponse::Conflict => Err(StreamAppendError::Conflict(prev_log_id)),
AppendEntriesResponse::HigherVote(vote) => Err(StreamAppendError::HigherVote(vote)),
}
}
}

impl<C> fmt::Display for AppendEntriesResponse<C>
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/raft/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

mod append_entries;
mod install_snapshot;
mod stream_append_error;
mod transfer_leader;
mod vote;

Expand All @@ -18,6 +19,7 @@ pub use client_write::ClientWriteResult;
pub use install_snapshot::InstallSnapshotRequest;
pub use install_snapshot::InstallSnapshotResponse;
pub use install_snapshot::SnapshotResponse;
pub use stream_append_error::StreamAppendError;
pub use transfer_leader::TransferLeaderRequest;
pub use vote::VoteRequest;
pub use vote::VoteResponse;
Expand Down
36 changes: 36 additions & 0 deletions openraft/src/raft/message/stream_append_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::fmt;

use crate::RaftTypeConfig;
use crate::display_ext::DisplayOptionExt;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::VoteOf;

/// Error type for stream append entries.
///
/// When this error is returned, the stream is terminated.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum StreamAppendError<C: RaftTypeConfig> {
/// Log conflict at the given prev_log_id.
///
/// The follower's log at this position does not match the leader's.
Conflict(Option<LogIdOf<C>>),

/// Seen a higher vote from another leader.
HigherVote(VoteOf<C>),
}

impl<C> fmt::Display for StreamAppendError<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StreamAppendError::Conflict(log_id) => {
write!(f, "Conflict({})", log_id.display())
}
StreamAppendError::HigherVote(vote) => {
write!(f, "HigherVote({})", vote)
}
}
}
}
73 changes: 71 additions & 2 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) mod message;
mod raft_inner;
pub mod responder;
mod runtime_config_handle;
mod stream_append;
pub mod trigger;

use std::any::Any;
Expand All @@ -42,11 +43,13 @@ pub use message::ClientWriteResult;
pub use message::InstallSnapshotRequest;
pub use message::InstallSnapshotResponse;
pub use message::SnapshotResponse;
pub use message::StreamAppendError;
pub use message::TransferLeaderRequest;
pub use message::VoteRequest;
pub use message::VoteResponse;
pub use message::WriteRequest;
use openraft_macros::since;
pub use stream_append::StreamAppendResult;
use tracing::Instrument;
use tracing::Level;
use tracing::trace_span;
Expand Down Expand Up @@ -655,8 +658,8 @@ where C: RaftTypeConfig
/// - [`ProtocolApi::begin_receiving_snapshot`]
/// - [`ProtocolApi::install_full_snapshot`]
/// - [`ProtocolApi::handle_transfer_leader`]
pub(crate) fn protocol_api(&self) -> ProtocolApi<'_, C> {
ProtocolApi::new(self.inner.as_ref())
pub(crate) fn protocol_api(&self) -> ProtocolApi<C> {
ProtocolApi::new(self.inner.clone())
}

pub(crate) fn app_api(&self) -> AppApi<'_, C> {
Expand Down Expand Up @@ -688,6 +691,72 @@ where C: RaftTypeConfig
self.protocol_api().append_entries(rpc).await.into_raft_result()
}

/// Submit a stream of AppendEntries RPCs to this Raft node.
///
/// This is a stream-oriented version of [`Self::append_entries`] with pipelining support.
/// It spawns a background task that reads from the input stream, sends requests to RaftCore,
/// and forwards response receivers to the output stream. Responses are yielded in order.
///
/// ## Pipelining Behavior
///
/// - A background task reads from the input stream and sends to RaftCore
/// - Uses a bounded channel (64 slots) for backpressure
/// - Responses are yielded in order (FIFO) as they complete
///
/// ## Output
///
/// The output stream emits:
/// - `Ok(log_id)` when logs are successfully flushed
/// - `Err(e)` when an error occurs, which terminates the stream
///
/// ## Pinning
///
/// The returned stream is `!Unpin` because it uses async closures internally.
/// You must pin the stream before calling `.next()`:
///
/// ```ignore
/// use std::pin::pin;
///
/// let mut output = pin!(raft.stream_append(input));
/// while let Some(result) = output.next().await { /* ... */ }
/// ```
///
/// Alternatively, use `Box::pin` for heap pinning if the stream needs to be stored or returned:
///
/// ```ignore
/// let mut output = Box::pin(raft.stream_append(input));
/// ```
///
/// # Example
///
/// ```ignore
/// use std::pin::pin;
/// use futures::StreamExt;
///
/// let input_stream = futures::stream::iter(vec![request1, request2, request3]);
/// let mut output_stream = pin!(raft.stream_append(input_stream));
///
/// while let Some(result) = output_stream.next().await {
/// match result {
/// Ok(log_id) => println!("Flushed: {:?}", log_id),
/// Err(err) => {
/// println!("Error: {}", err);
/// break;
/// }
/// }
/// }
/// ```
#[since(version = "0.10.0")]
pub fn stream_append<S>(
&self,
stream: S,
) -> impl futures::Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
where
S: futures::Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
{
self.protocol_api().stream_append(stream)
}

/// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node.
///
/// These RPCs are sent by cluster peers which are in candidate state attempting to gather votes
Expand Down
83 changes: 83 additions & 0 deletions openraft/src/raft/stream_append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Stream-based AppendEntries API implementation with pipelining.

use std::sync::Arc;

use futures::Stream;
use futures::StreamExt;

use crate::AsyncRuntime;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::core::raft_msg::RaftMsg;
use crate::entry::RaftEntry;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::StreamAppendError;
use crate::raft::raft_inner::RaftInner;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::async_runtime::MpscReceiver;
use crate::type_config::async_runtime::MpscSender;
use crate::type_config::util::TypeConfigExt;

/// Result type for stream append operations.
pub type StreamAppendResult<C> = Result<Option<LogIdOf<C>>, StreamAppendError<C>>;

const PIPELINE_BUFFER_SIZE: usize = 64;

struct Pending<C: RaftTypeConfig> {
response_rx: OneshotReceiverOf<C, AppendEntriesResponse<C>>,
prev_log_id: Option<LogIdOf<C>>,
last_log_id: Option<LogIdOf<C>>,
}

/// Create a pipelined stream that processes AppendEntries requests.
///
/// Spawns a background task that reads from input, sends to RaftCore,
/// and forwards response receivers. The returned stream awaits responses in order.
pub(crate) fn stream_append<C, S>(
inner: Arc<RaftInner<C>>,
input: S,
) -> impl Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
where
C: RaftTypeConfig,
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
{
let (tx, rx) = C::mpsc::<Pending<C>>(PIPELINE_BUFFER_SIZE);

let inner2 = inner.clone();
let _handle = C::AsyncRuntime::spawn(async move {
let inner = inner2;
futures::pin_mut!(input);

while let Some(req) = input.next().await {
let prev = req.prev_log_id.clone();
let last = req.entries.last().map(|e| e.log_id()).or(prev.clone());
let (resp_tx, resp_rx) = C::oneshot();

if inner.send_msg(RaftMsg::AppendEntries { rpc: req, tx: resp_tx }).await.is_err() {
break;
}
let pending = Pending {
response_rx: resp_rx,
prev_log_id: prev,
last_log_id: last,
};
if MpscSender::send(&tx, pending).await.is_err() {
break;
}
}
});

futures::stream::unfold(Some((rx, inner)), |state| async move {
let (mut rx, inner) = state?;
let p: Pending<C> = MpscReceiver::recv(&mut rx).await?;

let resp = inner.recv_msg(p.response_rx).await.ok()?;

let result = resp.into_stream_result(p.prev_log_id, p.last_log_id);
let cont = result.is_ok();

Some((result, if cont { Some((rx, inner)) } else { None }))
})
}
1 change: 1 addition & 0 deletions tests/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod fixtures;

mod t10_conflict_with_empty_entries;
mod t10_see_higher_vote;
mod t10_stream_append;
mod t11_append_conflicts;
mod t11_append_entries_with_bigger_term;
mod t11_append_inconsistent_log;
Expand Down
Loading