Skip to content

Commit 41af9bb

Browse files
committed
Support async for RR
1 parent 79da1aa commit 41af9bb

File tree

2 files changed

+225
-23
lines changed

2 files changed

+225
-23
lines changed

crates/wasmtime/src/runtime/rr/replay_driver.rs

Lines changed: 220 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
AsContextMut, Engine, Module, ReplayReader, ReplaySettings, Store, ValRaw, prelude::*,
1010
};
1111
use alloc::collections::BTreeMap;
12+
use alloc::sync::Arc;
1213
use core::mem::MaybeUninit;
1314
use wasmtime_environ::EntityIndex;
1415
#[cfg(feature = "rr-component")]
@@ -24,7 +25,7 @@ pub struct ReplayEnvironment {
2425
}
2526

2627
impl ReplayEnvironment {
27-
/// Create a new [`ReplayEnvironment`]
28+
/// Construct a new [`ReplayEnvironment`] from scratch
2829
pub fn new(engine: &Engine, settings: ReplaySettings) -> Self {
2930
Self {
3031
engine: engine.clone(),
@@ -47,8 +48,18 @@ impl ReplayEnvironment {
4748
}
4849

4950
/// Instantiate a new [`ReplayInstance`] using a [`ReplayReader`] in context of this environment
50-
pub fn instantiate(&self, reader: impl ReplayReader + 'static) -> Result<ReplayInstance<'_>> {
51-
ReplayInstance::from_environment(self, reader)
51+
pub fn instantiate(&self, reader: impl ReplayReader + 'static) -> Result<ReplayInstance<()>> {
52+
let store = Store::new(&self.engine, ());
53+
ReplayInstance::<()>::from_environment_and_store(self.clone(), store, reader)
54+
}
55+
56+
/// Like [`Self::instantiate`] but allows providing a custom [`Store`] generator
57+
pub fn instantiate_with_store<T>(
58+
&self,
59+
store_gen: impl FnOnce() -> Store<T>,
60+
reader: impl ReplayReader + 'static,
61+
) -> Result<ReplayInstance<T>> {
62+
ReplayInstance::from_environment_and_store(self.clone(), store_gen(), reader)
5263
}
5364
}
5465

@@ -76,47 +87,47 @@ impl ReplayEnvironment {
7687
/// Ok(())
7788
/// }
7889
/// ```
79-
pub struct ReplayInstance<'a> {
80-
/// The store doesn't need any host data because the trace format and
81-
/// replay is designed to be embedding-agnostic
82-
store: Store<()>,
83-
component_linker: component::Linker<()>,
84-
module_linker: crate::Linker<()>,
85-
modules: &'a BTreeMap<[u8; 32], Module>,
86-
components: &'a BTreeMap<[u8; 32], Component>,
90+
pub struct ReplayInstance<T: 'static> {
91+
env: Arc<ReplayEnvironment>,
92+
store: Store<T>,
93+
component_linker: component::Linker<T>,
94+
module_linker: crate::Linker<T>,
8795
module_instances: BTreeMap<core_events::InstantiationEvent, crate::Instance>,
8896
component_instances: BTreeMap<component_events::InstantiationEvent, component::Instance>,
8997
}
9098

91-
impl<'a> ReplayInstance<'a> {
92-
fn from_environment(
93-
env: &'a ReplayEnvironment,
99+
impl<T: 'static> ReplayInstance<T> {
100+
fn from_environment_and_store(
101+
env: ReplayEnvironment,
102+
mut store: Store<T>,
94103
reader: impl ReplayReader + 'static,
95104
) -> Result<Self> {
96-
let mut store = Store::new(&env.engine, ());
105+
let env = Arc::new(env);
97106
store.init_replaying(reader, env.settings.clone())?;
98-
let mut component_linker = component::Linker::<()>::new(&env.engine);
99-
let mut module_linker = crate::Linker::<()>::new(&env.engine);
107+
let mut component_linker = component::Linker::<T>::new(&env.engine);
108+
let mut module_linker = crate::Linker::<T>::new(&env.engine);
100109
// Replays shouldn't use any imports, so stub them all out as traps
101110
for module in env.modules.values() {
102-
// Defining unknown imports as trap seems to not actually trigger the entrypoint?
103-
// Use default values instead for now
104111
module_linker.define_unknown_imports_as_traps(module)?;
105112
}
106113
for component in env.components.values() {
107114
component_linker.define_unknown_imports_as_traps(component)?;
108115
}
109116
Ok(Self {
117+
env,
110118
store,
111119
component_linker,
112120
module_linker,
113-
modules: &env.modules,
114-
components: &env.components,
115121
module_instances: BTreeMap::new(),
116122
component_instances: BTreeMap::new(),
117123
})
118124
}
119125

126+
/// Obtain a reference to the internal [`Store`]
127+
pub fn store(&self) -> &Store<T> {
128+
&self.store
129+
}
130+
120131
/// Run a single top-level event from the instance
121132
///
122133
/// "Top-level" events are those explicitly invoked events, namely:
@@ -131,6 +142,7 @@ impl<'a> ReplayInstance<'a> {
131142
{
132143
// Find matching component from environment to instantiate
133144
let component = self
145+
.env
134146
.components
135147
.get(&event.component)
136148
.ok_or(ReplayError::MissingComponent(event.component))?;
@@ -215,6 +227,7 @@ impl<'a> ReplayInstance<'a> {
215227
RREvent::CoreWasmInstantiation(event) => {
216228
// Find matching module from environment to instantiate
217229
let module = self
230+
.env
218231
.modules
219232
.get(&event.module)
220233
.ok_or(ReplayError::MissingModule(event.module))?;
@@ -259,7 +272,7 @@ impl<'a> ReplayInstance<'a> {
259272

260273
// Call the function
261274
//
262-
// This is almost a mirror of the usage in [`crate::Func::call`] or [`crate::Func::call_async`]
275+
// This is almost a mirror of the usage in [`crate::Func::call_impl`]
263276
func.call_impl_check_args(&mut store, &params, &mut results)?;
264277
unsafe {
265278
func.call_impl_do_call(&mut store, params.as_slice(), results.as_mut_slice())?;
@@ -270,6 +283,172 @@ impl<'a> ReplayInstance<'a> {
270283
}
271284
Ok(())
272285
}
286+
287+
/// Exactly like [`Self::run_single_top_level_event`] but uses async stores and calls
288+
#[cfg(feature = "async")]
289+
pub async fn run_single_top_level_event_async(&mut self, rr_event: RREvent) -> Result<()>
290+
where
291+
T: Send,
292+
{
293+
match rr_event {
294+
RREvent::ComponentInstantiation(event) => {
295+
#[cfg(feature = "rr-component")]
296+
{
297+
// Find matching component from environment to instantiate
298+
let component = self
299+
.env
300+
.components
301+
.get(&event.component)
302+
.ok_or(ReplayError::MissingComponent(event.component))?;
303+
304+
let instance = self
305+
.component_linker
306+
.instantiate_async(self.store.as_context_mut(), component)
307+
.await?;
308+
// Validate the instantiation event
309+
event.validate(&component_events::InstantiationEvent {
310+
component: *component.checksum(),
311+
instance: instance.id().instance(),
312+
})?;
313+
314+
let ret = self.component_instances.insert(event, instance);
315+
// Ensures that an already-instantiated configuration is not re-instantiated
316+
assert!(ret.is_none());
317+
}
318+
#[cfg(not(feature = "rr-component"))]
319+
{
320+
anyhow!(
321+
"Cannot parse ComponentInstantation replay event without rr-component feature enabled"
322+
);
323+
}
324+
}
325+
RREvent::ComponentWasmFuncBegin(event) => {
326+
#[cfg(feature = "rr-component")]
327+
{
328+
// Grab the correct component instance
329+
let key = component_events::InstantiationEvent {
330+
component: event.component,
331+
instance: event.instance,
332+
};
333+
let instance = self
334+
.component_instances
335+
.get_mut(&key)
336+
.ok_or(ReplayError::MissingComponentInstance(key.instance.as_u32()))?;
337+
338+
// Replay lowering steps and obtain raw value arguments to raw function call
339+
let func = component::Func::from_lifted_func(*instance, event.func_idx);
340+
let store = self.store.as_context_mut();
341+
342+
// Call the function
343+
//
344+
// This is almost a mirror of the usage in [`component::Func::call_impl`]
345+
let mut results_storage = [component::Val::U64(0); MAX_FLAT_RESULTS];
346+
let mut num_results = 0;
347+
let results = &mut results_storage;
348+
let _return = unsafe {
349+
async {
350+
func.call_raw(
351+
store,
352+
|cx, _, dst: &mut MaybeUninit<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>| {
353+
// For lowering, use replay instead of actual lowering
354+
let dst: &mut [MaybeUninit<ValRaw>] = dst.assume_init_mut();
355+
cx.replay_lowering(Some(dst), ReplayLoweringPhase::WasmFuncEntry)
356+
},
357+
|cx, results_ty, src: &[ValRaw; MAX_FLAT_RESULTS]| {
358+
// Lifting can proceed exactly as normal
359+
for (result, slot) in
360+
component::Func::lift_results(cx, results_ty, src, MAX_FLAT_RESULTS)?.zip(results)
361+
{
362+
*slot = result?;
363+
num_results += 1;
364+
}
365+
Ok(())
366+
},
367+
)
368+
}.await?;
369+
};
370+
371+
log::info!(
372+
"Returned {:?} for calling {:?}",
373+
&results_storage[..num_results],
374+
func
375+
);
376+
}
377+
#[cfg(not(feature = "rr-component"))]
378+
{
379+
anyhow!(
380+
"Cannot parse ComponentWasmFuncBegin replay event without rr-component feature enabled"
381+
);
382+
}
383+
}
384+
RREvent::CoreWasmInstantiation(event) => {
385+
// Find matching module from environment to instantiate
386+
let module = self
387+
.env
388+
.modules
389+
.get(&event.module)
390+
.ok_or(ReplayError::MissingModule(event.module))?;
391+
392+
let instance = self
393+
.module_linker
394+
.instantiate_async(self.store.as_context_mut(), module)
395+
.await?;
396+
397+
// Validate the instantiation event
398+
event.validate(&core_events::InstantiationEvent {
399+
module: *module.checksum(),
400+
instance: instance.id(),
401+
})?;
402+
403+
let ret = self.module_instances.insert(event, instance);
404+
// Ensures that an already-instantiated configuration is not re-instantiated
405+
assert!(ret.is_none());
406+
}
407+
RREvent::CoreWasmFuncEntry(event) => {
408+
// Grab the correct module instance
409+
let key = core_events::InstantiationEvent {
410+
module: event.module,
411+
instance: event.origin.instance,
412+
};
413+
let instance = self
414+
.module_instances
415+
.get_mut(&key)
416+
.ok_or(ReplayError::MissingModuleInstance(key.instance.as_u32()))?;
417+
418+
let entity = EntityIndex::from(event.origin.index);
419+
let mut store = self.store.as_context_mut();
420+
let func = instance
421+
._get_export(store.0, entity)
422+
.into_func()
423+
.ok_or(ReplayError::InvalidCoreFuncIndex(entity))?;
424+
425+
let params_ty = func.ty(&store).params().collect::<Vec<_>>();
426+
427+
// Obtain the argument values for function call
428+
let mut results = vec![crate::Val::I64(0); func.ty(&store).results().len()];
429+
let params = event.args.to_val_vec(&mut store, params_ty);
430+
431+
// Call the function
432+
//
433+
// This is almost a mirror of the usage in [`crate::Func::call_impl`]
434+
func.call_impl_check_args(&mut store, &params, &mut results)?;
435+
unsafe {
436+
async {
437+
func.call_impl_do_call(
438+
&mut store,
439+
params.as_slice(),
440+
results.as_mut_slice(),
441+
)
442+
}
443+
.await?;
444+
}
445+
}
446+
447+
_ => Err(ReplayError::IncorrectEventVariant)?,
448+
}
449+
Ok(())
450+
}
451+
273452
/// Run this replay instance to completion
274453
pub fn run_to_completion(&mut self) -> Result<()> {
275454
while let Some(rr_event) = self
@@ -284,4 +463,23 @@ impl<'a> ReplayInstance<'a> {
284463
}
285464
Ok(())
286465
}
466+
467+
/// Exactly like [`Self::run_to_completion`] but uses async stores and calls
468+
#[cfg(feature = "async")]
469+
pub async fn run_to_completion_async(&mut self) -> Result<()>
470+
where
471+
T: Send,
472+
{
473+
while let Some(rr_event) = self
474+
.store
475+
.as_context_mut()
476+
.0
477+
.replay_buffer_mut()
478+
.expect("unexpected; replay buffer must be initialized within an instance")
479+
.next()
480+
{
481+
self.run_single_top_level_event_async(rr_event?).await?;
482+
}
483+
Ok(())
484+
}
287485
}

crates/wasmtime/src/runtime/store.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1488,7 +1488,7 @@ impl StoreOpaque {
14881488
}
14891489

14901490
#[cfg(feature = "rr")]
1491-
pub fn init_replaying(
1491+
pub(crate) fn init_replaying(
14921492
&mut self,
14931493
replayer: impl ReplayReader + 'static,
14941494
settings: ReplaySettings,
@@ -1501,6 +1501,10 @@ impl StoreOpaque {
15011501
self.engine().is_replaying(),
15021502
"store replaying requires replaying enabled on config"
15031503
);
1504+
ensure!(
1505+
!self.engine().is_recording(),
1506+
"store replaying cannot be enabled while recording is enabled"
1507+
);
15041508
self.replay_buffer = Some(ReplayBuffer::new_replayer(replayer, settings)?);
15051509
Ok(())
15061510
}

0 commit comments

Comments
 (0)