/// Checks an internal simulator invariant and aborts the whole process when it is violated.
///
/// The first argument is the condition; the remaining arguments are a `format!`-style message.
/// On failure the message is written to stderr with a `Simulator internal error: ` prefix and
/// [`std::process::abort`] is called, so the failure is not reported through a panic.
macro_rules! abort_assert {
    ($cond:expr, $($arg:tt)*) => {
        if !$cond {
            // `format_args!` renders straight into stderr; no intermediate `String` is
            // allocated on the failure path.
            eprintln!("Simulator internal error: {}", format_args!($($arg)*));
            std::process::abort();
        }
    };
}
21
22use core::{fmt, panic};
23use std::cell::{Cell, RefCell};
24use std::collections::{HashMap, VecDeque};
25use std::fmt::Debug;
26use std::panic::RefUnwindSafe;
27use std::path::Path;
28use std::pin::{Pin, pin};
29use std::rc::Rc;
30use std::task::ready;
31
32use bytes::Bytes;
33use colored::Colorize;
34use dfir_rs::scheduled::context::DfirErased;
35use futures::{Stream, StreamExt};
36use libloading::Library;
37use serde::Serialize;
38use serde::de::DeserializeOwned;
39use tempfile::TempPath;
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::sync::{Mutex, Notify};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44use super::runtime::{Hooks, InlineHooks};
45use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
46use crate::compile::builder::ExternalPortId;
47use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
48use crate::location::dynamic::LocationId;
49use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
50use crate::sim::runtime::SimHook;
51
/// State shared between the scheduler and the external senders/receivers to coordinate
/// "quiescent" periods.
struct QuiescenceState {
    /// Set to `true` at the start of `wait_for_resume` and cleared by `resume` (and again when
    /// `wait_for_resume` returns).
    quiescent: Cell<bool>,
    /// Signalled by `wait_for_resume` right after the flag is set; futures handed out by
    /// `notified` wait on this.
    quiescence_notify: Notify,
    /// Signalled by `resume`; `wait_for_resume` awaits it.
    resume_notify: Notify,
}
60
impl QuiescenceState {
    /// Clears the quiescent flag and notifies the waiters currently registered on
    /// `resume_notify`.
    fn resume(&self) {
        self.quiescent.set(false);
        self.resume_notify.notify_waiters();
    }

    /// Returns the current value of the quiescent flag.
    fn is_quiescent(&self) -> bool {
        self.quiescent.get()
    }

