hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30
31pub mod networking;
32
33/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
34#[sealed::sealed]
35pub trait Ordering:
36 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
37{
38 /// The [`StreamOrder`] corresponding to this type.
39 const ORDERING_KIND: StreamOrder;
40}
41
42/// Marks the stream as being totally ordered, which means that there are
43/// no sources of non-determinism (other than intentional ones) that will
44/// affect the order of elements.
45pub enum TotalOrder {}
46
47#[sealed::sealed]
48impl Ordering for TotalOrder {
49 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
50}
51
52/// Marks the stream as having no order, which means that the order of
53/// elements may be affected by non-determinism.
54///
55/// This restricts certain operators, such as `fold` and `reduce`, to only
56/// be used with commutative aggregation functions.
57pub enum NoOrder {}
58
59#[sealed::sealed]
60impl Ordering for NoOrder {
61 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
62}
63
64/// Helper trait for determining the weakest of two orderings.
65#[sealed::sealed]
66pub trait MinOrder<Other: ?Sized> {
67 /// The weaker of the two orderings.
68 type Min: Ordering;
69}
70
71#[sealed::sealed]
72impl MinOrder<NoOrder> for TotalOrder {
73 type Min = NoOrder;
74}
75
76#[sealed::sealed]
77impl MinOrder<TotalOrder> for TotalOrder {
78 type Min = TotalOrder;
79}
80
81#[sealed::sealed]
82impl MinOrder<TotalOrder> for NoOrder {
83 type Min = NoOrder;
84}
85
86#[sealed::sealed]
87impl MinOrder<NoOrder> for NoOrder {
88 type Min = NoOrder;
89}
90
91/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
92#[sealed::sealed]
93pub trait Retries:
94 MinRetries<Self, Min = Self>
95 + MinRetries<ExactlyOnce, Min = Self>
96 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
97{
98 /// The [`StreamRetry`] corresponding to this type.
99 const RETRIES_KIND: StreamRetry;
100}
101
102/// Marks the stream as having deterministic message cardinality, with no
103/// possibility of duplicates.
104pub enum ExactlyOnce {}
105
106#[sealed::sealed]
107impl Retries for ExactlyOnce {
108 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
109}
110
111/// Marks the stream as having non-deterministic message cardinality, which
112/// means that duplicates may occur, but messages will not be dropped.
113pub enum AtLeastOnce {}
114
115#[sealed::sealed]
116impl Retries for AtLeastOnce {
117 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
118}
119
120/// Helper trait for determining the weakest of two retry guarantees.
121#[sealed::sealed]
122pub trait MinRetries<Other: ?Sized> {
123 /// The weaker of the two retry guarantees.
124 type Min: Retries;
125}
126
127#[sealed::sealed]
128impl MinRetries<AtLeastOnce> for ExactlyOnce {
129 type Min = AtLeastOnce;
130}
131
132#[sealed::sealed]
133impl MinRetries<ExactlyOnce> for ExactlyOnce {
134 type Min = ExactlyOnce;
135}
136
137#[sealed::sealed]
138impl MinRetries<ExactlyOnce> for AtLeastOnce {
139 type Min = AtLeastOnce;
140}
141
142#[sealed::sealed]
143impl MinRetries<AtLeastOnce> for AtLeastOnce {
144 type Min = AtLeastOnce;
145}
146
147/// Streaming sequence of elements with type `Type`.
148///
149/// This live collection represents a growing sequence of elements, with new elements being
150/// asynchronously appended to the end of the sequence. This can be used to model the arrival
151/// of network input, such as API requests, or streaming ingestion.
152///
153/// By default, all streams have deterministic ordering and each element is materialized exactly
154/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
155/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
156/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
157///
158/// Type Parameters:
159/// - `Type`: the type of elements in the stream
160/// - `Loc`: the location where the stream is being materialized
161/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
162/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
163/// (default is [`TotalOrder`])
164/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
165/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
166pub struct Stream<
167 Type,
168 Loc,
169 Bound: Boundedness = Unbounded,
170 Order: Ordering = TotalOrder,
171 Retry: Retries = ExactlyOnce,
172> {
173 pub(crate) location: Loc,
174 pub(crate) ir_node: RefCell<HydroNode>,
175
176 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
177}
178
179impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
180 for Stream<T, L, Unbounded, O, R>
181where
182 L: Location<'a>,
183{
184 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
185 Stream {
186 location: stream.location,
187 ir_node: stream.ir_node,
188 _phantom: PhantomData,
189 }
190 }
191}
192
193impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
194 for Stream<T, L, B, NoOrder, R>
195where
196 L: Location<'a>,
197{
198 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
199 Stream {
200 location: stream.location,
201 ir_node: stream.ir_node,
202 _phantom: PhantomData,
203 }
204 }
205}
206
207impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
208 for Stream<T, L, B, O, AtLeastOnce>
209where
210 L: Location<'a>,
211{
212 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
213 Stream {
214 location: stream.location,
215 ir_node: stream.ir_node,
216 _phantom: PhantomData,
217 }
218 }
219}
220
// Exposes `defer_tick` through the `DeferTick` trait for bounded streams inside a `Tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to `Stream::defer_tick`.
    // NOTE(review): this relies on path resolution picking an inherent `defer_tick` (presumably
    // defined elsewhere in this file) over this trait method; if that inherent method were
    // removed or its bounds stopped matching, this call would recurse into itself — confirm.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
