Skip to content

Commit 0c844c2

Browse files
committed
bugfix: dont drop sync messages received whilst loading
Problem: in some situations a DocHandle would appear to never get in sync with a remote even though we know both sides have the same set of changes. The reason for this is because if we receive a sync message for a document which is not loaded yet we drop the message whilst we wait for the document to load. This led to a situation where we would send a sync message back to the remote and the remote would see the message as an acknowledgement of the initial message it sent and so never send a response - but we were expecting an ack because we thought it was the first message in the conversation. Solution: add a `received_messages` queue to the `DocState::Bootstrap` state which we put messages into if we receive them whilst loading the document, then reprocess them later.
1 parent 695866d commit 0c844c2

File tree

1 file changed

+95
-19
lines changed

1 file changed

+95
-19
lines changed

src/repo.rs

Lines changed: 95 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ impl RepoHandle {
169169
DocState::Bootstrap {
170170
resolvers: vec![resolver],
171171
storage_fut: None,
172+
received_messages: Vec::new(),
172173
},
173174
);
174175
self.repo_sender
@@ -354,12 +355,13 @@ impl<T> Future for RepoFuture<T> {
354355
type BootstrapStorageFut = Option<BoxFuture<'static, Result<Option<Vec<u8>>, StorageError>>>;
355356

356357
/// The doc info state machine.
357-
pub(crate) enum DocState {
358+
enum DocState {
358359
/// Bootstrapping will resolve into a future doc handle,
359360
/// the optional storage fut represents first checking storage before the network.
360361
Bootstrap {
361362
resolvers: Vec<RepoFutureResolver<Result<DocHandle, RepoError>>>,
362363
storage_fut: BootstrapStorageFut,
364+
received_messages: Vec<NetworkEvent>,
363365
},
364366
/// Pending a load from storage, not attempting to sync over network.
365367
LoadPending {
@@ -487,6 +489,7 @@ impl DocState {
487489
DocState::Bootstrap {
488490
resolvers: _,
489491
storage_fut: Some(storage_fut),
492+
received_messages: _,
490493
} => {
491494
let waker = waker_ref(&waker);
492495
let pinned = Pin::new(storage_fut);
@@ -503,6 +506,7 @@ impl DocState {
503506
DocState::Bootstrap {
504507
resolvers: _,
505508
ref mut storage_fut,
509+
received_messages: _,
506510
} => {
507511
*storage_fut = None;
508512
}
@@ -520,6 +524,7 @@ impl DocState {
520524
DocState::Bootstrap {
521525
resolvers: _,
522526
ref mut storage_fut,
527+
received_messages: _,
523528
} => {
524529
assert!(storage_fut.is_none());
525530
*storage_fut = Some(fut);
@@ -803,7 +808,7 @@ impl DocumentInfo {
803808
wake_sender: &Sender<WakeSignal>,
804809
repo_sender: &Sender<RepoEvent>,
805810
repo_id: &RepoId,
806-
) {
811+
) -> Option<Vec<NetworkEvent>> {
807812
let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id.clone()));
808813
if matches!(self.state, DocState::LoadPending { .. }) {
809814
match self.state.poll_pending_load(waker) {
@@ -820,7 +825,7 @@ impl DocumentInfo {
820825
e
821826
))));
822827
self.state = DocState::Error;
823-
return;
828+
return None;
824829
}
825830
}
826831
self.handle_count.fetch_add(1, Ordering::SeqCst);
@@ -831,26 +836,42 @@ impl DocumentInfo {
831836
self.handle_count.clone(),
832837
repo_id.clone(),
833838
);
839+
let pending_evts = match &mut self.state {
840+
DocState::Bootstrap {
841+
received_messages, ..
842+
} => {
843+
tracing::trace!(
844+
"load complete, reprocessing {} messages",
845+
received_messages.len()
846+
);
847+
Some(std::mem::take(received_messages))
848+
}
849+
_ => None,
850+
};
834851
self.state.resolve_load_fut(Ok(Some(handle)));
835852
self.state = DocState::Sync(vec![]);
853+
pending_evts
836854
// TODO: send sync messages?
837855
}
838856
Poll::Ready(Ok(None)) => {
839857
self.state.resolve_load_fut(Ok(None));
840858
self.state = DocState::Error;
859+
None
841860
}
842861
Poll::Ready(Err(err)) => {
843862
self.state
844863
.resolve_load_fut(Err(RepoError::StorageError(err)));
845864
self.state = DocState::Error;
865+
None
846866
}
847-
Poll::Pending => {}
867+
Poll::Pending => None,
848868
}
849869
} else if matches!(self.state, DocState::Bootstrap { .. }) {
850870
match self.state.poll_pending_load(waker) {
851871
Poll::Ready(Ok(Some(val))) => {
852872
{
853873
let res = {
874+
tracing::trace!(%document_id, "load complete");
854875
let mut doc = self.document.write();
855876
doc.automerge.load_incremental(&val)
856877
};
@@ -861,7 +882,7 @@ impl DocumentInfo {
861882
e
862883
))));
863884
self.state = DocState::Error;
864-
return;
885+
return None;
865886
}
866887
}
867888
self.handle_count.fetch_add(1, Ordering::SeqCst);
@@ -873,22 +894,38 @@ impl DocumentInfo {
873894
repo_id.clone(),
874895
);
875896
self.state.resolve_bootstrap_fut(Ok(handle));
897+
let pending_evts = match &mut self.state {
898+
DocState::Bootstrap {
899+
received_messages, ..
900+
} => {
901+
tracing::trace!(
902+
"load complete, reprocessing {} messages",
903+
received_messages.len()
904+
);
905+
Some(std::mem::take(received_messages))
906+
}
907+
_ => None,
908+
};
876909
self.state = DocState::Sync(vec![]);
877910
// TODO: send sync messages?
911+
pending_evts
878912
}
879913
Poll::Ready(Ok(None)) => {
880914
// Switch to a network request.
881915
self.state.remove_bootstrap_storage_fut();
916+
None
882917
}
883918
Poll::Ready(Err(err)) => {
884919
self.state
885920
.resolve_bootstrap_fut(Err(RepoError::StorageError(err)));
886921
self.state = DocState::Error;
922+
None
887923
}
888-
Poll::Pending => {}
924+
Poll::Pending => None,
889925
}
890926
} else {
891927
self.state.poll_pending_save(waker);
928+
None
892929
}
893930
}
894931