    /// Returns a `Notified` future registered on `quiescence_notify`.
    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
        self.quiescence_notify.notified()
    }

    /// Marks the state as quiescent, notifies `quiescence_notify` waiters, and then suspends
    /// until `resume_notify` is signalled.
    async fn wait_for_resume(&self) {
        // The flag is set before waiters are notified.
        self.quiescent.set(true);
        self.quiescence_notify.notify_waiters();
        self.resume_notify.notified().await;
        // `resume` already clears the flag; it is cleared again here after waking up.
        self.quiescent.set(false);
    }
}
86
/// Channel endpoints and shared state for one running simulation instance.
///
/// Built in `CompiledSimInstance::run_without_launching` and made available to the
/// `Sim*Sender` / `Sim*Receiver` handles through the `CURRENT_SIM_CONNECTIONS` task-local.
struct SimConnections {
    /// Senders for non-cluster external inputs, keyed by simulator port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Receivers for non-cluster external outputs, keyed by simulator port.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Per-port list of senders for cluster inputs; indexed by member id in
    /// `SimClusterSender::with_sink`.
    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
    /// Per-port list of receivers for cluster outputs; indexed by member id in
    /// `SimClusterReceiver::with_member_stream`.
    cluster_output_receivers:
        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps the external port id stored in a handle (`self.0`) to its simulator port.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence state shared with the scheduler.
    quiescence: Rc<QuiescenceState>,
}
96
// Task-local holding the connections of the simulation instance that is currently running.
// It is scoped in `CompiledSimInstance::run_without_launching` and read by the sender/receiver
// handles and by `schedule_with_maybe_logger`.
tokio::task_local! {
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
100
/// A compiled simulation: a loaded dynamic library plus the metadata needed to instantiate and
/// drive it.
pub struct CompiledSim {
    /// Temporary path associated with this simulation; not read in this file.
    // NOTE(review): presumably held so the temporary file backing `lib` outlives it — confirm.
    pub(super) _path: TempPath,
    /// The loaded library; `with_instantiator` looks up the `__hydro_runtime` symbol in it.
    pub(super) lib: Library,
    /// Registry of external ports; cloned into every `CompiledSimInstance`.
    pub(super) externals_port_registry: SimExternalPortRegistry,
    /// Number of iterations `fuzz` runs when neither a fuzzer nor a reproducer file is present.
    pub(super) unit_test_fuzz_iterations: usize,
}
108
/// A callable that produces a fresh [`CompiledSimInstance`] each time it is invoked.
///
/// The trait is sealed; it is implemented below for every `RefUnwindSafe` closure of the
/// matching signature.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
/// Blanket implementation for every suitable closure type.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
116
/// Print handler that discards its input; passed to the dylib when logging is disabled.
fn null_handler(_: fmt::Arguments<'_>) {}
118
/// Print handler that writes the formatted message, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
122
/// Print handler that writes the formatted message, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
126
/// Type of the `__hydro_runtime` symbol that `CompiledSim::with_instantiator` loads from the
/// simulation library.
///
/// The function receives a colorization flag, four maps that the callee fills with the external
/// channel endpoints (keyed by `usize` port number), and the two print handlers. Its return
/// value has the same shape as `DylibResult`; `schedule_with_maybe_logger` destructures it as
/// `(async_dfirs, tick_dfirs, hooks, inline_hooks)`.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        should_color: bool,
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
149
impl CompiledSim {
    /// Creates a single simulation instance (with logging always enabled) and passes it to
    /// `thunk`, returning whatever `thunk` returns.
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }

    /// Passes `thunk` an [`Instantiator`] that creates a new instance on every call.
    ///
    /// Instances log when `always_log` is `true` or when the `HYDRO_SIM_LOG` environment
    /// variable is set to `1`.
    ///
    /// # Panics
    ///
    /// Panics if the `__hydro_runtime` symbol cannot be found in the loaded library.
    pub fn with_instantiator<T>(
        &self,
        thunk: impl FnOnce(&dyn Instantiator) -> T,
        always_log: bool,
    ) -> T {
        // NOTE(review): the lookup is only sound if the library really exports
        // `__hydro_runtime` with the `SimLoaded` signature — nothing here checks that.
        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
        thunk(
            &(|| CompiledSimInstance {
                func: func.clone(),
                externals_port_registry: self.externals_port_registry.clone(),
                dylib_result: None,
                log,
            }),
        )
    }

    /// Runs `thunk` against simulation instances driven by bolero.
    ///
    /// The behavior depends on the environment:
    ///
    /// * `BOLERO_FUZZER` is set: configures the fuzzer through `BOLERO_LIBFUZZER_ARGS` (and
    ///   `BOLERO_FAILURE_OUTPUT`, unless `HYDRO_NO_FAILURE_OUTPUT` is `1`) and runs the test
    ///   with replay support.
    /// * Otherwise, if a reproducer file exists for the calling function, it is replayed with
    ///   [`Self::fuzz_repro`].
    /// * Otherwise the test runs for `unit_test_fuzz_iterations` iterations.
    ///
    /// The reproducer path is `<directory of the caller's source file>/sim-failures/<caller
    /// function name with "::" replaced by "__">.bin`.
    ///
    /// # Panics
    ///
    /// Panics if no suitable caller frame (with a filename) is found in the backtrace, or if
    /// the directories cannot be created.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // The caller is the first backtrace frame whose function name does not start with one
        // of the simulator-internal prefixes below.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            // The same `.fuzz-corpus` directory (under the current working directory) is passed
            // twice as a positional argument and once as the artifact prefix.
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // NOTE(review): `set_var` is unsafe because it mutates process-global state;
                // this assumes no other thread is reading the environment — confirm.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replays always log, even when logging was not requested.
                        if is_replay {
                            instance.log = true;
                        }

                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // A reproducer from an earlier failure exists: replay exactly those bytes.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
                caller_fuzz_repro_path.display(),
                self.unit_test_fuzz_iterations,
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(self.unit_test_fuzz_iterations)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }

    /// Replays a simulation using `bytes` as the source of all bolero decisions.
    ///
    /// A single instance (with logging enabled) is created and handed to `thunk`; the instance
    /// is *not* launched automatically, so `thunk` decides how to schedule it.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            bolero::bolero_engine::any::scope::with(
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }

    /// Runs `thunk` under bolero's exhaustive mode and returns how many times the test closure
    /// was invoked.
    ///
    /// If `BOLERO_FUZZER` is set, an error is printed and the process is aborted.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        // The counter is incremented through a mutable borrow that is moved into the closure.
        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replays always log, even when logging was not requested.
                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
}
390
/// Value returned by the dylib entry point (see `SimLoaded`).
///
/// `schedule_with_maybe_logger` destructures it, in order, as the async DFIRs, the tick DFIRs,
/// the hooks, and the inline hooks. The `&'static str` entries are parsed there with
/// `serde_json::from_str` into location ids.
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
398
/// One instantiation of a compiled simulation, created by an [`Instantiator`].
pub struct CompiledSimInstance<'a> {
    /// The dylib entry point, invoked in `run_without_launching`.
    func: SimLoaded<'a>,
    /// External port registry cloned from the owning `CompiledSim`.
    externals_port_registry: SimExternalPortRegistry,
    /// Result of calling `func`; `None` until `run_without_launching` stores it, and taken
    /// again by `schedule_with_maybe_logger`.
    dylib_result: Option<DylibResult>,
    /// Whether this instance produces log output.
    log: bool,
}
407
impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance, launches the scheduler, and then runs `thunk`.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Calls the dylib entry point, wires up the external channels, and runs `thunk` with the
    /// prepared instance inside a `LocalSet` in which `CURRENT_SIM_CONNECTIONS` is set.
    ///
    /// The scheduler is not started here; `thunk` is responsible for that (see `launch` and
    /// `schedule_with_logger`).
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // These maps are filled in by the dylib entry point below.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();

        // NOTE(review): soundness relies on the loaded symbol really having the `SimLoaded`
        // signature — nothing in this file verifies that.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // With logging disabled, both handlers discard their input.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        let quiescence = Rc::new(QuiescenceState {
            quiescent: Cell::new(false),
            quiescence_notify: Notify::new(),
            resume_notify: Notify::new(),
        });

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Move every endpoint that belongs to a registered port out of the `usize`-keyed maps
        // into maps keyed by `SimExternalPort`, wrapping them for shared use.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|r| Rc::new(Mutex::new(r)))
                        .collect(),
                );
            }
        }

        self.dylib_result = Some(dylib_result);

        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                    quiescence: quiescence.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Spawns the scheduler as a local task, without a custom log writer.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns the scheduler future, directing log output to `log_writer`.
    ///
    /// The writer is only used when logging is enabled for this instance.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Builds the `LaunchedSim` from the stored dylib result and returns its scheduler future.
    ///
    /// Log output goes nowhere when logging is disabled; otherwise to `log_override` if one is
    /// given, and to stderr if not.
    ///
    /// # Panics
    ///
    /// Panics if the dylib result has not been stored yet (or was already taken), or if a
    /// location string returned by the dylib cannot be parsed.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // One observation entry per async DFIR, all initially "not ready".
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            connections.quiescence.clone()
        });

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
            quiescence,
        };

        async move { launched.scheduler().await }
    }
}
572
/// Manual `Clone` that copies the handle; written by hand so the only requirements are the
/// bounds listed here.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}
578
/// Receiver handles are `Copy`; the by-value methods (`collect`, `assert_no_more`, ...) can
/// therefore be called through a shared reference.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
580
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
    /// Runs `thunk` with a stream of deserialized messages from this receiver's output port.
    ///
    /// The stream ends (yields `None`) when the underlying channel is closed, or when no
    /// message is available and the simulation is quiescent or becomes quiescent.
    ///
    /// # Panics
    ///
    /// Panics if the port is not registered, has no output receiver, or if a message fails to
    /// deserialize.
    async fn with_stream<Out>(
        &self,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            (
                connections.output_receivers.get(port).unwrap().clone(),
                connections.quiescence.clone(),
            )
        });

        // The lock is held for as long as `thunk` runs.
        let mut receiver_stream = receiver.lock().await;
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Available messages take priority over the quiescence checks below.
            match receiver_stream.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Not quiescent yet: stay pending until the quiescence notification fires, then
            // re-arm the notification and end the stream.
            let () = ready!(notified_fut.as_mut().poll(cx));
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }

    /// Returns a future that panics if the stream yields another message before it ends.
    ///
    /// The panic message includes the unexpected message.
    pub fn assert_no_more(self) -> impl Future<Output = ()>
    where
        T: Debug,
    {
        FutureTrackingCaller {
            future: async move {
                self.with_stream(async |stream| {
                    if let Some(next) = stream.next().await {
                        return Err(format!(
                            "Stream yielded unexpected message: {:?}, expected termination",
                            next
                        ));
                    }
                    Ok(())
                })
                .await
            },
        }
    }
}
637
638impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
639 pub async fn next(&self) -> Option<T> {
642 self.with_stream(async |stream| stream.next().await).await
643 }
644
645 pub async fn collect<C: Default + Extend<T>>(self) -> C {
648 self.with_stream(async |stream| stream.collect().await)
649 .await
650 }
651
652 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
655 &self,
656 expected: I,
657 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
658 where
659 T: Debug + PartialEq<T2>,
660 {
661 FutureTrackingCaller {
662 future: async {
663 let mut expected: VecDeque<T2> = expected.into_iter().collect();
664
665 while !expected.is_empty() {
666 if let Some(next) = self.next().await {
667 let next_expected = expected.pop_front().unwrap();
668 if next != next_expected {
669 return Err(format!(
670 "Stream yielded unexpected message: {:?}, expected: {:?}",
671 next, next_expected
672 ));
673 }
674 } else {
675 return Err(format!(
676 "Stream ended early, still expected: {:?}",
677 expected
678 ));
679 }
680 }
681
682 Ok(())
683 },
684 }
685 }
686
687 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
690 &self,
691 expected: I,
692 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
693 where
694 T: Debug + PartialEq<T2>,
695 {
696 ChainedFuture {
697 first: self.assert_yields(expected),
698 second: self.assert_no_more(),
699 first_done: false,
700 }
701 }
702}
703
pin_project_lite::pin_project! {
    /// Wraps a future that resolves to `Result<(), String>` and converts an `Err` into a panic
    /// raised from `poll`, which is annotated with `#[track_caller]`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped future; structurally pinned.
        #[pin]
        future: F,
    }
}
719
720impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
721 type Output = ();
722
723 #[track_caller]
724 fn poll(
725 mut self: Pin<&mut Self>,
726 cx: &mut std::task::Context<'_>,
727 ) -> std::task::Poll<Self::Output> {
728 match ready!(self.as_mut().project().future.poll(cx)) {
729 Ok(()) => std::task::Poll::Ready(()),
730 Err(e) => panic!("{}", e),
731 }
732 }
733}
734
pin_project_lite::pin_project! {
    /// Runs `first` to completion and then `second`, as a single future.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        // Polled until it completes.
        #[pin]
        first: F1,
        // Polled only after `first` has completed.
        #[pin]
        second: F2,
        // Set once `first` has completed so that it is not polled again.
        first_done: bool,
    }
}
747
748impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
749 type Output = ();
750
751 #[track_caller]
752 fn poll(
753 mut self: Pin<&mut Self>,
754 cx: &mut std::task::Context<'_>,
755 ) -> std::task::Poll<Self::Output> {
756 if !self.first_done {
757 ready!(self.as_mut().project().first.poll(cx));
758 *self.as_mut().project().first_done = true;
759 }
760
761 self.as_mut().project().second.poll(cx)
762 }
763}
764
765impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
766 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
769 where
770 T: Ord,
771 {
772 self.with_stream(async |stream| {
773 let mut collected: C = stream.collect().await;
774 collected.as_mut().sort();
775 collected
776 })
777 .await
778 }
779
780 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
783 &self,
784 expected: I,
785 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
786 where
787 T: Debug + PartialEq<T2>,
788 {
789 FutureTrackingCaller {
790 future: async {
791 self.with_stream(async |stream| {
792 let mut expected: Vec<T2> = expected.into_iter().collect();
793
794 while !expected.is_empty() {
795 if let Some(next) = stream.next().await {
796 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
797 if let Some((i, _)) = idx {
798 expected.swap_remove(i);
799 } else {
800 return Err(format!(
801 "Stream yielded unexpected message: {:?}",
802 next
803 ));
804 }
805 } else {
806 return Err(format!(
807 "Stream ended early, still expected: {:?}",
808 expected
809 ));
810 }
811 }
812
813 Ok(())
814 })
815 .await
816 },
817 }
818 }
819
820 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
823 &self,
824 expected: I,
825 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
826 where
827 T: Debug + PartialEq<T2>,
828 {
829 ChainedFuture {
830 first: self.assert_yields_unordered(expected),
831 second: self.assert_no_more(),
832 first_done: false,
833 }
834 }
835}
836
837impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
838 fn with_sink<Out>(
839 &self,
840 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
841 ) -> Out {
842 let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
843 let connections = connections.borrow();
844 (
845 connections
846 .input_senders
847 .get(connections.external_registered.get(&self.0).unwrap())
848 .unwrap()
849 .clone(),
850 connections.quiescence.clone(),
851 )
852 });
853
854 thunk(&move |t| {
855 let res = sender.send(bincode::serialize(&t).unwrap().into());
856 quiescence.resume();
857 res
858 })
859 }
860}
861
862impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
863 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
866 self.with_sink(|send| {
867 for t in iter {
868 send(t).unwrap();
869 }
870 })
871 }
872}
873
874impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
875 pub fn send(&self, t: T) {
878 self.with_sink(|send| send(t)).unwrap();
879 }
880
881 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
884 self.with_sink(|send| {
885 for t in iter {
886 send(t).unwrap();
887 }
888 })
889 }
890}
891
/// Manual `Clone` that copies the handle; written by hand so the only requirements are the
/// bounds listed here.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    fn clone(&self) -> Self {
        *self
    }
}
899
/// Cluster receiver handles are `Copy`, which the `Clone` implementation relies on.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
904
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
    /// Runs `thunk` with a stream of deserialized messages from the output of cluster member
    /// `member_id` on this receiver's port.
    ///
    /// The stream ends (yields `None`) when the underlying channel is closed, or when no
    /// message is available and the simulation is quiescent or becomes quiescent.
    ///
    /// # Panics
    ///
    /// Panics if the port is not registered, has no cluster output receivers, if `member_id` is
    /// out of range for that port, or if a message fails to deserialize.
    async fn with_member_stream<Out>(
        &self,
        member_id: u32,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            let receivers = connections.cluster_output_receivers.get(port).unwrap();
            (
                receivers[member_id as usize].clone(),
                connections.quiescence.clone(),
            )
        });

        // The lock is held for as long as `thunk` runs.
        let mut lock = receiver.lock().await;
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Available messages take priority over the quiescence checks below.
            match lock.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Not quiescent yet: stay pending until the quiescence notification fires, then
            // re-arm the notification and end the stream.
            let () = ready!(notified_fut.as_mut().poll(cx));
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
}
942
943impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
944 pub async fn next(&self, member_id: u32) -> Option<T> {
946 self.with_member_stream(member_id, async |stream| stream.next().await)
947 .await
948 }
949
950 pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
952 self.with_member_stream(member_id, async |stream| stream.collect().await)
953 .await
954 }
955}
956
957impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
958 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
960 where
961 T: Ord,
962 {
963 self.with_member_stream(member_id, async |stream| {
964 let mut collected: C = stream.collect().await;
965 collected.as_mut().sort();
966 collected
967 })
968 .await
969 }
970}
971
972impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
973 fn with_sink<Out>(
974 &self,
975 thunk: impl FnOnce(
976 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
977 ) -> Out,
978 ) -> Out {
979 let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
980 let connections = connections.borrow();
981 (
982 connections
983 .cluster_input_senders
984 .get(connections.external_registered.get(&self.0).unwrap())
985 .unwrap()
986 .clone(),
987 connections.quiescence.clone(),
988 )
989 });
990
991 thunk(&move |member_id: u32, t: T| {
992 let payload = bincode::serialize(&t).unwrap();
993 let res = senders[member_id as usize].send(Bytes::from(payload));
994 quiescence.resume();
995 res
996 })
997 }
998}
999
1000impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
1001 pub fn send(&self, member_id: u32, t: T) {
1003 self.with_sink(|send| send(member_id, t)).unwrap();
1004 }
1005
1006 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
1008 self.with_sink(|send| {
1009 for (member_id, t) in iter {
1010 send(member_id, t).unwrap();
1011 }
1012 })
1013 }
1014}
1015
/// Destination for the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Write to the process's stderr.
    Stderr,
    /// Write to a caller-supplied writer.
    Custom(W),
}