229
230impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
231 for Stream<T, Tick<L>, Bounded, O, R>
232where
233 L: Location<'a>,
234{
235 type Location = Tick<L>;
236
237 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
238 Stream::new(
239 location.clone(),
240 HydroNode::CycleSource {
241 ident,
242 metadata: location.new_node_metadata(Self::collection_kind()),
243 },
244 )
245 }
246}
247
248impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
249 for Stream<T, Tick<L>, Bounded, O, R>
250where
251 L: Location<'a>,
252{
253 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
254 assert_eq!(
255 Location::id(&self.location),
256 expected_location,
257 "locations do not match"
258 );
259 self.location
260 .flow_state()
261 .borrow_mut()
262 .push_root(HydroRoot::CycleSink {
263 ident,
264 input: Box::new(self.ir_node.into_inner()),
265 op_metadata: HydroIrOpMetadata::new(),
266 });
267 }
268}
269
270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
271 for Stream<T, L, B, O, R>
272where
273 L: Location<'a> + NoTick,
274{
275 type Location = L;
276
277 fn create_source(ident: syn::Ident, location: L) -> Self {
278 Stream::new(
279 location.clone(),
280 HydroNode::CycleSource {
281 ident,
282 metadata: location.new_node_metadata(Self::collection_kind()),
283 },
284 )
285 }
286}
287
288impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
289 for Stream<T, L, B, O, R>
290where
291 L: Location<'a> + NoTick,
292{
293 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
294 assert_eq!(
295 Location::id(&self.location),
296 expected_location,
297 "locations do not match"
298 );
299 self.location
300 .flow_state()
301 .borrow_mut()
302 .push_root(HydroRoot::CycleSink {
303 ident,
304 input: Box::new(self.ir_node.into_inner()),
305 op_metadata: HydroIrOpMetadata::new(),
306 });
307 }
308}
309
310impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
311where
312 T: Clone,
313 L: Location<'a>,
314{
315 fn clone(&self) -> Self {
316 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
317 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
318 *self.ir_node.borrow_mut() = HydroNode::Tee {
319 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
320 metadata: self.location.new_node_metadata(Self::collection_kind()),
321 };
322 }
323
324 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
325 Stream {
326 location: self.location.clone(),
327 ir_node: HydroNode::Tee {
328 inner: TeeNode(inner.0.clone()),
329 metadata: metadata.clone(),
330 }
331 .into(),
332 _phantom: PhantomData,
333 }
334 } else {
335 unreachable!()
336 }
337 }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
341where
342 L: Location<'a>,
343{
344 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
345 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
346 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
347
348 Stream {
349 location,
350 ir_node: RefCell::new(ir_node),
351 _phantom: PhantomData,
352 }
353 }
354
355 /// Returns the [`Location`] where this stream is being materialized.
356 pub fn location(&self) -> &L {
357 &self.location
358 }
359
360 pub(crate) fn collection_kind() -> CollectionKind {
361 CollectionKind::Stream {
362 bound: B::BOUND_KIND,
363 order: O::ORDERING_KIND,
364 retry: R::RETRIES_KIND,
365 element_type: stageleft::quote_type::<T>().into(),
366 }
367 }
368
369 /// Produces a stream based on invoking `f` on each element.
370 /// If you do not want to modify the stream and instead only want to view
371 /// each item use [`Stream::inspect`] instead.
372 ///
373 /// # Example
374 /// ```rust
375 /// # #[cfg(feature = "deploy")] {
376 /// # use hydro_lang::prelude::*;
377 /// # use futures::StreamExt;
378 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
379 /// let words = process.source_iter(q!(vec!["hello", "world"]));
380 /// words.map(q!(|x| x.to_uppercase()))
381 /// # }, |mut stream| async move {
382 /// # for w in vec!["HELLO", "WORLD"] {
383 /// # assert_eq!(stream.next().await.unwrap(), w);
384 /// # }
385 /// # }));
386 /// # }
387 /// ```
388 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
389 where
390 F: Fn(T) -> U + 'a,
391 {
392 let f = f.splice_fn1_ctx(&self.location).into();
393 Stream::new(
394 self.location.clone(),
395 HydroNode::Map {
396 f,
397 input: Box::new(self.ir_node.into_inner()),
398 metadata: self
399 .location
400 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
401 },
402 )
403 }
404
405 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
406 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
407 /// for the output type `U` must produce items in a **deterministic** order.
408 ///
409 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
410 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
411 ///
412 /// # Example
413 /// ```rust
414 /// # #[cfg(feature = "deploy")] {
415 /// # use hydro_lang::prelude::*;
416 /// # use futures::StreamExt;
417 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418 /// process
419 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
420 /// .flat_map_ordered(q!(|x| x))
421 /// # }, |mut stream| async move {
422 /// // 1, 2, 3, 4
423 /// # for w in (1..5) {
424 /// # assert_eq!(stream.next().await.unwrap(), w);
425 /// # }
426 /// # }));
427 /// # }
428 /// ```
429 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
430 where
431 I: IntoIterator<Item = U>,
432 F: Fn(T) -> I + 'a,
433 {
434 let f = f.splice_fn1_ctx(&self.location).into();
435 Stream::new(
436 self.location.clone(),
437 HydroNode::FlatMap {
438 f,
439 input: Box::new(self.ir_node.into_inner()),
440 metadata: self
441 .location
442 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
443 },
444 )
445 }
446
447 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
448 /// for the output type `U` to produce items in any order.
449 ///
450 /// # Example
451 /// ```rust
452 /// # #[cfg(feature = "deploy")] {
453 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
454 /// # use futures::StreamExt;
455 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
456 /// process
457 /// .source_iter(q!(vec![
458 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
459 /// std::collections::HashSet::from_iter(vec![3, 4]),
460 /// ]))
461 /// .flat_map_unordered(q!(|x| x))
462 /// # }, |mut stream| async move {
463 /// // 1, 2, 3, 4, but in no particular order
464 /// # let mut results = Vec::new();
465 /// # for w in (1..5) {
466 /// # results.push(stream.next().await.unwrap());
467 /// # }
468 /// # results.sort();
469 /// # assert_eq!(results, vec![1, 2, 3, 4]);
470 /// # }));
471 /// # }
472 /// ```
473 pub fn flat_map_unordered<U, I, F>(
474 self,
475 f: impl IntoQuotedMut<'a, F, L>,
476 ) -> Stream<U, L, B, NoOrder, R>
477 where
478 I: IntoIterator<Item = U>,
479 F: Fn(T) -> I + 'a,
480 {
481 let f = f.splice_fn1_ctx(&self.location).into();
482 Stream::new(
483 self.location.clone(),
484 HydroNode::FlatMap {
485 f,
486 input: Box::new(self.ir_node.into_inner()),
487 metadata: self
488 .location
489 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
490 },
491 )
492 }
493
494 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
495 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
496 ///
497 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
498 /// not deterministic, use [`Stream::flatten_unordered`] instead.
499 ///
500 /// ```rust
501 /// # #[cfg(feature = "deploy")] {
502 /// # use hydro_lang::prelude::*;
503 /// # use futures::StreamExt;
504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505 /// process
506 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
507 /// .flatten_ordered()
508 /// # }, |mut stream| async move {
509 /// // 1, 2, 3, 4
510 /// # for w in (1..5) {
511 /// # assert_eq!(stream.next().await.unwrap(), w);
512 /// # }
513 /// # }));
514 /// # }
515 /// ```
516 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
517 where
518 T: IntoIterator<Item = U>,
519 {
520 self.flat_map_ordered(q!(|d| d))
521 }
522
523 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
524 /// for the element type `T` to produce items in any order.
525 ///
526 /// # Example
527 /// ```rust
528 /// # #[cfg(feature = "deploy")] {
529 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
530 /// # use futures::StreamExt;
531 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
532 /// process
533 /// .source_iter(q!(vec![
534 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
535 /// std::collections::HashSet::from_iter(vec![3, 4]),
536 /// ]))
537 /// .flatten_unordered()
538 /// # }, |mut stream| async move {
539 /// // 1, 2, 3, 4, but in no particular order
540 /// # let mut results = Vec::new();
541 /// # for w in (1..5) {
542 /// # results.push(stream.next().await.unwrap());
543 /// # }
544 /// # results.sort();
545 /// # assert_eq!(results, vec![1, 2, 3, 4]);
546 /// # }));
547 /// # }
548 /// ```
549 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
550 where
551 T: IntoIterator<Item = U>,
552 {
553 self.flat_map_unordered(q!(|d| d))
554 }
555
556 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
557 /// `f`, preserving the order of the elements.
558 ///
559 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
560 /// not modify or take ownership of the values. If you need to modify the values while filtering
561 /// use [`Stream::filter_map`] instead.
562 ///
563 /// # Example
564 /// ```rust
565 /// # #[cfg(feature = "deploy")] {
566 /// # use hydro_lang::prelude::*;
567 /// # use futures::StreamExt;
568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569 /// process
570 /// .source_iter(q!(vec![1, 2, 3, 4]))
571 /// .filter(q!(|&x| x > 2))
572 /// # }, |mut stream| async move {
573 /// // 3, 4
574 /// # for w in (3..5) {
575 /// # assert_eq!(stream.next().await.unwrap(), w);
576 /// # }
577 /// # }));
578 /// # }
579 /// ```
580 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
581 where
582 F: Fn(&T) -> bool + 'a,
583 {
584 let f = f.splice_fn1_borrow_ctx(&self.location).into();
585 Stream::new(
586 self.location.clone(),
587 HydroNode::Filter {
588 f,
589 input: Box::new(self.ir_node.into_inner()),
590 metadata: self.location.new_node_metadata(Self::collection_kind()),
591 },
592 )
593 }
594
595 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
596 ///
597 /// # Example
598 /// ```rust
599 /// # #[cfg(feature = "deploy")] {
600 /// # use hydro_lang::prelude::*;
601 /// # use futures::StreamExt;
602 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
603 /// process
604 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
605 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
606 /// # }, |mut stream| async move {
607 /// // 1, 2
608 /// # for w in (1..3) {
609 /// # assert_eq!(stream.next().await.unwrap(), w);
610 /// # }
611 /// # }));
612 /// # }
613 /// ```
614 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
615 where
616 F: Fn(T) -> Option<U> + 'a,
617 {
618 let f = f.splice_fn1_ctx(&self.location).into();
619 Stream::new(
620 self.location.clone(),
621 HydroNode::FilterMap {
622 f,
623 input: Box::new(self.ir_node.into_inner()),
624 metadata: self
625 .location
626 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
627 },
628 )
629 }
630
631 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
632 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
633 /// If `other` is an empty [`Optional`], no values will be produced.
634 ///
635 /// # Example
636 /// ```rust
637 /// # #[cfg(feature = "deploy")] {
638 /// # use hydro_lang::prelude::*;
639 /// # use futures::StreamExt;
640 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
641 /// let tick = process.tick();
642 /// let batch = process
643 /// .source_iter(q!(vec![1, 2, 3, 4]))
644 /// .batch(&tick, nondet!(/** test */));
645 /// let count = batch.clone().count(); // `count()` returns a singleton
646 /// batch.cross_singleton(count).all_ticks()
647 /// # }, |mut stream| async move {
648 /// // (1, 4), (2, 4), (3, 4), (4, 4)
649 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
650 /// # assert_eq!(stream.next().await.unwrap(), w);
651 /// # }
652 /// # }));
653 /// # }
654 /// ```
655 pub fn cross_singleton<O2>(
656 self,
657 other: impl Into<Optional<O2, L, Bounded>>,
658 ) -> Stream<(T, O2), L, B, O, R>
659 where
660 O2: Clone,
661 {
662 let other: Optional<O2, L, Bounded> = other.into();
663 check_matching_location(&self.location, &other.location);
664
665 Stream::new(
666 self.location.clone(),
667 HydroNode::CrossSingleton {
668 left: Box::new(self.ir_node.into_inner()),
669 right: Box::new(other.ir_node.into_inner()),
670 metadata: self
671 .location
672 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
673 },
674 )
675 }
676
677 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
678 ///
679 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
680 /// leader of a cluster.
681 ///
682 /// # Example
683 /// ```rust
684 /// # #[cfg(feature = "deploy")] {
685 /// # use hydro_lang::prelude::*;
686 /// # use futures::StreamExt;
687 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688 /// let tick = process.tick();
689 /// // ticks are lazy by default, forces the second tick to run
690 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
691 ///
692 /// let batch_first_tick = process
693 /// .source_iter(q!(vec![1, 2, 3, 4]))
694 /// .batch(&tick, nondet!(/** test */));
695 /// let batch_second_tick = process
696 /// .source_iter(q!(vec![5, 6, 7, 8]))
697 /// .batch(&tick, nondet!(/** test */))
698 /// .defer_tick(); // appears on the second tick
699 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
700 /// batch_first_tick.chain(batch_second_tick)
701 /// .filter_if_some(some_on_first_tick)
702 /// .all_ticks()
703 /// # }, |mut stream| async move {
704 /// // [1, 2, 3, 4]
705 /// # for w in vec![1, 2, 3, 4] {
706 /// # assert_eq!(stream.next().await.unwrap(), w);
707 /// # }
708 /// # }));
709 /// # }
710 /// ```
711 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
712 self.cross_singleton(signal.map(q!(|_u| ())))
713 .map(q!(|(d, _signal)| d))
714 }
715
716 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
717 ///
718 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
719 /// some local state.
720 ///
721 /// # Example
722 /// ```rust
723 /// # #[cfg(feature = "deploy")] {
724 /// # use hydro_lang::prelude::*;
725 /// # use futures::StreamExt;
726 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727 /// let tick = process.tick();
728 /// // ticks are lazy by default, forces the second tick to run
729 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730 ///
731 /// let batch_first_tick = process
732 /// .source_iter(q!(vec![1, 2, 3, 4]))
733 /// .batch(&tick, nondet!(/** test */));
734 /// let batch_second_tick = process
735 /// .source_iter(q!(vec![5, 6, 7, 8]))
736 /// .batch(&tick, nondet!(/** test */))
737 /// .defer_tick(); // appears on the second tick
738 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
739 /// batch_first_tick.chain(batch_second_tick)
740 /// .filter_if_none(some_on_first_tick)
741 /// .all_ticks()
742 /// # }, |mut stream| async move {
743 /// // [5, 6, 7, 8]
744 /// # for w in vec![5, 6, 7, 8] {
745 /// # assert_eq!(stream.next().await.unwrap(), w);
746 /// # }
747 /// # }));
748 /// # }
749 /// ```
750 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
751 self.filter_if_some(
752 other
753 .map(q!(|_| ()))
754 .into_singleton()
755 .filter(q!(|o| o.is_none())),
756 )
757 }
758
759 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
760 /// tupled pairs in a non-deterministic order.
761 ///
762 /// # Example
763 /// ```rust
764 /// # #[cfg(feature = "deploy")] {
765 /// # use hydro_lang::prelude::*;
766 /// # use std::collections::HashSet;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let tick = process.tick();
770 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
771 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
772 /// stream1.cross_product(stream2)
773 /// # }, |mut stream| async move {
774 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
775 /// # stream.map(|i| assert!(expected.contains(&i)));
776 /// # }));
777 /// # }
778 /// ```
779 pub fn cross_product<T2, O2: Ordering>(
780 self,
781 other: Stream<T2, L, B, O2, R>,
782 ) -> Stream<(T, T2), L, B, NoOrder, R>
783 where
784 T: Clone,
785 T2: Clone,
786 {
787 check_matching_location(&self.location, &other.location);
788
789 Stream::new(
790 self.location.clone(),
791 HydroNode::CrossProduct {
792 left: Box::new(self.ir_node.into_inner()),
793 right: Box::new(other.ir_node.into_inner()),
794 metadata: self
795 .location
796 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
797 },
798 )
799 }
800
801 /// Takes one stream as input and filters out any duplicate occurrences. The output
802 /// contains all unique values from the input.
803 ///
804 /// # Example
805 /// ```rust
806 /// # #[cfg(feature = "deploy")] {
807 /// # use hydro_lang::prelude::*;
808 /// # use futures::StreamExt;
809 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810 /// let tick = process.tick();
811 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
812 /// # }, |mut stream| async move {
813 /// # for w in vec![1, 2, 3, 4] {
814 /// # assert_eq!(stream.next().await.unwrap(), w);
815 /// # }
816 /// # }));
817 /// # }
818 /// ```
819 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
820 where
821 T: Eq + Hash,
822 {
823 Stream::new(
824 self.location.clone(),
825 HydroNode::Unique {
826 input: Box::new(self.ir_node.into_inner()),
827 metadata: self
828 .location
829 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
830 },
831 )
832 }
833
834 /// Outputs everything in this stream that is *not* contained in the `other` stream.
835 ///
836 /// The `other` stream must be [`Bounded`], since this function will wait until
837 /// all its elements are available before producing any output.
838 /// # Example
839 /// ```rust
840 /// # #[cfg(feature = "deploy")] {
841 /// # use hydro_lang::prelude::*;
842 /// # use futures::StreamExt;
843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844 /// let tick = process.tick();
845 /// let stream = process
846 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
847 /// .batch(&tick, nondet!(/** test */));
848 /// let batch = process
849 /// .source_iter(q!(vec![1, 2]))
850 /// .batch(&tick, nondet!(/** test */));
851 /// stream.filter_not_in(batch).all_ticks()
852 /// # }, |mut stream| async move {
853 /// # for w in vec![3, 4] {
854 /// # assert_eq!(stream.next().await.unwrap(), w);
855 /// # }
856 /// # }));
857 /// # }
858 /// ```
859 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
860 where
861 T: Eq + Hash,
862 {
863 check_matching_location(&self.location, &other.location);
864
865 Stream::new(
866 self.location.clone(),
867 HydroNode::Difference {
868 pos: Box::new(self.ir_node.into_inner()),
869 neg: Box::new(other.ir_node.into_inner()),
870 metadata: self
871 .location
872 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
873 },
874 )
875 }
876
877 /// An operator which allows you to "inspect" each element of a stream without
878 /// modifying it. The closure `f` is called on a reference to each item. This is
879 /// mainly useful for debugging, and should not be used to generate side-effects.
880 ///
881 /// # Example
882 /// ```rust
883 /// # #[cfg(feature = "deploy")] {
884 /// # use hydro_lang::prelude::*;
885 /// # use futures::StreamExt;
886 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887 /// let nums = process.source_iter(q!(vec![1, 2]));
888 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
889 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
890 /// # }, |mut stream| async move {
891 /// # for w in vec![1, 2] {
892 /// # assert_eq!(stream.next().await.unwrap(), w);
893 /// # }
894 /// # }));
895 /// # }
896 /// ```
897 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
898 where
899 F: Fn(&T) + 'a,
900 {
901 let f = f.splice_fn1_borrow_ctx(&self.location).into();
902
903 Stream::new(
904 self.location.clone(),
905 HydroNode::Inspect {
906 f,
907 input: Box::new(self.ir_node.into_inner()),
908 metadata: self.location.new_node_metadata(Self::collection_kind()),
909 },
910 )
911 }
912
913 /// An operator which allows you to "name" a `HydroNode`.
914 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
915 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
916 {
917 let mut node = self.ir_node.borrow_mut();
918 let metadata = node.metadata_mut();
919 metadata.tag = Some(name.to_string());
920 }
921 self
922 }
923
924 /// Explicitly "casts" the stream to a type with a different ordering
925 /// guarantee. Useful in unsafe code where the ordering cannot be proven
926 /// by the type-system.
927 ///
928 /// # Non-Determinism
929 /// This function is used as an escape hatch, and any mistakes in the
930 /// provided ordering guarantee will propagate into the guarantees
931 /// for the rest of the program.
932 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933 if O::ORDERING_KIND == O2::ORDERING_KIND {
934 Stream::new(self.location, self.ir_node.into_inner())
935 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936 // We can always weaken the ordering guarantee
937 Stream::new(
938 self.location.clone(),
939 HydroNode::Cast {
940 inner: Box::new(self.ir_node.into_inner()),
941 metadata: self
942 .location
943 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944 },
945 )
946 } else {
947 Stream::new(
948 self.location.clone(),
949 HydroNode::ObserveNonDet {
950 inner: Box::new(self.ir_node.into_inner()),
951 trusted: false,
952 metadata: self
953 .location
954 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955 },
956 )
957 }
958 }
959
960 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
961 // is not observable
962 fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
963 if O::ORDERING_KIND == O2::ORDERING_KIND {
964 Stream::new(self.location, self.ir_node.into_inner())
965 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
966 // We can always weaken the ordering guarantee
967 Stream::new(
968 self.location.clone(),
969 HydroNode::Cast {
970 inner: Box::new(self.ir_node.into_inner()),
971 metadata: self
972 .location
973 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
974 },
975 )
976 } else {
977 Stream::new(
978 self.location.clone(),
979 HydroNode::ObserveNonDet {
980 inner: Box::new(self.ir_node.into_inner()),
981 trusted: true,
982 metadata: self
983 .location
984 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
985 },
986 )
987 }
988 }
989
990 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
991 /// which is always safe because that is the weakest possible guarantee.
992 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
993 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
994 self.assume_ordering::<NoOrder>(nondet)
995 }
996
997 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
998 /// enforcing that `O2` is weaker than the input ordering guarantee.
999 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1000 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1001 self.assume_ordering::<O2>(nondet)
1002 }
1003
1004 /// Explicitly "casts" the stream to a type with a different retries
1005 /// guarantee. Useful in unsafe code where the lack of retries cannot
1006 /// be proven by the type-system.
1007 ///
1008 /// # Non-Determinism
1009 /// This function is used as an escape hatch, and any mistakes in the
1010 /// provided retries guarantee will propagate into the guarantees
1011 /// for the rest of the program.
1012 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013 if R::RETRIES_KIND == R2::RETRIES_KIND {
1014 Stream::new(self.location, self.ir_node.into_inner())
1015 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016 // We can always weaken the retries guarantee
1017 Stream::new(
1018 self.location.clone(),
1019 HydroNode::Cast {
1020 inner: Box::new(self.ir_node.into_inner()),
1021 metadata: self
1022 .location
1023 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024 },
1025 )
1026 } else {
1027 Stream::new(
1028 self.location.clone(),
1029 HydroNode::ObserveNonDet {
1030 inner: Box::new(self.ir_node.into_inner()),
1031 trusted: false,
1032 metadata: self
1033 .location
1034 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035 },
1036 )
1037 }
1038 }
1039
1040 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1041 // is not observable
1042 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1043 if R::RETRIES_KIND == R2::RETRIES_KIND {
1044 Stream::new(self.location, self.ir_node.into_inner())
1045 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1046 // We can always weaken the retries guarantee
1047 Stream::new(
1048 self.location.clone(),
1049 HydroNode::Cast {
1050 inner: Box::new(self.ir_node.into_inner()),
1051 metadata: self
1052 .location
1053 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1054 },
1055 )
1056 } else {
1057 Stream::new(
1058 self.location.clone(),
1059 HydroNode::ObserveNonDet {
1060 inner: Box::new(self.ir_node.into_inner()),
1061 trusted: true,
1062 metadata: self
1063 .location
1064 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1065 },
1066 )
1067 }
1068 }
1069
1070 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1071 /// which is always safe because that is the weakest possible guarantee.
1072 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1073 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1074 self.assume_retries::<AtLeastOnce>(nondet)
1075 }
1076
1077 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1078 /// enforcing that `R2` is weaker than the input retries guarantee.
1079 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1080 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1081 self.assume_retries::<R2>(nondet)
1082 }
1083}
1084
1085impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1086where
1087 L: Location<'a>,
1088{
1089 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1090 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1091 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1092 self.assume_retries(
1093 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1094 )
1095 }
1096}
1097
1098impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1099where
1100 L: Location<'a>,
1101{
1102 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # #[cfg(feature = "deploy")] {
1107 /// # use hydro_lang::prelude::*;
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1111 /// # }, |mut stream| async move {
1112 /// // 1, 2, 3
1113 /// # for w in vec![1, 2, 3] {
1114 /// # assert_eq!(stream.next().await.unwrap(), w);
1115 /// # }
1116 /// # }));
1117 /// # }
1118 /// ```
1119 pub fn cloned(self) -> Stream<T, L, B, O, R>
1120 where
1121 T: Clone,
1122 {
1123 self.map(q!(|d| d.clone()))
1124 }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1128where
1129 L: Location<'a>,
1130{
1131 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134 ///
1135 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1136 /// and there may be duplicates.
1137 ///
1138 /// # Example
1139 /// ```rust
1140 /// # #[cfg(feature = "deploy")] {
1141 /// # use hydro_lang::prelude::*;
1142 /// # use futures::StreamExt;
1143 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144 /// let tick = process.tick();
1145 /// let bools = process.source_iter(q!(vec![false, true, false]));
1146 /// let batch = bools.batch(&tick, nondet!(/** test */));
1147 /// batch
1148 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1149 /// .all_ticks()
1150 /// # }, |mut stream| async move {
1151 /// // true
1152 /// # assert_eq!(stream.next().await.unwrap(), true);
1153 /// # }));
1154 /// # }
1155 /// ```
1156 pub fn fold_commutative_idempotent<A, I, F>(
1157 self,
1158 init: impl IntoQuotedMut<'a, I, L>,
1159 comb: impl IntoQuotedMut<'a, F, L>,
1160 ) -> Singleton<A, L, B>
1161 where
1162 I: Fn() -> A + 'a,
1163 F: Fn(&mut A, T),
1164 {
1165 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1166 self.assume_ordering(nondet)
1167 .assume_retries(nondet)
1168 .fold(init, comb)
1169 }
1170
1171 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1172 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1173 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1174 /// reference, so that it can be modified in place.
1175 ///
1176 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1177 /// and there may be duplicates.
1178 ///
1179 /// # Example
1180 /// ```rust
1181 /// # #[cfg(feature = "deploy")] {
1182 /// # use hydro_lang::prelude::*;
1183 /// # use futures::StreamExt;
1184 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185 /// let tick = process.tick();
1186 /// let bools = process.source_iter(q!(vec![false, true, false]));
1187 /// let batch = bools.batch(&tick, nondet!(/** test */));
1188 /// batch
1189 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1190 /// .all_ticks()
1191 /// # }, |mut stream| async move {
1192 /// // true
1193 /// # assert_eq!(stream.next().await.unwrap(), true);
1194 /// # }));
1195 /// # }
1196 /// ```
1197 pub fn reduce_commutative_idempotent<F>(
1198 self,
1199 comb: impl IntoQuotedMut<'a, F, L>,
1200 ) -> Optional<T, L, B>
1201 where
1202 F: Fn(&mut T, T) + 'a,
1203 {
1204 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205 self.assume_retries(nondet).reduce_commutative(comb)
1206 }
1207
1208 // only for internal APIs that have been carefully vetted, will eventually be removed once we
1209 // have algebraic verification of these properties
1210 fn reduce_commutative_idempotent_trusted<F>(
1211 self,
1212 comb: impl IntoQuotedMut<'a, F, L>,
1213 ) -> Optional<T, L, B>
1214 where
1215 F: Fn(&mut T, T) + 'a,
1216 {
1217 self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1218 .reduce_commutative_trusted(comb)
1219 }
1220
1221 /// Computes the maximum element in the stream as an [`Optional`], which
1222 /// will be empty until the first element in the input arrives.
1223 ///
1224 /// # Example
1225 /// ```rust
1226 /// # #[cfg(feature = "deploy")] {
1227 /// # use hydro_lang::prelude::*;
1228 /// # use futures::StreamExt;
1229 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230 /// let tick = process.tick();
1231 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1232 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233 /// batch.max().all_ticks()
1234 /// # }, |mut stream| async move {
1235 /// // 4
1236 /// # assert_eq!(stream.next().await.unwrap(), 4);
1237 /// # }));
1238 /// # }
1239 /// ```
1240 pub fn max(self) -> Optional<T, L, B>
1241 where
1242 T: Ord,
1243 {
1244 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1245 if new > *curr {
1246 *curr = new;
1247 }
1248 }))
1249 }
1250
1251 /// Computes the minimum element in the stream as an [`Optional`], which
1252 /// will be empty until the first element in the input arrives.
1253 ///
1254 /// # Example
1255 /// ```rust
1256 /// # #[cfg(feature = "deploy")] {
1257 /// # use hydro_lang::prelude::*;
1258 /// # use futures::StreamExt;
1259 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260 /// let tick = process.tick();
1261 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1262 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1263 /// batch.min().all_ticks()
1264 /// # }, |mut stream| async move {
1265 /// // 1
1266 /// # assert_eq!(stream.next().await.unwrap(), 1);
1267 /// # }));
1268 /// # }
1269 /// ```
1270 pub fn min(self) -> Optional<T, L, B>
1271 where
1272 T: Ord,
1273 {
1274 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1275 if new < *curr {
1276 *curr = new;
1277 }
1278 }))
1279 }
1280}
1281
1282impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1283where
1284 L: Location<'a>,
1285{
1286 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1287 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1288 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1289 ///
1290 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1291 ///
1292 /// # Example
1293 /// ```rust
1294 /// # #[cfg(feature = "deploy")] {
1295 /// # use hydro_lang::prelude::*;
1296 /// # use futures::StreamExt;
1297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298 /// let tick = process.tick();
1299 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1300 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301 /// batch
1302 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1303 /// .all_ticks()
1304 /// # }, |mut stream| async move {
1305 /// // 10
1306 /// # assert_eq!(stream.next().await.unwrap(), 10);
1307 /// # }));
1308 /// # }
1309 /// ```
1310 pub fn fold_commutative<A, I, F>(
1311 self,
1312 init: impl IntoQuotedMut<'a, I, L>,
1313 comb: impl IntoQuotedMut<'a, F, L>,
1314 ) -> Singleton<A, L, B>
1315 where
1316 I: Fn() -> A + 'a,
1317 F: Fn(&mut A, T),
1318 {
1319 let nondet = nondet!(/** the combinator function is commutative */);
1320 self.assume_ordering(nondet).fold(init, comb)
1321 }
1322
1323 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1324 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1325 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1326 /// reference, so that it can be modified in place.
1327 ///
1328 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1329 ///
1330 /// # Example
1331 /// ```rust
1332 /// # #[cfg(feature = "deploy")] {
1333 /// # use hydro_lang::prelude::*;
1334 /// # use futures::StreamExt;
1335 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336 /// let tick = process.tick();
1337 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1338 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1339 /// batch
1340 /// .reduce_commutative(q!(|curr, new| *curr += new))
1341 /// .all_ticks()
1342 /// # }, |mut stream| async move {
1343 /// // 10
1344 /// # assert_eq!(stream.next().await.unwrap(), 10);
1345 /// # }));
1346 /// # }
1347 /// ```
1348 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1349 where
1350 F: Fn(&mut T, T) + 'a,
1351 {
1352 let nondet = nondet!(/** the combinator function is commutative */);
1353 self.assume_ordering(nondet).reduce(comb)
1354 }
1355
1356 fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1357 where
1358 F: Fn(&mut T, T) + 'a,
1359 {
1360 let ordered = if B::BOUNDED {
1361 self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1362 } else {
1363 self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1364 };
1365
1366 ordered.reduce(comb)
1367 }
1368
1369 /// Computes the number of elements in the stream as a [`Singleton`].
1370 ///
1371 /// # Example
1372 /// ```rust
1373 /// # #[cfg(feature = "deploy")] {
1374 /// # use hydro_lang::prelude::*;
1375 /// # use futures::StreamExt;
1376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1377 /// let tick = process.tick();
1378 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1379 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1380 /// batch.count().all_ticks()
1381 /// # }, |mut stream| async move {
1382 /// // 4
1383 /// # assert_eq!(stream.next().await.unwrap(), 4);
1384 /// # }));
1385 /// # }
1386 /// ```
1387 pub fn count(self) -> Singleton<usize, L, B> {
1388 self.assume_ordering_trusted(nondet!(
1389 /// Order does not affect eventual count, and also does not affect intermediate states.
1390 ))
1391 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1392 }
1393}
1394
1395impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1396where
1397 L: Location<'a>,
1398{
1399 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1400 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1401 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1402 ///
1403 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1404 ///
1405 /// # Example
1406 /// ```rust
1407 /// # #[cfg(feature = "deploy")] {
1408 /// # use hydro_lang::prelude::*;
1409 /// # use futures::StreamExt;
1410 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1411 /// let tick = process.tick();
1412 /// let bools = process.source_iter(q!(vec![false, true, false]));
1413 /// let batch = bools.batch(&tick, nondet!(/** test */));
1414 /// batch
1415 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1416 /// .all_ticks()
1417 /// # }, |mut stream| async move {
1418 /// // true
1419 /// # assert_eq!(stream.next().await.unwrap(), true);
1420 /// # }));
1421 /// # }
1422 /// ```
1423 pub fn fold_idempotent<A, I, F>(
1424 self,
1425 init: impl IntoQuotedMut<'a, I, L>,
1426 comb: impl IntoQuotedMut<'a, F, L>,
1427 ) -> Singleton<A, L, B>
1428 where
1429 I: Fn() -> A + 'a,
1430 F: Fn(&mut A, T),
1431 {
1432 let nondet = nondet!(/** the combinator function is idempotent */);
1433 self.assume_retries(nondet).fold(init, comb)
1434 }
1435
1436 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1437 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1438 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1439 /// reference, so that it can be modified in place.
1440 ///
1441 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1442 ///
1443 /// # Example
1444 /// ```rust
1445 /// # #[cfg(feature = "deploy")] {
1446 /// # use hydro_lang::prelude::*;
1447 /// # use futures::StreamExt;
1448 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1449 /// let tick = process.tick();
1450 /// let bools = process.source_iter(q!(vec![false, true, false]));
1451 /// let batch = bools.batch(&tick, nondet!(/** test */));
1452 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1453 /// # }, |mut stream| async move {
1454 /// // true
1455 /// # assert_eq!(stream.next().await.unwrap(), true);
1456 /// # }));
1457 /// # }
1458 /// ```
1459 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1460 where
1461 F: Fn(&mut T, T) + 'a,
1462 {
1463 let nondet = nondet!(/** the combinator function is idempotent */);
1464 self.assume_retries(nondet).reduce(comb)
1465 }
1466
1467 /// Computes the first element in the stream as an [`Optional`], which
1468 /// will be empty until the first element in the input arrives.
1469 ///
1470 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1471 /// re-ordering of elements may cause the first element to change.
1472 ///
1473 /// # Example
1474 /// ```rust
1475 /// # #[cfg(feature = "deploy")] {
1476 /// # use hydro_lang::prelude::*;
1477 /// # use futures::StreamExt;
1478 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1479 /// let tick = process.tick();
1480 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1481 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482 /// batch.first().all_ticks()
1483 /// # }, |mut stream| async move {
1484 /// // 1
1485 /// # assert_eq!(stream.next().await.unwrap(), 1);
1486 /// # }));
1487 /// # }
1488 /// ```
1489 pub fn first(self) -> Optional<T, L, B> {
1490 self.reduce_idempotent(q!(|_, _| {}))
1491 }
1492
1493 /// Computes the last element in the stream as an [`Optional`], which
1494 /// will be empty until an element in the input arrives.
1495 ///
1496 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1497 /// re-ordering of elements may cause the last element to change.
1498 ///
1499 /// # Example
1500 /// ```rust
1501 /// # #[cfg(feature = "deploy")] {
1502 /// # use hydro_lang::prelude::*;
1503 /// # use futures::StreamExt;
1504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1505 /// let tick = process.tick();
1506 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1507 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1508 /// batch.last().all_ticks()
1509 /// # }, |mut stream| async move {
1510 /// // 4
1511 /// # assert_eq!(stream.next().await.unwrap(), 4);
1512 /// # }));
1513 /// # }
1514 /// ```
1515 pub fn last(self) -> Optional<T, L, B> {
1516 self.reduce_idempotent(q!(|curr, new| *curr = new))
1517 }
1518}
1519
1520impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1521where
1522 L: Location<'a>,
1523{
1524 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1525 ///
1526 /// # Example
1527 /// ```rust
1528 /// # #[cfg(feature = "deploy")] {
1529 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1530 /// # use futures::StreamExt;
1531 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1532 /// let tick = process.tick();
1533 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1534 /// numbers.enumerate()
1535 /// # }, |mut stream| async move {
1536 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1537 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1538 /// # assert_eq!(stream.next().await.unwrap(), w);
1539 /// # }
1540 /// # }));
1541 /// # }
1542 /// ```
1543 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1544 Stream::new(
1545 self.location.clone(),
1546 HydroNode::Enumerate {
1547 input: Box::new(self.ir_node.into_inner()),
1548 metadata: self.location.new_node_metadata(Stream::<
1549 (usize, T),
1550 L,
1551 B,
1552 TotalOrder,
1553 ExactlyOnce,
1554 >::collection_kind()),
1555 },
1556 )
1557 }
1558
1559 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1560 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1561 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1562 ///
1563 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1564 /// to depend on the order of elements in the stream.
1565 ///
1566 /// # Example
1567 /// ```rust
1568 /// # #[cfg(feature = "deploy")] {
1569 /// # use hydro_lang::prelude::*;
1570 /// # use futures::StreamExt;
1571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1572 /// let tick = process.tick();
1573 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1574 /// let batch = words.batch(&tick, nondet!(/** test */));
1575 /// batch
1576 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1577 /// .all_ticks()
1578 /// # }, |mut stream| async move {
1579 /// // "HELLOWORLD"
1580 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1581 /// # }));
1582 /// # }
1583 /// ```
1584 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1585 self,
1586 init: impl IntoQuotedMut<'a, I, L>,
1587 comb: impl IntoQuotedMut<'a, F, L>,
1588 ) -> Singleton<A, L, B> {
1589 let init = init.splice_fn0_ctx(&self.location).into();
1590 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1591
1592 let core = HydroNode::Fold {
1593 init,
1594 acc: comb,
1595 input: Box::new(self.ir_node.into_inner()),
1596 metadata: self
1597 .location
1598 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1599 };
1600
1601 Singleton::new(self.location, core)
1602 }
1603
1604 /// Collects all the elements of this stream into a single [`Vec`] element.
1605 ///
1606 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1607 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1608 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1609 /// the vector at an arbitrary point in time.
1610 ///
1611 /// # Example
1612 /// ```rust
1613 /// # #[cfg(feature = "deploy")] {
1614 /// # use hydro_lang::prelude::*;
1615 /// # use futures::StreamExt;
1616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617 /// let tick = process.tick();
1618 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1619 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1620 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1621 /// # }, |mut stream| async move {
1622 /// // [ vec![1, 2, 3, 4] ]
1623 /// # for w in vec![vec![1, 2, 3, 4]] {
1624 /// # assert_eq!(stream.next().await.unwrap(), w);
1625 /// # }
1626 /// # }));
1627 /// # }
1628 /// ```
1629 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1630 self.fold(
1631 q!(|| vec![]),
1632 q!(|acc, v| {
1633 acc.push(v);
1634 }),
1635 )
1636 }
1637
1638 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1639 /// and emitting each intermediate result.
1640 ///
1641 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1642 /// containing all intermediate accumulated values. The scan operation can also terminate early
1643 /// by returning `None`.
1644 ///
1645 /// The function takes a mutable reference to the accumulator and the current element, and returns
1646 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1647 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1648 ///
1649 /// # Examples
1650 ///
1651 /// Basic usage - running sum:
1652 /// ```rust
1653 /// # #[cfg(feature = "deploy")] {
1654 /// # use hydro_lang::prelude::*;
1655 /// # use futures::StreamExt;
1656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1658 /// q!(|| 0),
1659 /// q!(|acc, x| {
1660 /// *acc += x;
1661 /// Some(*acc)
1662 /// }),
1663 /// )
1664 /// # }, |mut stream| async move {
1665 /// // Output: 1, 3, 6, 10
1666 /// # for w in vec![1, 3, 6, 10] {
1667 /// # assert_eq!(stream.next().await.unwrap(), w);
1668 /// # }
1669 /// # }));
1670 /// # }
1671 /// ```
1672 ///
1673 /// Early termination example:
1674 /// ```rust
1675 /// # #[cfg(feature = "deploy")] {
1676 /// # use hydro_lang::prelude::*;
1677 /// # use futures::StreamExt;
1678 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1680 /// q!(|| 1),
1681 /// q!(|state, x| {
1682 /// *state = *state * x;
1683 /// if *state > 6 {
1684 /// None // Terminate the stream
1685 /// } else {
1686 /// Some(-*state)
1687 /// }
1688 /// }),
1689 /// )
1690 /// # }, |mut stream| async move {
1691 /// // Output: -1, -2, -6
1692 /// # for w in vec![-1, -2, -6] {
1693 /// # assert_eq!(stream.next().await.unwrap(), w);
1694 /// # }
1695 /// # }));
1696 /// # }
1697 /// ```
1698 pub fn scan<A, U, I, F>(
1699 self,
1700 init: impl IntoQuotedMut<'a, I, L>,
1701 f: impl IntoQuotedMut<'a, F, L>,
1702 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1703 where
1704 I: Fn() -> A + 'a,
1705 F: Fn(&mut A, T) -> Option<U> + 'a,
1706 {
1707 let init = init.splice_fn0_ctx(&self.location).into();
1708 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1709
1710 Stream::new(
1711 self.location.clone(),
1712 HydroNode::Scan {
1713 init,
1714 acc: f,
1715 input: Box::new(self.ir_node.into_inner()),
1716 metadata: self.location.new_node_metadata(
1717 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1718 ),
1719 },
1720 )
1721 }
1722
1723 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1724 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1725 /// until the first element in the input arrives.
1726 ///
1727 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1728 /// to depend on the order of elements in the stream.
1729 ///
1730 /// # Example
1731 /// ```rust
1732 /// # #[cfg(feature = "deploy")] {
1733 /// # use hydro_lang::prelude::*;
1734 /// # use futures::StreamExt;
1735 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736 /// let tick = process.tick();
1737 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1738 /// let batch = words.batch(&tick, nondet!(/** test */));
1739 /// batch
1740 /// .map(q!(|x| x.to_string()))
1741 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1742 /// .all_ticks()
1743 /// # }, |mut stream| async move {
1744 /// // "HELLOWORLD"
1745 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1746 /// # }));
1747 /// # }
1748 /// ```
1749 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1750 self,
1751 comb: impl IntoQuotedMut<'a, F, L>,
1752 ) -> Optional<T, L, B> {
1753 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1754 let core = HydroNode::Reduce {
1755 f,
1756 input: Box::new(self.ir_node.into_inner()),
1757 metadata: self
1758 .location
1759 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1760 };
1761
1762 Optional::new(self.location, core)
1763 }
1764}
1765
1766impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1767 /// Produces a new stream that interleaves the elements of the two input streams.
1768 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1769 ///
1770 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1771 /// [`Bounded`], you can use [`Stream::chain`] instead.
1772 ///
1773 /// # Example
1774 /// ```rust
1775 /// # #[cfg(feature = "deploy")] {
1776 /// # use hydro_lang::prelude::*;
1777 /// # use futures::StreamExt;
1778 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1779 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1780 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1781 /// # }, |mut stream| async move {
1782 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1783 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1784 /// # assert_eq!(stream.next().await.unwrap(), w);
1785 /// # }
1786 /// # }));
1787 /// # }
1788 /// ```
1789 pub fn interleave<O2: Ordering, R2: Retries>(
1790 self,
1791 other: Stream<T, L, Unbounded, O2, R2>,
1792 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1793 where
1794 R: MinRetries<R2>,
1795 {
1796 Stream::new(
1797 self.location.clone(),
1798 HydroNode::Chain {
1799 first: Box::new(self.ir_node.into_inner()),
1800 second: Box::new(other.ir_node.into_inner()),
1801 metadata: self.location.new_node_metadata(Stream::<
1802 T,
1803 L,
1804 Unbounded,
1805 NoOrder,
1806 <R as MinRetries<R2>>::Min,
1807 >::collection_kind()),
1808 },
1809 )
1810 }
1811}
1812
1813impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1814where
1815 L: Location<'a>,
1816{
1817 /// Produces a new stream that emits the input elements in sorted order.
1818 ///
1819 /// The input stream can have any ordering guarantee, but the output stream
1820 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1821 /// elements in the input stream are available, so it requires the input stream
1822 /// to be [`Bounded`].
1823 ///
1824 /// # Example
1825 /// ```rust
1826 /// # #[cfg(feature = "deploy")] {
1827 /// # use hydro_lang::prelude::*;
1828 /// # use futures::StreamExt;
1829 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1830 /// let tick = process.tick();
1831 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1832 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1833 /// batch.sort().all_ticks()
1834 /// # }, |mut stream| async move {
1835 /// // 1, 2, 3, 4
1836 /// # for w in (1..5) {
1837 /// # assert_eq!(stream.next().await.unwrap(), w);
1838 /// # }
1839 /// # }));
1840 /// # }
1841 /// ```
1842 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1843 where
1844 T: Ord,
1845 {
1846 Stream::new(
1847 self.location.clone(),
1848 HydroNode::Sort {
1849 input: Box::new(self.ir_node.into_inner()),
1850 metadata: self
1851 .location
1852 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1853 },
1854 )
1855 }
1856
1857 /// Produces a new stream that first emits the elements of the `self` stream,
1858 /// and then emits the elements of the `other` stream. The output stream has
1859 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1860 /// [`TotalOrder`] guarantee.
1861 ///
1862 /// Currently, both input streams must be [`Bounded`]. This operator will block
1863 /// on the first stream until all its elements are available. In a future version,
1864 /// we will relax the requirement on the `other` stream.
1865 ///
1866 /// # Example
1867 /// ```rust
1868 /// # #[cfg(feature = "deploy")] {
1869 /// # use hydro_lang::prelude::*;
1870 /// # use futures::StreamExt;
1871 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1872 /// let tick = process.tick();
1873 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1874 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1875 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1876 /// # }, |mut stream| async move {
1877 /// // 2, 3, 4, 5, 1, 2, 3, 4
1878 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1879 /// # assert_eq!(stream.next().await.unwrap(), w);
1880 /// # }
1881 /// # }));
1882 /// # }
1883 /// ```
1884 pub fn chain<O2: Ordering, R2: Retries>(
1885 self,
1886 other: Stream<T, L, Bounded, O2, R2>,
1887 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1888 where
1889 O: MinOrder<O2>,
1890 R: MinRetries<R2>,
1891 {
1892 check_matching_location(&self.location, &other.location);
1893
1894 Stream::new(
1895 self.location.clone(),
1896 HydroNode::Chain {
1897 first: Box::new(self.ir_node.into_inner()),
1898 second: Box::new(other.ir_node.into_inner()),
1899 metadata: self.location.new_node_metadata(Stream::<
1900 T,
1901 L,
1902 Bounded,
1903 <O as MinOrder<O2>>::Min,
1904 <R as MinRetries<R2>>::Min,
1905 >::collection_kind()),
1906 },
1907 )
1908 }
1909
1910 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1911 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1912 /// because this is compiled into a nested loop.
1913 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1914 self,
1915 other: Stream<T2, L, Bounded, O2, R>,
1916 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1917 where
1918 T: Clone,
1919 T2: Clone,
1920 {
1921 check_matching_location(&self.location, &other.location);
1922
1923 Stream::new(
1924 self.location.clone(),
1925 HydroNode::CrossProduct {
1926 left: Box::new(self.ir_node.into_inner()),
1927 right: Box::new(other.ir_node.into_inner()),
1928 metadata: self.location.new_node_metadata(Stream::<
1929 (T, T2),
1930 L,
1931 Bounded,
1932 <O2 as MinOrder<O>>::Min,
1933 R,
1934 >::collection_kind()),
1935 },
1936 )
1937 }
1938
1939 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1940 /// `self` used as the values for *each* key.
1941 ///
1942 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1943 /// values. For example, it can be used to send the same set of elements to several cluster
1944 /// members, if the membership information is available as a [`KeyedSingleton`].
1945 ///
1946 /// # Example
1947 /// ```rust
1948 /// # #[cfg(feature = "deploy")] {
1949 /// # use hydro_lang::prelude::*;
1950 /// # use futures::StreamExt;
1951 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1952 /// # let tick = process.tick();
1953 /// let keyed_singleton = // { 1: (), 2: () }
1954 /// # process
1955 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1956 /// # .into_keyed()
1957 /// # .batch(&tick, nondet!(/** test */))
1958 /// # .first();
1959 /// let stream = // [ "a", "b" ]
1960 /// # process
1961 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1962 /// # .batch(&tick, nondet!(/** test */));
1963 /// stream.repeat_with_keys(keyed_singleton)
1964 /// # .entries().all_ticks()
1965 /// # }, |mut stream| async move {
1966 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1967 /// # let mut results = Vec::new();
1968 /// # for _ in 0..4 {
1969 /// # results.push(stream.next().await.unwrap());
1970 /// # }
1971 /// # results.sort();
1972 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1973 /// # }));
1974 /// # }
1975 /// ```
1976 pub fn repeat_with_keys<K, V2>(
1977 self,
1978 keys: KeyedSingleton<K, V2, L, Bounded>,
1979 ) -> KeyedStream<K, T, L, Bounded, O, R>
1980 where
1981 K: Clone,
1982 T: Clone,
1983 {
1984 keys.keys()
1985 .weaken_retries()
1986 .assume_ordering_trusted::<TotalOrder>(
1987 nondet!(/** keyed stream does not depend on ordering of keys */),
1988 )
1989 .cross_product_nested_loop(self)
1990 .into_keyed()
1991 }
1992}
1993
1994impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1995where
1996 L: Location<'a>,
1997{
1998 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1999 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2000 /// by equi-joining the two streams on the key attribute `K`.
2001 ///
2002 /// # Example
2003 /// ```rust
2004 /// # #[cfg(feature = "deploy")] {
2005 /// # use hydro_lang::prelude::*;
2006 /// # use std::collections::HashSet;
2007 /// # use futures::StreamExt;
2008 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2009 /// let tick = process.tick();
2010 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2011 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2012 /// stream1.join(stream2)
2013 /// # }, |mut stream| async move {
2014 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2015 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2016 /// # stream.map(|i| assert!(expected.contains(&i)));
2017 /// # }));
2018 /// # }
2019 pub fn join<V2, O2: Ordering, R2: Retries>(
2020 self,
2021 n: Stream<(K, V2), L, B, O2, R2>,
2022 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2023 where
2024 K: Eq + Hash,
2025 R: MinRetries<R2>,
2026 {
2027 check_matching_location(&self.location, &n.location);
2028
2029 Stream::new(
2030 self.location.clone(),
2031 HydroNode::Join {
2032 left: Box::new(self.ir_node.into_inner()),
2033 right: Box::new(n.ir_node.into_inner()),
2034 metadata: self.location.new_node_metadata(Stream::<
2035 (K, (V1, V2)),
2036 L,
2037 B,
2038 NoOrder,
2039 <R as MinRetries<R2>>::Min,
2040 >::collection_kind()),
2041 },
2042 )
2043 }
2044
2045 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2046 /// computes the anti-join of the items in the input -- i.e. returns
2047 /// unique items in the first input that do not have a matching key
2048 /// in the second input.
2049 ///
2050 /// # Example
2051 /// ```rust
2052 /// # #[cfg(feature = "deploy")] {
2053 /// # use hydro_lang::prelude::*;
2054 /// # use futures::StreamExt;
2055 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2056 /// let tick = process.tick();
2057 /// let stream = process
2058 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2059 /// .batch(&tick, nondet!(/** test */));
2060 /// let batch = process
2061 /// .source_iter(q!(vec![1, 2]))
2062 /// .batch(&tick, nondet!(/** test */));
2063 /// stream.anti_join(batch).all_ticks()
2064 /// # }, |mut stream| async move {
2065 /// # for w in vec![(3, 'c'), (4, 'd')] {
2066 /// # assert_eq!(stream.next().await.unwrap(), w);
2067 /// # }
2068 /// # }));
2069 /// # }
2070 pub fn anti_join<O2: Ordering, R2: Retries>(
2071 self,
2072 n: Stream<K, L, Bounded, O2, R2>,
2073 ) -> Stream<(K, V1), L, B, O, R>
2074 where
2075 K: Eq + Hash,
2076 {
2077 check_matching_location(&self.location, &n.location);
2078
2079 Stream::new(
2080 self.location.clone(),
2081 HydroNode::AntiJoin {
2082 pos: Box::new(self.ir_node.into_inner()),
2083 neg: Box::new(n.ir_node.into_inner()),
2084 metadata: self
2085 .location
2086 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2087 },
2088 )
2089 }
2090}
2091
2092impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2093 Stream<(K, V), L, B, O, R>
2094{
2095 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2096 /// is used as the key and the second element is added to the entries associated with that key.
2097 ///
2098 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2099 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2100 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2101 /// total ordering _within_ each group but no ordering _across_ groups.
2102 ///
2103 /// # Example
2104 /// ```rust
2105 /// # #[cfg(feature = "deploy")] {
2106 /// # use hydro_lang::prelude::*;
2107 /// # use futures::StreamExt;
2108 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2109 /// process
2110 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2111 /// .into_keyed()
2112 /// # .entries()
2113 /// # }, |mut stream| async move {
2114 /// // { 1: [2, 3], 2: [4] }
2115 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2116 /// # assert_eq!(stream.next().await.unwrap(), w);
2117 /// # }
2118 /// # }));
2119 /// # }
2120 /// ```
2121 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2122 KeyedStream::new(
2123 self.location.clone(),
2124 HydroNode::Cast {
2125 inner: Box::new(self.ir_node.into_inner()),
2126 metadata: self
2127 .location
2128 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2129 },
2130 )
2131 }
2132}
2133
2134impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2135where
2136 K: Eq + Hash,
2137 L: Location<'a>,
2138{
2139 #[deprecated = "use .into_keyed().fold(...) instead"]
2140 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2141 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2142 /// in the second element are accumulated via the `comb` closure.
2143 ///
2144 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2145 /// to depend on the order of elements in the stream.
2146 ///
2147 /// If the input and output value types are the same and do not require initialization then use
2148 /// [`Stream::reduce_keyed`].
2149 ///
2150 /// # Example
2151 /// ```rust
2152 /// # #[cfg(feature = "deploy")] {
2153 /// # use hydro_lang::prelude::*;
2154 /// # use futures::StreamExt;
2155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2156 /// let tick = process.tick();
2157 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2158 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2159 /// batch
2160 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2161 /// .all_ticks()
2162 /// # }, |mut stream| async move {
2163 /// // (1, 5), (2, 7)
2164 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2165 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2166 /// # }));
2167 /// # }
2168 /// ```
2169 pub fn fold_keyed<A, I, F>(
2170 self,
2171 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2172 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2173 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2174 where
2175 I: Fn() -> A + 'a,
2176 F: Fn(&mut A, V) + 'a,
2177 {
2178 self.into_keyed().fold(init, comb).entries()
2179 }
2180
2181 #[deprecated = "use .into_keyed().reduce(...) instead"]
2182 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2183 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2184 /// in the second element are accumulated via the `comb` closure.
2185 ///
2186 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2187 /// to depend on the order of elements in the stream.
2188 ///
2189 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2190 ///
2191 /// # Example
2192 /// ```rust
2193 /// # #[cfg(feature = "deploy")] {
2194 /// # use hydro_lang::prelude::*;
2195 /// # use futures::StreamExt;
2196 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2197 /// let tick = process.tick();
2198 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2199 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2200 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2201 /// # }, |mut stream| async move {
2202 /// // (1, 5), (2, 7)
2203 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2204 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2205 /// # }));
2206 /// # }
2207 /// ```
2208 pub fn reduce_keyed<F>(
2209 self,
2210 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2211 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2212 where
2213 F: Fn(&mut V, V) + 'a,
2214 {
2215 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2216
2217 Stream::new(
2218 self.location.clone(),
2219 HydroNode::ReduceKeyed {
2220 f,
2221 input: Box::new(self.ir_node.into_inner()),
2222 metadata: self.location.new_node_metadata(Stream::<
2223 (K, V),
2224 Tick<L>,
2225 Bounded,
2226 NoOrder,
2227 ExactlyOnce,
2228 >::collection_kind()),
2229 },
2230 )
2231 }
2232}
2233
2234impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2235where
2236 K: Eq + Hash,
2237 L: Location<'a>,
2238{
2239 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2240 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2241 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2242 /// in the second element are accumulated via the `comb` closure.
2243 ///
2244 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2245 /// as there may be non-deterministic duplicates.
2246 ///
2247 /// If the input and output value types are the same and do not require initialization then use
2248 /// [`Stream::reduce_keyed_commutative_idempotent`].
2249 ///
2250 /// # Example
2251 /// ```rust
2252 /// # #[cfg(feature = "deploy")] {
2253 /// # use hydro_lang::prelude::*;
2254 /// # use futures::StreamExt;
2255 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2256 /// let tick = process.tick();
2257 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2258 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2259 /// batch
2260 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2261 /// .all_ticks()
2262 /// # }, |mut stream| async move {
2263 /// // (1, false), (2, true)
2264 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2265 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2266 /// # }));
2267 /// # }
2268 /// ```
2269 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2270 self,
2271 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2272 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2273 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2274 where
2275 I: Fn() -> A + 'a,
2276 F: Fn(&mut A, V) + 'a,
2277 {
2278 self.into_keyed()
2279 .fold_commutative_idempotent(init, comb)
2280 .entries()
2281 }
2282
2283 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2284 /// # Example
2285 /// ```rust
2286 /// # #[cfg(feature = "deploy")] {
2287 /// # use hydro_lang::prelude::*;
2288 /// # use futures::StreamExt;
2289 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2290 /// let tick = process.tick();
2291 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2292 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2293 /// batch.keys().all_ticks()
2294 /// # }, |mut stream| async move {
2295 /// // 1, 2
2296 /// # assert_eq!(stream.next().await.unwrap(), 1);
2297 /// # assert_eq!(stream.next().await.unwrap(), 2);
2298 /// # }));
2299 /// # }
2300 /// ```
2301 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2302 self.into_keyed()
2303 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2304 .keys()
2305 }
2306
2307 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2308 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2309 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2310 /// in the second element are accumulated via the `comb` closure.
2311 ///
2312 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2313 /// as there may be non-deterministic duplicates.
2314 ///
2315 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2316 ///
2317 /// # Example
2318 /// ```rust
2319 /// # #[cfg(feature = "deploy")] {
2320 /// # use hydro_lang::prelude::*;
2321 /// # use futures::StreamExt;
2322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2323 /// let tick = process.tick();
2324 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2325 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2326 /// batch
2327 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2328 /// .all_ticks()
2329 /// # }, |mut stream| async move {
2330 /// // (1, false), (2, true)
2331 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2332 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2333 /// # }));
2334 /// # }
2335 /// ```
2336 pub fn reduce_keyed_commutative_idempotent<F>(
2337 self,
2338 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2339 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2340 where
2341 F: Fn(&mut V, V) + 'a,
2342 {
2343 self.into_keyed()
2344 .reduce_commutative_idempotent(comb)
2345 .entries()
2346 }
2347}
2348
2349impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2350where
2351 K: Eq + Hash,
2352 L: Location<'a>,
2353{
2354 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2355 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2356 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2357 /// in the second element are accumulated via the `comb` closure.
2358 ///
2359 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2360 ///
2361 /// If the input and output value types are the same and do not require initialization then use
2362 /// [`Stream::reduce_keyed_commutative`].
2363 ///
2364 /// # Example
2365 /// ```rust
2366 /// # #[cfg(feature = "deploy")] {
2367 /// # use hydro_lang::prelude::*;
2368 /// # use futures::StreamExt;
2369 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2370 /// let tick = process.tick();
2371 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2372 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2373 /// batch
2374 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2375 /// .all_ticks()
2376 /// # }, |mut stream| async move {
2377 /// // (1, 5), (2, 7)
2378 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2379 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2380 /// # }));
2381 /// # }
2382 /// ```
2383 pub fn fold_keyed_commutative<A, I, F>(
2384 self,
2385 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2386 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2387 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2388 where
2389 I: Fn() -> A + 'a,
2390 F: Fn(&mut A, V) + 'a,
2391 {
2392 self.into_keyed().fold_commutative(init, comb).entries()
2393 }
2394
2395 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2396 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2397 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2398 /// in the second element are accumulated via the `comb` closure.
2399 ///
2400 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2401 ///
2402 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2403 ///
2404 /// # Example
2405 /// ```rust
2406 /// # #[cfg(feature = "deploy")] {
2407 /// # use hydro_lang::prelude::*;
2408 /// # use futures::StreamExt;
2409 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2410 /// let tick = process.tick();
2411 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2412 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2413 /// batch
2414 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2415 /// .all_ticks()
2416 /// # }, |mut stream| async move {
2417 /// // (1, 5), (2, 7)
2418 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2419 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2420 /// # }));
2421 /// # }
2422 /// ```
2423 pub fn reduce_keyed_commutative<F>(
2424 self,
2425 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2426 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2427 where
2428 F: Fn(&mut V, V) + 'a,
2429 {
2430 self.into_keyed().reduce_commutative(comb).entries()
2431 }
2432}
2433
2434impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2435where
2436 K: Eq + Hash,
2437 L: Location<'a>,
2438{
2439 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2440 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2441 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2442 /// in the second element are accumulated via the `comb` closure.
2443 ///
2444 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2445 ///
2446 /// If the input and output value types are the same and do not require initialization then use
2447 /// [`Stream::reduce_keyed_idempotent`].
2448 ///
2449 /// # Example
2450 /// ```rust
2451 /// # #[cfg(feature = "deploy")] {
2452 /// # use hydro_lang::prelude::*;
2453 /// # use futures::StreamExt;
2454 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2455 /// let tick = process.tick();
2456 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2457 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2458 /// batch
2459 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2460 /// .all_ticks()
2461 /// # }, |mut stream| async move {
2462 /// // (1, false), (2, true)
2463 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2464 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2465 /// # }));
2466 /// # }
2467 /// ```
2468 pub fn fold_keyed_idempotent<A, I, F>(
2469 self,
2470 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2471 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2472 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2473 where
2474 I: Fn() -> A + 'a,
2475 F: Fn(&mut A, V) + 'a,
2476 {
2477 self.into_keyed().fold_idempotent(init, comb).entries()
2478 }
2479
2480 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2481 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2482 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2483 /// in the second element are accumulated via the `comb` closure.
2484 ///
2485 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2486 ///
2487 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2488 ///
2489 /// # Example
2490 /// ```rust
2491 /// # #[cfg(feature = "deploy")] {
2492 /// # use hydro_lang::prelude::*;
2493 /// # use futures::StreamExt;
2494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2495 /// let tick = process.tick();
2496 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2497 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2498 /// batch
2499 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2500 /// .all_ticks()
2501 /// # }, |mut stream| async move {
2502 /// // (1, false), (2, true)
2503 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2504 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2505 /// # }));
2506 /// # }
2507 /// ```
2508 pub fn reduce_keyed_idempotent<F>(
2509 self,
2510 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2511 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2512 where
2513 F: Fn(&mut V, V) + 'a,
2514 {
2515 self.into_keyed().reduce_idempotent(comb).entries()
2516 }
2517}
2518
2519impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2520where
2521 L: Location<'a> + NoTick,
2522{
2523 /// Returns a stream corresponding to the latest batch of elements being atomically
2524 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2525 /// the order of the input.
2526 ///
2527 /// # Non-Determinism
2528 /// The batch boundaries are non-deterministic and may change across executions.
2529 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2530 Stream::new(
2531 self.location.clone().tick,
2532 HydroNode::Batch {
2533 inner: Box::new(self.ir_node.into_inner()),
2534 metadata: self
2535 .location
2536 .tick
2537 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2538 },
2539 )
2540 }
2541
2542 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2543 /// See [`Stream::atomic`] for more details.
2544 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2545 Stream::new(
2546 self.location.tick.l.clone(),
2547 HydroNode::EndAtomic {
2548 inner: Box::new(self.ir_node.into_inner()),
2549 metadata: self
2550 .location
2551 .tick
2552 .l
2553 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2554 },
2555 )
2556 }
2557}
2558
2559impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2560where
2561 L: Location<'a>,
2562{
2563 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2564 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2565 ///
2566 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2567 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2568 /// argument that declares where the stream will be atomically processed. Batching a stream into
2569 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2570 /// [`Tick`] will introduce asynchrony.
2571 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2572 let out_location = Atomic { tick: tick.clone() };
2573 Stream::new(
2574 out_location.clone(),
2575 HydroNode::BeginAtomic {
2576 inner: Box::new(self.ir_node.into_inner()),
2577 metadata: out_location
2578 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2579 },
2580 )
2581 }
2582
2583 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2584 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2585 /// the order of the input. The output stream will execute in the [`Tick`] that was
2586 /// used to create the atomic section.
2587 ///
2588 /// # Non-Determinism
2589 /// The batch boundaries are non-deterministic and may change across executions.
2590 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2591 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2592 Stream::new(
2593 tick.clone(),
2594 HydroNode::Batch {
2595 inner: Box::new(self.ir_node.into_inner()),
2596 metadata: tick
2597 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2598 },
2599 )
2600 }
2601
2602 /// Given a time interval, returns a stream corresponding to samples taken from the
2603 /// stream roughly at that interval. The output will have elements in the same order
2604 /// as the input, but with arbitrary elements skipped between samples. There is also
2605 /// no guarantee on the exact timing of the samples.
2606 ///
2607 /// # Non-Determinism
2608 /// The output stream is non-deterministic in which elements are sampled, since this
2609 /// is controlled by a clock.
2610 pub fn sample_every(
2611 self,
2612 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2613 nondet: NonDet,
2614 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2615 where
2616 L: NoTick + NoAtomic,
2617 {
2618 let samples = self.location.source_interval(interval, nondet);
2619
2620 let tick = self.location.tick();
2621 self.batch(&tick, nondet)
2622 .filter_if_some(samples.batch(&tick, nondet).first())
2623 .all_ticks()
2624 .weakest_retries()
2625 }
2626
2627 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2628 /// stream has not emitted a value since that duration.
2629 ///
2630 /// # Non-Determinism
2631 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2632 /// samples take place, timeouts may be non-deterministically generated or missed,
2633 /// and the notification of the timeout may be delayed as well. There is also no
2634 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2635 /// detected based on when the next sample is taken.
2636 pub fn timeout(
2637 self,
2638 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2639 nondet: NonDet,
2640 ) -> Optional<(), L, Unbounded>
2641 where
2642 L: NoTick + NoAtomic,
2643 {
2644 let tick = self.location.tick();
2645
2646 let latest_received = self.assume_retries(nondet).fold_commutative(
2647 q!(|| None),
2648 q!(|latest, _| {
2649 *latest = Some(Instant::now());
2650 }),
2651 );
2652
2653 latest_received
2654 .snapshot(&tick, nondet)
2655 .filter_map(q!(move |latest_received| {
2656 if let Some(latest_received) = latest_received {
2657 if Instant::now().duration_since(latest_received) > duration {
2658 Some(())
2659 } else {
2660 None
2661 }
2662 } else {
2663 Some(())
2664 }
2665 }))
2666 .latest()
2667 }
2668}
2669
2670impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2671where
2672 L: Location<'a> + NoTick + NoAtomic,
2673 F: Future<Output = T>,
2674{
2675 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2676 /// Future outputs are produced as available, regardless of input arrival order.
2677 ///
2678 /// # Example
2679 /// ```rust
2680 /// # #[cfg(feature = "deploy")] {
2681 /// # use std::collections::HashSet;
2682 /// # use futures::StreamExt;
2683 /// # use hydro_lang::prelude::*;
2684 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2685 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2686 /// .map(q!(|x| async move {
2687 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2688 /// x
2689 /// }))
2690 /// .resolve_futures()
2691 /// # },
2692 /// # |mut stream| async move {
2693 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2694 /// # let mut output = HashSet::new();
2695 /// # for _ in 1..10 {
2696 /// # output.insert(stream.next().await.unwrap());
2697 /// # }
2698 /// # assert_eq!(
2699 /// # output,
2700 /// # HashSet::<i32>::from_iter(1..10)
2701 /// # );
2702 /// # },
2703 /// # ));
2704 /// # }
2705 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2706 Stream::new(
2707 self.location.clone(),
2708 HydroNode::ResolveFutures {
2709 input: Box::new(self.ir_node.into_inner()),
2710 metadata: self
2711 .location
2712 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2713 },
2714 )
2715 }
2716
2717 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2718 /// Future outputs are produced in the same order as the input stream.
2719 ///
2720 /// # Example
2721 /// ```rust
2722 /// # #[cfg(feature = "deploy")] {
2723 /// # use std::collections::HashSet;
2724 /// # use futures::StreamExt;
2725 /// # use hydro_lang::prelude::*;
2726 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2727 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2728 /// .map(q!(|x| async move {
2729 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2730 /// x
2731 /// }))
2732 /// .resolve_futures_ordered()
2733 /// # },
2734 /// # |mut stream| async move {
2735 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2736 /// # let mut output = Vec::new();
2737 /// # for _ in 1..10 {
2738 /// # output.push(stream.next().await.unwrap());
2739 /// # }
2740 /// # assert_eq!(
2741 /// # output,
2742 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2743 /// # );
2744 /// # },
2745 /// # ));
2746 /// # }
2747 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2748 Stream::new(
2749 self.location.clone(),
2750 HydroNode::ResolveFuturesOrdered {
2751 input: Box::new(self.ir_node.into_inner()),
2752 metadata: self
2753 .location
2754 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2755 },
2756 )
2757 }
2758}
2759
2760impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2761where
2762 L: Location<'a> + NoTick,
2763{
2764 /// Executes the provided closure for every element in this stream.
2765 ///
2766 /// Because the closure may have side effects, the stream must have deterministic order
2767 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2768 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2769 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2770 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2771 let f = f.splice_fn1_ctx(&self.location).into();
2772 self.location
2773 .flow_state()
2774 .borrow_mut()
2775 .push_root(HydroRoot::ForEach {
2776 input: Box::new(self.ir_node.into_inner()),
2777 f,
2778 op_metadata: HydroIrOpMetadata::new(),
2779 });
2780 }
2781
2782 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2783 /// TCP socket to some other server. You should _not_ use this API for interacting with
2784 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2785 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2786 /// interaction with asynchronous sinks.
2787 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2788 where
2789 S: 'a + futures::Sink<T> + Unpin,
2790 {
2791 self.location
2792 .flow_state()
2793 .borrow_mut()
2794 .push_root(HydroRoot::DestSink {
2795 sink: sink.splice_typed_ctx(&self.location).into(),
2796 input: Box::new(self.ir_node.into_inner()),
2797 op_metadata: HydroIrOpMetadata::new(),
2798 });
2799 }
2800}
2801
2802impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2803where
2804 L: Location<'a>,
2805{
2806 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2807 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2808 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2809 Stream::new(
2810 self.location.outer().clone(),
2811 HydroNode::YieldConcat {
2812 inner: Box::new(self.ir_node.into_inner()),
2813 metadata: self
2814 .location
2815 .outer()
2816 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2817 },
2818 )
2819 }
2820
2821 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2822 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2823 ///
2824 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2825 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2826 /// stream's [`Tick`] context.
2827 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2828 let out_location = Atomic {
2829 tick: self.location.clone(),
2830 };
2831
2832 Stream::new(
2833 out_location.clone(),
2834 HydroNode::YieldConcat {
2835 inner: Box::new(self.ir_node.into_inner()),
2836 metadata: out_location
2837 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2838 },
2839 )
2840 }
2841
2842 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2843 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2844 /// input.
2845 ///
2846 /// This API is particularly useful for stateful computation on batches of data, such as
2847 /// maintaining an accumulated state that is up to date with the current batch.
2848 ///
2849 /// # Example
2850 /// ```rust
2851 /// # #[cfg(feature = "deploy")] {
2852 /// # use hydro_lang::prelude::*;
2853 /// # use futures::StreamExt;
2854 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2855 /// let tick = process.tick();
2856 /// # // ticks are lazy by default, forces the second tick to run
2857 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2858 /// # let batch_first_tick = process
2859 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2860 /// # .batch(&tick, nondet!(/** test */));
2861 /// # let batch_second_tick = process
2862 /// # .source_iter(q!(vec![5, 6, 7]))
2863 /// # .batch(&tick, nondet!(/** test */))
2864 /// # .defer_tick(); // appears on the second tick
2865 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2866 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2867 ///
2868 /// input.batch(&tick, nondet!(/** test */))
2869 /// .across_ticks(|s| s.count()).all_ticks()
2870 /// # }, |mut stream| async move {
2871 /// // [4, 7]
2872 /// assert_eq!(stream.next().await.unwrap(), 4);
2873 /// assert_eq!(stream.next().await.unwrap(), 7);
2874 /// # }));
2875 /// # }
2876 /// ```
2877 pub fn across_ticks<Out: BatchAtomic>(
2878 self,
2879 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2880 ) -> Out::Batched {
2881 thunk(self.all_ticks_atomic()).batched_atomic()
2882 }
2883
2884 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2885 /// always has the elements of `self` at tick `T - 1`.
2886 ///
2887 /// At tick `0`, the output stream is empty, since there is no previous tick.
2888 ///
2889 /// This operator enables stateful iterative processing with ticks, by sending data from one
2890 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2891 ///
2892 /// # Example
2893 /// ```rust
2894 /// # #[cfg(feature = "deploy")] {
2895 /// # use hydro_lang::prelude::*;
2896 /// # use futures::StreamExt;
2897 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2898 /// let tick = process.tick();
2899 /// // ticks are lazy by default, forces the second tick to run
2900 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2901 ///
2902 /// let batch_first_tick = process
2903 /// .source_iter(q!(vec![1, 2, 3, 4]))
2904 /// .batch(&tick, nondet!(/** test */));
2905 /// let batch_second_tick = process
2906 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2907 /// .batch(&tick, nondet!(/** test */))
2908 /// .defer_tick(); // appears on the second tick
2909 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2910 ///
2911 /// changes_across_ticks.clone().filter_not_in(
2912 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2913 /// ).all_ticks()
2914 /// # }, |mut stream| async move {
2915 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2916 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2917 /// # assert_eq!(stream.next().await.unwrap(), w);
2918 /// # }
2919 /// # }));
2920 /// # }
2921 /// ```
2922 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2923 Stream::new(
2924 self.location.clone(),
2925 HydroNode::DeferTick {
2926 input: Box::new(self.ir_node.into_inner()),
2927 metadata: self
2928 .location
2929 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2930 },
2931 )
2932 }
2933}
2934
2935#[cfg(test)]
2936mod tests {
2937 #[cfg(feature = "deploy")]
2938 use futures::{SinkExt, StreamExt};
2939 #[cfg(feature = "deploy")]
2940 use hydro_deploy::Deployment;
2941 #[cfg(feature = "deploy")]
2942 use serde::{Deserialize, Serialize};
2943 #[cfg(feature = "deploy")]
2944 use stageleft::q;
2945
2946 #[cfg(any(feature = "deploy", feature = "sim"))]
2947 use crate::compile::builder::FlowBuilder;
2948 #[cfg(feature = "deploy")]
2949 use crate::live_collections::stream::ExactlyOnce;
2950 #[cfg(feature = "sim")]
2951 use crate::live_collections::stream::NoOrder;
2952 #[cfg(any(feature = "deploy", feature = "sim"))]
2953 use crate::live_collections::stream::TotalOrder;
2954 #[cfg(any(feature = "deploy", feature = "sim"))]
2955 use crate::location::Location;
2956 #[cfg(any(feature = "deploy", feature = "sim"))]
2957 use crate::nondet::nondet;
2958
2959 mod backtrace_chained_ops;
2960
2961 #[cfg(feature = "deploy")]
2962 struct P1 {}
2963 #[cfg(feature = "deploy")]
2964 struct P2 {}
2965
2966 #[cfg(feature = "deploy")]
2967 #[derive(Serialize, Deserialize, Debug)]
2968 struct SendOverNetwork {
2969 n: u32,
2970 }
2971
2972 #[cfg(feature = "deploy")]
2973 #[tokio::test]
2974 async fn first_ten_distributed() {
2975 let mut deployment = Deployment::new();
2976
2977 let flow = FlowBuilder::new();
2978 let first_node = flow.process::<P1>();
2979 let second_node = flow.process::<P2>();
2980 let external = flow.external::<P2>();
2981
2982 let numbers = first_node.source_iter(q!(0..10));
2983 let out_port = numbers
2984 .map(q!(|n| SendOverNetwork { n }))
2985 .send_bincode(&second_node)
2986 .send_bincode_external(&external);
2987
2988 let nodes = flow
2989 .with_process(&first_node, deployment.Localhost())
2990 .with_process(&second_node, deployment.Localhost())
2991 .with_external(&external, deployment.Localhost())
2992 .deploy(&mut deployment);
2993
2994 deployment.deploy().await.unwrap();
2995
2996 let mut external_out = nodes.connect(out_port).await;
2997
2998 deployment.start().await.unwrap();
2999
3000 for i in 0..10 {
3001 assert_eq!(external_out.next().await.unwrap().n, i);
3002 }
3003 }
3004
3005 #[cfg(feature = "deploy")]
3006 #[tokio::test]
3007 async fn first_cardinality() {
3008 let mut deployment = Deployment::new();
3009
3010 let flow = FlowBuilder::new();
3011 let node = flow.process::<()>();
3012 let external = flow.external::<()>();
3013
3014 let node_tick = node.tick();
3015 let count = node_tick
3016 .singleton(q!([1, 2, 3]))
3017 .into_stream()
3018 .flatten_ordered()
3019 .first()
3020 .into_stream()
3021 .count()
3022 .all_ticks()
3023 .send_bincode_external(&external);
3024
3025 let nodes = flow
3026 .with_process(&node, deployment.Localhost())
3027 .with_external(&external, deployment.Localhost())
3028 .deploy(&mut deployment);
3029
3030 deployment.deploy().await.unwrap();
3031
3032 let mut external_out = nodes.connect(count).await;
3033
3034 deployment.start().await.unwrap();
3035
3036 assert_eq!(external_out.next().await.unwrap(), 1);
3037 }
3038
3039 #[cfg(feature = "deploy")]
3040 #[tokio::test]
3041 async fn unbounded_reduce_remembers_state() {
3042 let mut deployment = Deployment::new();
3043
3044 let flow = FlowBuilder::new();
3045 let node = flow.process::<()>();
3046 let external = flow.external::<()>();
3047
3048 let (input_port, input) = node.source_external_bincode(&external);
3049 let out = input
3050 .reduce(q!(|acc, v| *acc += v))
3051 .sample_eager(nondet!(/** test */))
3052 .send_bincode_external(&external);
3053
3054 let nodes = flow
3055 .with_process(&node, deployment.Localhost())
3056 .with_external(&external, deployment.Localhost())
3057 .deploy(&mut deployment);
3058
3059 deployment.deploy().await.unwrap();
3060
3061 let mut external_in = nodes.connect(input_port).await;
3062 let mut external_out = nodes.connect(out).await;
3063
3064 deployment.start().await.unwrap();
3065
3066 external_in.send(1).await.unwrap();
3067 assert_eq!(external_out.next().await.unwrap(), 1);
3068
3069 external_in.send(2).await.unwrap();
3070 assert_eq!(external_out.next().await.unwrap(), 3);
3071 }
3072
3073 #[cfg(feature = "deploy")]
3074 #[tokio::test]
3075 async fn atomic_fold_replays_each_tick() {
3076 let mut deployment = Deployment::new();
3077
3078 let flow = FlowBuilder::new();
3079 let node = flow.process::<()>();
3080 let external = flow.external::<()>();
3081
3082 let (input_port, input) =
3083 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3084 let tick = node.tick();
3085
3086 let out = input
3087 .batch(&tick, nondet!(/** test */))
3088 .cross_singleton(
3089 node.source_iter(q!(vec![1, 2, 3]))
3090 .atomic(&tick)
3091 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3092 .snapshot_atomic(nondet!(/** test */)),
3093 )
3094 .all_ticks()
3095 .send_bincode_external(&external);
3096
3097 let nodes = flow
3098 .with_process(&node, deployment.Localhost())
3099 .with_external(&external, deployment.Localhost())
3100 .deploy(&mut deployment);
3101
3102 deployment.deploy().await.unwrap();
3103
3104 let mut external_in = nodes.connect(input_port).await;
3105 let mut external_out = nodes.connect(out).await;
3106
3107 deployment.start().await.unwrap();
3108
3109 external_in.send(1).await.unwrap();
3110 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3111
3112 external_in.send(2).await.unwrap();
3113 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3114 }
3115
3116 #[cfg(feature = "deploy")]
3117 #[tokio::test]
3118 async fn unbounded_scan_remembers_state() {
3119 let mut deployment = Deployment::new();
3120
3121 let flow = FlowBuilder::new();
3122 let node = flow.process::<()>();
3123 let external = flow.external::<()>();
3124
3125 let (input_port, input) = node.source_external_bincode(&external);
3126 let out = input
3127 .scan(
3128 q!(|| 0),
3129 q!(|acc, v| {
3130 *acc += v;
3131 Some(*acc)
3132 }),
3133 )
3134 .send_bincode_external(&external);
3135
3136 let nodes = flow
3137 .with_process(&node, deployment.Localhost())
3138 .with_external(&external, deployment.Localhost())
3139 .deploy(&mut deployment);
3140
3141 deployment.deploy().await.unwrap();
3142
3143 let mut external_in = nodes.connect(input_port).await;
3144 let mut external_out = nodes.connect(out).await;
3145
3146 deployment.start().await.unwrap();
3147
3148 external_in.send(1).await.unwrap();
3149 assert_eq!(external_out.next().await.unwrap(), 1);
3150
3151 external_in.send(2).await.unwrap();
3152 assert_eq!(external_out.next().await.unwrap(), 3);
3153 }
3154
3155 #[cfg(feature = "deploy")]
3156 #[tokio::test]
3157 async fn unbounded_enumerate_remembers_state() {
3158 let mut deployment = Deployment::new();
3159
3160 let flow = FlowBuilder::new();
3161 let node = flow.process::<()>();
3162 let external = flow.external::<()>();
3163
3164 let (input_port, input) = node.source_external_bincode(&external);
3165 let out = input.enumerate().send_bincode_external(&external);
3166
3167 let nodes = flow
3168 .with_process(&node, deployment.Localhost())
3169 .with_external(&external, deployment.Localhost())
3170 .deploy(&mut deployment);
3171
3172 deployment.deploy().await.unwrap();
3173
3174 let mut external_in = nodes.connect(input_port).await;
3175 let mut external_out = nodes.connect(out).await;
3176
3177 deployment.start().await.unwrap();
3178
3179 external_in.send(1).await.unwrap();
3180 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3181
3182 external_in.send(2).await.unwrap();
3183 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3184 }
3185
3186 #[cfg(feature = "deploy")]
3187 #[tokio::test]
3188 async fn unbounded_unique_remembers_state() {
3189 let mut deployment = Deployment::new();
3190
3191 let flow = FlowBuilder::new();
3192 let node = flow.process::<()>();
3193 let external = flow.external::<()>();
3194
3195 let (input_port, input) =
3196 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3197 let out = input.unique().send_bincode_external(&external);
3198
3199 let nodes = flow
3200 .with_process(&node, deployment.Localhost())
3201 .with_external(&external, deployment.Localhost())
3202 .deploy(&mut deployment);
3203
3204 deployment.deploy().await.unwrap();
3205
3206 let mut external_in = nodes.connect(input_port).await;
3207 let mut external_out = nodes.connect(out).await;
3208
3209 deployment.start().await.unwrap();
3210
3211 external_in.send(1).await.unwrap();
3212 assert_eq!(external_out.next().await.unwrap(), 1);
3213
3214 external_in.send(2).await.unwrap();
3215 assert_eq!(external_out.next().await.unwrap(), 2);
3216
3217 external_in.send(1).await.unwrap();
3218 external_in.send(3).await.unwrap();
3219 assert_eq!(external_out.next().await.unwrap(), 3);
3220 }
3221
3222 #[cfg(feature = "sim")]
3223 #[test]
3224 #[should_panic]
3225 fn sim_batch_nondet_size() {
3226 let flow = FlowBuilder::new();
3227 let node = flow.process::<()>();
3228
3229 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3230
3231 let tick = node.tick();
3232 let out_recv = input
3233 .batch(&tick, nondet!(/** test */))
3234 .count()
3235 .all_ticks()
3236 .sim_output();
3237
3238 flow.sim().exhaustive(async || {
3239 in_send.send(());
3240 in_send.send(());
3241 in_send.send(());
3242
3243 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3244 });
3245 }
3246
3247 #[cfg(feature = "sim")]
3248 #[test]
3249 fn sim_batch_preserves_order() {
3250 let flow = FlowBuilder::new();
3251 let node = flow.process::<()>();
3252
3253 let (in_send, input) = node.sim_input();
3254
3255 let tick = node.tick();
3256 let out_recv = input
3257 .batch(&tick, nondet!(/** test */))
3258 .all_ticks()
3259 .sim_output();
3260
3261 flow.sim().exhaustive(async || {
3262 in_send.send(1);
3263 in_send.send(2);
3264 in_send.send(3);
3265
3266 out_recv.assert_yields_only([1, 2, 3]).await;
3267 });
3268 }
3269
3270 #[cfg(feature = "sim")]
3271 #[test]
3272 #[should_panic]
3273 fn sim_batch_unordered_shuffles() {
3274 let flow = FlowBuilder::new();
3275 let node = flow.process::<()>();
3276
3277 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3278
3279 let tick = node.tick();
3280 let batch = input.batch(&tick, nondet!(/** test */));
3281 let out_recv = batch
3282 .clone()
3283 .min()
3284 .zip(batch.max())
3285 .all_ticks()
3286 .sim_output();
3287
3288 flow.sim().exhaustive(async || {
3289 in_send.send_many_unordered([1, 2, 3]);
3290
3291 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3292 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3293 }
3294 });
3295 }
3296
3297 #[cfg(feature = "sim")]
3298 #[test]
3299 fn sim_batch_unordered_shuffles_count() {
3300 let flow = FlowBuilder::new();
3301 let node = flow.process::<()>();
3302
3303 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3304
3305 let tick = node.tick();
3306 let batch = input.batch(&tick, nondet!(/** test */));
3307 let out_recv = batch.all_ticks().sim_output();
3308
3309 let instance_count = flow.sim().exhaustive(async || {
3310 in_send.send_many_unordered([1, 2, 3, 4]);
3311 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3312 });
3313
3314 assert_eq!(
3315 instance_count,
3316 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3317 )
3318 }
3319
3320 #[cfg(feature = "sim")]
3321 #[test]
3322 #[should_panic]
3323 fn sim_observe_order_batched() {
3324 let flow = FlowBuilder::new();
3325 let node = flow.process::<()>();
3326
3327 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3328
3329 let tick = node.tick();
3330 let batch = input.batch(&tick, nondet!(/** test */));
3331 let out_recv = batch
3332 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3333 .all_ticks()
3334 .sim_output();
3335
3336 flow.sim().exhaustive(async || {
3337 in_send.send_many_unordered([1, 2, 3, 4]);
3338 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3339 });
3340 }
3341
3342 #[cfg(feature = "sim")]
3343 #[test]
3344 fn sim_observe_order_batched_count() {
3345 let flow = FlowBuilder::new();
3346 let node = flow.process::<()>();
3347
3348 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3349
3350 let tick = node.tick();
3351 let batch = input.batch(&tick, nondet!(/** test */));
3352 let out_recv = batch
3353 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3354 .all_ticks()
3355 .sim_output();
3356
3357 let instance_count = flow.sim().exhaustive(async || {
3358 in_send.send_many_unordered([1, 2, 3, 4]);
3359 let _ = out_recv.collect::<Vec<_>>().await;
3360 });
3361
3362 assert_eq!(
3363 instance_count,
3364 192 // 4! * 2^{4 - 1}
3365 )
3366 }
3367
3368 #[cfg(feature = "sim")]
3369 #[test]
3370 fn sim_unordered_count_instance_count() {
3371 let flow = FlowBuilder::new();
3372 let node = flow.process::<()>();
3373
3374 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3375
3376 let tick = node.tick();
3377 let out_recv = input
3378 .count()
3379 .snapshot(&tick, nondet!(/** test */))
3380 .all_ticks()
3381 .sim_output();
3382
3383 let instance_count = flow.sim().exhaustive(async || {
3384 in_send.send_many_unordered([1, 2, 3, 4]);
3385 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3386 });
3387
3388 assert_eq!(
3389 instance_count,
3390 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3391 )
3392 }
3393}