@@ -976,6 +1013,7 @@ impl DocumentInfo {
9761013
fn receive_sync_message(
9771014
&mut self,
9781015
per_remote: HashMap<RepoId, VecDeque<SyncMessage>>,
1016+
doc_id: &DocumentId,
9791017
) -> (bool, Vec<PeerConnCommand>) {
9801018
let mut commands = Vec::new();
9811019
let (start_heads, new_heads) = {
@@ -991,6 +1029,7 @@ impl DocumentInfo {
9911029
Entry::Occupied(entry) => entry.into_mut(),
9921030
};
9931031
for message in messages {
1032+
tracing::trace!(?message, %doc_id, "receiving sync message");
9941033
conn.receive_sync_message(&mut document.automerge, message)
9951034
.expect("Failed to receive sync message.");
9961035
}
@@ -1392,6 +1431,7 @@ impl Repo {
13921431
} = pending_messages
13931432
.pop_front()
13941433
.expect("Empty pending messages.");
1434+
tracing::trace!(%document_id, %to_repo_id, ?message, "sending sync message");
13951435
let outgoing = RepoMessage::Sync {
13961436
from_repo_id,
13971437
to_repo_id,
@@ -1446,13 +1486,16 @@ impl Repo {
14461486
// `NewDoc` could be broken-up into two events: `RequestDoc` and `NewDoc`,
14471487
// the doc info could be created here.
14481488
RepoEvent::NewDoc(document_id, mut info) => {
1489+
tracing::trace!(%document_id, "NewDoc event");
14491490
if info.is_boostrapping() {
14501491
tracing::trace!("adding bootstrapping document");
14511492
if let Some(existing_info) = self.documents.get_mut(&document_id) {
14521493
if matches!(existing_info.state, DocState::Bootstrap { .. }) {
1494+
tracing::trace!(%document_id, "doc is already loading, adding to bootstrap resolvers for it");
14531495
let mut resolvers = info.state.get_bootstrap_resolvers();
14541496
existing_info.state.add_boostrap_resolvers(&mut resolvers);
14551497
} else if matches!(existing_info.state, DocState::Sync(_)) {
1498+
tracing::trace!(%document_id, "doc is already available, resolving immediately");
14561499
existing_info.handle_count.fetch_add(1, Ordering::SeqCst);
14571500
let handle = DocHandle::new(
14581501
self.repo_sender.clone(),
@@ -1470,12 +1513,14 @@ impl Repo {
14701513
} else {
14711514
let storage_fut = self.storage.get(document_id.clone());
14721515
info.state.add_boostrap_storage_fut(storage_fut);
1473-
info.poll_storage_operation(
1516+
if let Some(evts) = info.poll_storage_operation(
14741517
document_id.clone(),
14751518
&self.wake_sender,
14761519
&self.repo_sender,
14771520
&self.repo_id,
1478-
);
1521+
) {
1522+
self.pending_events.extend(evts);
1523+
}
14791524

14801525
let share_type = if info.is_boostrapping() {
14811526
Some(ShareType::Request)
@@ -1594,12 +1639,14 @@ impl Repo {
15941639
)));
15951640
return;
15961641
}
1597-
info.poll_storage_operation(
1642+
if let Some(evts) = info.poll_storage_operation(
15981643
doc_id,
15991644
&self.wake_sender,
16001645
&self.repo_sender,
16011646
&self.repo_id,
1602-
);
1647+
) {
1648+
self.pending_events.extend(evts);
1649+
}
16031650
}
16041651
RepoEvent::AddChangeObserver(doc_id, last_heads, mut observer) => {
16051652
if let Some(info) = self.documents.get_mut(&doc_id) {
@@ -1721,25 +1768,49 @@ impl Repo {
17211768
let state = DocState::Bootstrap {
17221769
resolvers: vec![],
17231770
storage_fut: None,
1771+
received_messages: Vec::new(),
17241772
};
17251773
let document = Arc::new(RwLock::new(shared_document));
17261774
let handle_count = Arc::new(AtomicUsize::new(0));
17271775
let mut info = DocumentInfo::new(state, document, handle_count);
17281776

17291777
let storage_fut = self.storage.get(document_id.clone());
17301778
info.state.add_boostrap_storage_fut(storage_fut);
1731-
info.poll_storage_operation(
1779+
if let Some(evts) = info.poll_storage_operation(
17321780
document_id.clone(),
17331781
&self.wake_sender,
17341782
&self.repo_sender,
17351783
&self.repo_id,
1736-
);
1784+
) {
1785+
self.pending_events.extend(evts);
1786+
}
17371787

17381788
info
17391789
});
17401790

1741-
if !info.state.should_sync() {
1742-
continue;
1791+
match &mut info.state {
1792+
DocState::Sync(_) => {}
1793+
DocState::Bootstrap {
1794+
storage_fut,
1795+
received_messages,
1796+
..
1797+
} => {
1798+
// This is ugly. If the document is bootstrapping we hang on to the messages to process later
1799+
// once it's loaded. But only if we're waiting to load from storage. Otherwise we want to just
1800+
// get on with sync.
1801+
if storage_fut.is_some() {
1802+
received_messages.push(NetworkEvent::Sync {
1803+
from_repo_id,
1804+
to_repo_id,
1805+
document_id,
1806+
message,
1807+
});
1808+
continue;
1809+
}
1810+
}
1811+
_ => {
1812+
continue;
1813+
}
17431814
}
17441815

17451816
let per_doc = per_doc_messages.entry(document_id).or_default();
@@ -1755,7 +1826,8 @@ impl Repo {
17551826
.get_mut(&document_id)
17561827
.expect("Doc should have an info by now.");
17571828

1758-
let (has_changes, peer_conn_commands) = info.receive_sync_message(per_remote);
1829+
let (has_changes, peer_conn_commands) =
1830+
info.receive_sync_message(per_remote, &document_id);
17591831
if has_changes && info.note_changes() {
17601832
self.documents_with_changes.push(document_id.clone());
17611833
}
@@ -1992,12 +2064,14 @@ impl Repo {
19922064
}
19932065
WakeSignal::Storage(doc_id) => {
19942066
if let Some(info) = self.documents.get_mut(&doc_id) {
1995-
info.poll_storage_operation(
2067+
if let Some(evts) = info.poll_storage_operation(
19962068
doc_id.clone(),
19972069
&self.wake_sender,
19982070
&self.repo_sender,
19992071
&self.repo_id,
2000-
);
2072+
) {
2073+
self.pending_events.extend(evts);
2074+
};
20012075
if info.state.should_sync() {
20022076
let remotes = self.remote_repos.keys().filter(|k| !info.peer_connections.contains_key(k));
20032077
// Send a sync message to all other repos we are connected
@@ -2078,12 +2152,14 @@ impl Repo {
20782152
WakeSignal::PendingCloseSink(repo_id) => self.poll_close_sinks(repo_id),
20792153
WakeSignal::Storage(doc_id) => {
20802154
if let Some(info) = self.documents.get_mut(&doc_id) {
2081-
info.poll_storage_operation(
2155+
if let Some(evts) = info.poll_storage_operation(
20822156
doc_id.clone(),
20832157
&self.wake_sender,
20842158
&self.repo_sender,
20852159
&self.repo_id,
2086-
);
2160+
) {
2161+
self.pending_events.extend(evts);
2162+
};
20872163
}
20882164
}
20892165
WakeSignal::ShareDecision(_) => {}

0 commit comments

Comments
 (0)