/// Lets a `LogKind` be used as a `fmt::Write` sink.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Forwards `s` to the selected destination. Only the `Custom` destination can fail: an
    /// I/O error from the writer is reported as `std::fmt::Error`.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            Self::Null => {}
            Self::Stderr => eprint!("{s}"),
            Self::Custom(sink) => {
                if sink.write_all(s.as_bytes()).is_err() {
                    return Err(std::fmt::Error);
                }
            }
        }
        Ok(())
    }
}
1035
/// Scheduler state for a running simulation; built in
/// `CompiledSimInstance::schedule_with_maybe_logger` and driven by `scheduler`.
struct LaunchedSim<W: std::io::Write> {
    /// Async DFIRs with their location and optional cluster member id; each is run on every
    /// scheduler pass.
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Ticks that may be runnable; the scheduler picks from these (after filtering by hook
    /// state) when no async DFIR makes progress.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Ticks that are currently not candidates; they move to `possibly_ready_ticks` when the
    /// async DFIR at their outer location (and same member id) makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Observation points (location, member id) that may have hook decisions to run.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Observation points that are currently not candidates; initially one per async DFIR.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks keyed by (location, member id); run before a tick or at an observation point.
    hooks: Hooks<LocationId>,
    /// Inline hooks keyed by (location, member id); serviced while a tick is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Destination for log output.
    log: LogKind<W>,
    /// Quiescence state shared with the external senders and receivers.
    quiescence: Rc<QuiescenceState>,
}
1073
impl<W: std::io::Write> LaunchedSim<W> {
    /// The scheduler loop; never returns.
    ///
    /// Each pass first runs every async DFIR. If any made progress, the pass restarts.
    /// Otherwise the candidate ticks and observation points are filtered by the state of their
    /// hooks, and either one of them is chosen (through a bolero-generated index) and executed,
    /// or, when there are no candidates, the scheduler signals quiescence and waits to be
    /// resumed.
    async fn scheduler(&mut self) {
        loop {
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Progress at this location makes the ticks nested directly under it (for
                    // the same member id) candidates again.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // Likewise for the observation point at this location.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                continue;
            } else {
                use bolero::generator::*;

                // A tick stays a candidate only if all of its hooks are ready and at least one
                // hook has already made a nontrivial decision or can still make one.
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
                        hooks.iter().all(|hook| hook.is_ready())
                            && hooks.iter().any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Observation points need not have hooks at all; those without any are dropped
                // from the candidates.
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    // Nothing can run: every remaining tick must at least have all of its
                    // hooks ready, otherwise the process is aborted.
                    for (name, cid, _) in &self.not_ready_ticks {
                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
                        abort_assert!(
                            hooks.iter().all(|hook| hook.is_ready()),
                            "tick has a hook that never became ready"
                        );
                    }

                    self.quiescence.wait_for_resume().await;
                } else {
                    // One generated index over ticks followed by observation points.
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            // The custom writer gets the plain header, without the member id.
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Every line of hook output is prefixed with a magenta "* ".
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            // `biased` polls the tick first; whenever it is still pending, the
                            // always-ready second branch services the inline hooks that are
                            // waiting on a decision, and the loop polls the tick again.
                            loop {
                                tokio::select! {
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        abort_assert!(r, "tick DFIR run_tick() returned false");
                                        break;
                                    }
                                    _ = async {} => {
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            abort_assert!(
                                run_tick_future.await,
                                "tick DFIR run_tick() returned false"
                            );
                        }

                        // The tick remains a candidate for the next pass.
                        self.possibly_ready_ticks.push(removed);
                    } else {
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
1259
1260fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1261 let mut remaining_decision_count = hooks.len();
1262 let mut made_nontrivial_decision = false;
1263
1264 bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1265 hooks.iter_mut().for_each(|hook| {
1267 if let Some(is_nontrivial) = hook.current_decision() {
1268 made_nontrivial_decision |= is_nontrivial;
1269 remaining_decision_count -= 1;
1270 } else if !hook.can_make_nontrivial_decision() {
1271 hook.autonomous_decision(driver, false);
1275 remaining_decision_count -= 1;
1276 }
1277 });
1278
1279 hooks.iter_mut().for_each(|hook| {
1280 if hook.current_decision().is_none() {
1281 made_nontrivial_decision |= hook.autonomous_decision(
1282 driver,
1283 !made_nontrivial_decision && remaining_decision_count == 1,
1284 );
1285 remaining_decision_count -= 1;
1286 }
1287
1288 hook.release_decision(tick_decision_writer);
1289 });
1290 });
1291}