Skip to content

Conversation

@drmingdrmer
Copy link
Member

@drmingdrmer drmingdrmer commented Nov 27, 2025

Changelog

feat: add stream-oriented AppendEntries API for pipelined replication

Add Raft::stream_append() method that allows processing multiple AppendEntries
requests through a pipelined stream interface. This enables efficient batched
log replication with backpressure support via a bounded channel.

Example usage:

use std::pin::pin;
use futures::StreamExt;

let input = futures::stream::iter(vec![request1, request2, request3]);
let mut output = pin!(raft.stream_append(input));

while let Some(result) = output.next().await {
    match result {
        Ok(log_id) => println!("Flushed: {:?}", log_id),
        Err(err) => {
            println!("Error: {}", err);
            break;
        }
    }
}

Changes:

  • Add Raft::stream_append() for stream-based AppendEntries processing
  • Add StreamAppendError with Conflict and HigherVote variants
  • Add StreamAppendResult type alias for stream item results
  • Add AppendEntriesResponse::into_stream_result() conversion method
  • Add ProtocolApi::stream_append() internal implementation
  • Add integration tests for success, conflict, and higher vote scenarios


This change is Reviewable

Add `Raft::stream_append()` method that allows processing multiple AppendEntries
requests through a pipelined stream interface. This enables efficient batched
log replication with backpressure support via a bounded channel.

Example usage:

```rust
use std::pin::pin;
use futures::StreamExt;

let input = futures::stream::iter(vec![request1, request2, request3]);
let mut output = pin!(raft.stream_append(input));

while let Some(result) = output.next().await {
    match result {
        Ok(log_id) => println!("Flushed: {:?}", log_id),
        Err(err) => {
            println!("Error: {}", err);
            break;
        }
    }
}
```

Changes:
- Add `Raft::stream_append()` for stream-based AppendEntries processing
- Add `StreamAppendError` with `Conflict` and `HigherVote` variants
- Add `StreamAppendResult` type alias for stream item results
- Add `AppendEntriesResponse::into_stream_result()` conversion method
- Add `ProtocolApi::stream_append()` internal implementation
- Add integration tests for success, conflict, and higher vote scenarios
Copy link
Collaborator

@xp-trumpet xp-trumpet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xp-trumpet reviewed 8 of 8 files at r1, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @drmingdrmer)

@drmingdrmer drmingdrmer added this pull request to the merge queue Nov 27, 2025
Merged via the queue into databendlabs:main with commit 7dabf9e Nov 27, 2025
36 checks passed
@drmingdrmer drmingdrmer deleted the 273-stream-append branch November 27, 2025 12:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants