hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30
31pub mod networking;
32
33/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
34#[sealed::sealed]
35pub trait Ordering:
36    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
37{
38    /// The [`StreamOrder`] corresponding to this type.
39    const ORDERING_KIND: StreamOrder;
40}
41
42/// Marks the stream as being totally ordered, which means that there are
43/// no sources of non-determinism (other than intentional ones) that will
44/// affect the order of elements.
45pub enum TotalOrder {}
46
47#[sealed::sealed]
48impl Ordering for TotalOrder {
49    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
50}
51
52/// Marks the stream as having no order, which means that the order of
53/// elements may be affected by non-determinism.
54///
55/// This restricts certain operators, such as `fold` and `reduce`, to only
56/// be used with commutative aggregation functions.
57pub enum NoOrder {}
58
59#[sealed::sealed]
60impl Ordering for NoOrder {
61    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
62}
63
64/// Helper trait for determining the weakest of two orderings.
65#[sealed::sealed]
66pub trait MinOrder<Other: ?Sized> {
67    /// The weaker of the two orderings.
68    type Min: Ordering;
69}
70
71#[sealed::sealed]
72impl MinOrder<NoOrder> for TotalOrder {
73    type Min = NoOrder;
74}
75
76#[sealed::sealed]
77impl MinOrder<TotalOrder> for TotalOrder {
78    type Min = TotalOrder;
79}
80
81#[sealed::sealed]
82impl MinOrder<TotalOrder> for NoOrder {
83    type Min = NoOrder;
84}
85
86#[sealed::sealed]
87impl MinOrder<NoOrder> for NoOrder {
88    type Min = NoOrder;
89}
90
91/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
92#[sealed::sealed]
93pub trait Retries:
94    MinRetries<Self, Min = Self>
95    + MinRetries<ExactlyOnce, Min = Self>
96    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
97{
98    /// The [`StreamRetry`] corresponding to this type.
99    const RETRIES_KIND: StreamRetry;
100}
101
102/// Marks the stream as having deterministic message cardinality, with no
103/// possibility of duplicates.
104pub enum ExactlyOnce {}
105
106#[sealed::sealed]
107impl Retries for ExactlyOnce {
108    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
109}
110
111/// Marks the stream as having non-deterministic message cardinality, which
112/// means that duplicates may occur, but messages will not be dropped.
113pub enum AtLeastOnce {}
114
115#[sealed::sealed]
116impl Retries for AtLeastOnce {
117    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
118}
119
120/// Helper trait for determining the weakest of two retry guarantees.
121#[sealed::sealed]
122pub trait MinRetries<Other: ?Sized> {
123    /// The weaker of the two retry guarantees.
124    type Min: Retries;
125}
126
127#[sealed::sealed]
128impl MinRetries<AtLeastOnce> for ExactlyOnce {
129    type Min = AtLeastOnce;
130}
131
132#[sealed::sealed]
133impl MinRetries<ExactlyOnce> for ExactlyOnce {
134    type Min = ExactlyOnce;
135}
136
137#[sealed::sealed]
138impl MinRetries<ExactlyOnce> for AtLeastOnce {
139    type Min = AtLeastOnce;
140}
141
142#[sealed::sealed]
143impl MinRetries<AtLeastOnce> for AtLeastOnce {
144    type Min = AtLeastOnce;
145}
146
147/// Streaming sequence of elements with type `Type`.
148///
149/// This live collection represents a growing sequence of elements, with new elements being
150/// asynchronously appended to the end of the sequence. This can be used to model the arrival
151/// of network input, such as API requests, or streaming ingestion.
152///
153/// By default, all streams have deterministic ordering and each element is materialized exactly
154/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
155/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
156/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
157///
158/// Type Parameters:
159/// - `Type`: the type of elements in the stream
160/// - `Loc`: the location where the stream is being materialized
161/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
162/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
163///   (default is [`TotalOrder`])
164/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
165///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
166pub struct Stream<
167    Type,
168    Loc,
169    Bound: Boundedness = Unbounded,
170    Order: Ordering = TotalOrder,
171    Retry: Retries = ExactlyOnce,
172> {
173    pub(crate) location: Loc,
174    pub(crate) ir_node: RefCell<HydroNode>,
175
176    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
177}
178
179impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
180    for Stream<T, L, Unbounded, O, R>
181where
182    L: Location<'a>,
183{
184    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
185        Stream {
186            location: stream.location,
187            ir_node: stream.ir_node,
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
194    for Stream<T, L, B, NoOrder, R>
195where
196    L: Location<'a>,
197{
198    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
199        Stream {
200            location: stream.location,
201            ir_node: stream.ir_node,
202            _phantom: PhantomData,
203        }
204    }
205}
206
207impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
208    for Stream<T, L, B, O, AtLeastOnce>
209where
210    L: Location<'a>,
211{
212    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
213        Stream {
214            location: stream.location,
215            ir_node: stream.ir_node,
216            _phantom: PhantomData,
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
222where
223    L: Location<'a>,
224{
225    fn defer_tick(self) -> Self {
226        Stream::defer_tick(self)
227    }
228}
229
230impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
231    for Stream<T, Tick<L>, Bounded, O, R>
232where
233    L: Location<'a>,
234{
235    type Location = Tick<L>;
236
237    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
238        Stream::new(
239            location.clone(),
240            HydroNode::CycleSource {
241                ident,
242                metadata: location.new_node_metadata(Self::collection_kind()),
243            },
244        )
245    }
246}
247
248impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
249    for Stream<T, Tick<L>, Bounded, O, R>
250where
251    L: Location<'a>,
252{
253    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
254        assert_eq!(
255            Location::id(&self.location),
256            expected_location,
257            "locations do not match"
258        );
259        self.location
260            .flow_state()
261            .borrow_mut()
262            .push_root(HydroRoot::CycleSink {
263                ident,
264                input: Box::new(self.ir_node.into_inner()),
265                op_metadata: HydroIrOpMetadata::new(),
266            });
267    }
268}
269
270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
271    for Stream<T, L, B, O, R>
272where
273    L: Location<'a> + NoTick,
274{
275    type Location = L;
276
277    fn create_source(ident: syn::Ident, location: L) -> Self {
278        Stream::new(
279            location.clone(),
280            HydroNode::CycleSource {
281                ident,
282                metadata: location.new_node_metadata(Self::collection_kind()),
283            },
284        )
285    }
286}
287
288impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
289    for Stream<T, L, B, O, R>
290where
291    L: Location<'a> + NoTick,
292{
293    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
294        assert_eq!(
295            Location::id(&self.location),
296            expected_location,
297            "locations do not match"
298        );
299        self.location
300            .flow_state()
301            .borrow_mut()
302            .push_root(HydroRoot::CycleSink {
303                ident,
304                input: Box::new(self.ir_node.into_inner()),
305                op_metadata: HydroIrOpMetadata::new(),
306            });
307    }
308}
309
310impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
311where
312    T: Clone,
313    L: Location<'a>,
314{
315    fn clone(&self) -> Self {
316        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
317            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
318            *self.ir_node.borrow_mut() = HydroNode::Tee {
319                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
320                metadata: self.location.new_node_metadata(Self::collection_kind()),
321            };
322        }
323
324        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
325            Stream {
326                location: self.location.clone(),
327                ir_node: HydroNode::Tee {
328                    inner: TeeNode(inner.0.clone()),
329                    metadata: metadata.clone(),
330                }
331                .into(),
332                _phantom: PhantomData,
333            }
334        } else {
335            unreachable!()
336        }
337    }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
341where
342    L: Location<'a>,
343{
344    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
345        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
346        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
347
348        Stream {
349            location,
350            ir_node: RefCell::new(ir_node),
351            _phantom: PhantomData,
352        }
353    }
354
355    /// Returns the [`Location`] where this stream is being materialized.
356    pub fn location(&self) -> &L {
357        &self.location
358    }
359
360    pub(crate) fn collection_kind() -> CollectionKind {
361        CollectionKind::Stream {
362            bound: B::BOUND_KIND,
363            order: O::ORDERING_KIND,
364            retry: R::RETRIES_KIND,
365            element_type: stageleft::quote_type::<T>().into(),
366        }
367    }
368
369    /// Produces a stream based on invoking `f` on each element.
370    /// If you do not want to modify the stream and instead only want to view
371    /// each item use [`Stream::inspect`] instead.
372    ///
373    /// # Example
374    /// ```rust
375    /// # #[cfg(feature = "deploy")] {
376    /// # use hydro_lang::prelude::*;
377    /// # use futures::StreamExt;
378    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
379    /// let words = process.source_iter(q!(vec!["hello", "world"]));
380    /// words.map(q!(|x| x.to_uppercase()))
381    /// # }, |mut stream| async move {
382    /// # for w in vec!["HELLO", "WORLD"] {
383    /// #     assert_eq!(stream.next().await.unwrap(), w);
384    /// # }
385    /// # }));
386    /// # }
387    /// ```
388    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
389    where
390        F: Fn(T) -> U + 'a,
391    {
392        let f = f.splice_fn1_ctx(&self.location).into();
393        Stream::new(
394            self.location.clone(),
395            HydroNode::Map {
396                f,
397                input: Box::new(self.ir_node.into_inner()),
398                metadata: self
399                    .location
400                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
401            },
402        )
403    }
404
405    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
406    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
407    /// for the output type `U` must produce items in a **deterministic** order.
408    ///
409    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
410    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
411    ///
412    /// # Example
413    /// ```rust
414    /// # #[cfg(feature = "deploy")] {
415    /// # use hydro_lang::prelude::*;
416    /// # use futures::StreamExt;
417    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418    /// process
419    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
420    ///     .flat_map_ordered(q!(|x| x))
421    /// # }, |mut stream| async move {
422    /// // 1, 2, 3, 4
423    /// # for w in (1..5) {
424    /// #     assert_eq!(stream.next().await.unwrap(), w);
425    /// # }
426    /// # }));
427    /// # }
428    /// ```
429    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
430    where
431        I: IntoIterator<Item = U>,
432        F: Fn(T) -> I + 'a,
433    {
434        let f = f.splice_fn1_ctx(&self.location).into();
435        Stream::new(
436            self.location.clone(),
437            HydroNode::FlatMap {
438                f,
439                input: Box::new(self.ir_node.into_inner()),
440                metadata: self
441                    .location
442                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
443            },
444        )
445    }
446
447    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
448    /// for the output type `U` to produce items in any order.
449    ///
450    /// # Example
451    /// ```rust
452    /// # #[cfg(feature = "deploy")] {
453    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
454    /// # use futures::StreamExt;
455    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
456    /// process
457    ///     .source_iter(q!(vec![
458    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
459    ///         std::collections::HashSet::from_iter(vec![3, 4]),
460    ///     ]))
461    ///     .flat_map_unordered(q!(|x| x))
462    /// # }, |mut stream| async move {
463    /// // 1, 2, 3, 4, but in no particular order
464    /// # let mut results = Vec::new();
465    /// # for w in (1..5) {
466    /// #     results.push(stream.next().await.unwrap());
467    /// # }
468    /// # results.sort();
469    /// # assert_eq!(results, vec![1, 2, 3, 4]);
470    /// # }));
471    /// # }
472    /// ```
473    pub fn flat_map_unordered<U, I, F>(
474        self,
475        f: impl IntoQuotedMut<'a, F, L>,
476    ) -> Stream<U, L, B, NoOrder, R>
477    where
478        I: IntoIterator<Item = U>,
479        F: Fn(T) -> I + 'a,
480    {
481        let f = f.splice_fn1_ctx(&self.location).into();
482        Stream::new(
483            self.location.clone(),
484            HydroNode::FlatMap {
485                f,
486                input: Box::new(self.ir_node.into_inner()),
487                metadata: self
488                    .location
489                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
490            },
491        )
492    }
493
494    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
495    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
496    ///
497    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
498    /// not deterministic, use [`Stream::flatten_unordered`] instead.
499    ///
500    /// ```rust
501    /// # #[cfg(feature = "deploy")] {
502    /// # use hydro_lang::prelude::*;
503    /// # use futures::StreamExt;
504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505    /// process
506    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
507    ///     .flatten_ordered()
508    /// # }, |mut stream| async move {
509    /// // 1, 2, 3, 4
510    /// # for w in (1..5) {
511    /// #     assert_eq!(stream.next().await.unwrap(), w);
512    /// # }
513    /// # }));
514    /// # }
515    /// ```
516    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
517    where
518        T: IntoIterator<Item = U>,
519    {
520        self.flat_map_ordered(q!(|d| d))
521    }
522
523    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
524    /// for the element type `T` to produce items in any order.
525    ///
526    /// # Example
527    /// ```rust
528    /// # #[cfg(feature = "deploy")] {
529    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
530    /// # use futures::StreamExt;
531    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
532    /// process
533    ///     .source_iter(q!(vec![
534    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
535    ///         std::collections::HashSet::from_iter(vec![3, 4]),
536    ///     ]))
537    ///     .flatten_unordered()
538    /// # }, |mut stream| async move {
539    /// // 1, 2, 3, 4, but in no particular order
540    /// # let mut results = Vec::new();
541    /// # for w in (1..5) {
542    /// #     results.push(stream.next().await.unwrap());
543    /// # }
544    /// # results.sort();
545    /// # assert_eq!(results, vec![1, 2, 3, 4]);
546    /// # }));
547    /// # }
548    /// ```
549    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
550    where
551        T: IntoIterator<Item = U>,
552    {
553        self.flat_map_unordered(q!(|d| d))
554    }
555
556    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
557    /// `f`, preserving the order of the elements.
558    ///
559    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
560    /// not modify or take ownership of the values. If you need to modify the values while filtering
561    /// use [`Stream::filter_map`] instead.
562    ///
563    /// # Example
564    /// ```rust
565    /// # #[cfg(feature = "deploy")] {
566    /// # use hydro_lang::prelude::*;
567    /// # use futures::StreamExt;
568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569    /// process
570    ///     .source_iter(q!(vec![1, 2, 3, 4]))
571    ///     .filter(q!(|&x| x > 2))
572    /// # }, |mut stream| async move {
573    /// // 3, 4
574    /// # for w in (3..5) {
575    /// #     assert_eq!(stream.next().await.unwrap(), w);
576    /// # }
577    /// # }));
578    /// # }
579    /// ```
580    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
581    where
582        F: Fn(&T) -> bool + 'a,
583    {
584        let f = f.splice_fn1_borrow_ctx(&self.location).into();
585        Stream::new(
586            self.location.clone(),
587            HydroNode::Filter {
588                f,
589                input: Box::new(self.ir_node.into_inner()),
590                metadata: self.location.new_node_metadata(Self::collection_kind()),
591            },
592        )
593    }
594
595    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
596    ///
597    /// # Example
598    /// ```rust
599    /// # #[cfg(feature = "deploy")] {
600    /// # use hydro_lang::prelude::*;
601    /// # use futures::StreamExt;
602    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
603    /// process
604    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
605    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
606    /// # }, |mut stream| async move {
607    /// // 1, 2
608    /// # for w in (1..3) {
609    /// #     assert_eq!(stream.next().await.unwrap(), w);
610    /// # }
611    /// # }));
612    /// # }
613    /// ```
614    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
615    where
616        F: Fn(T) -> Option<U> + 'a,
617    {
618        let f = f.splice_fn1_ctx(&self.location).into();
619        Stream::new(
620            self.location.clone(),
621            HydroNode::FilterMap {
622                f,
623                input: Box::new(self.ir_node.into_inner()),
624                metadata: self
625                    .location
626                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
627            },
628        )
629    }
630
631    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
632    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
633    /// If `other` is an empty [`Optional`], no values will be produced.
634    ///
635    /// # Example
636    /// ```rust
637    /// # #[cfg(feature = "deploy")] {
638    /// # use hydro_lang::prelude::*;
639    /// # use futures::StreamExt;
640    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
641    /// let tick = process.tick();
642    /// let batch = process
643    ///   .source_iter(q!(vec![1, 2, 3, 4]))
644    ///   .batch(&tick, nondet!(/** test */));
645    /// let count = batch.clone().count(); // `count()` returns a singleton
646    /// batch.cross_singleton(count).all_ticks()
647    /// # }, |mut stream| async move {
648    /// // (1, 4), (2, 4), (3, 4), (4, 4)
649    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
650    /// #     assert_eq!(stream.next().await.unwrap(), w);
651    /// # }
652    /// # }));
653    /// # }
654    /// ```
655    pub fn cross_singleton<O2>(
656        self,
657        other: impl Into<Optional<O2, L, Bounded>>,
658    ) -> Stream<(T, O2), L, B, O, R>
659    where
660        O2: Clone,
661    {
662        let other: Optional<O2, L, Bounded> = other.into();
663        check_matching_location(&self.location, &other.location);
664
665        Stream::new(
666            self.location.clone(),
667            HydroNode::CrossSingleton {
668                left: Box::new(self.ir_node.into_inner()),
669                right: Box::new(other.ir_node.into_inner()),
670                metadata: self
671                    .location
672                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
673            },
674        )
675    }
676
677    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
678    ///
679    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
680    /// leader of a cluster.
681    ///
682    /// # Example
683    /// ```rust
684    /// # #[cfg(feature = "deploy")] {
685    /// # use hydro_lang::prelude::*;
686    /// # use futures::StreamExt;
687    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688    /// let tick = process.tick();
689    /// // ticks are lazy by default, forces the second tick to run
690    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
691    ///
692    /// let batch_first_tick = process
693    ///   .source_iter(q!(vec![1, 2, 3, 4]))
694    ///   .batch(&tick, nondet!(/** test */));
695    /// let batch_second_tick = process
696    ///   .source_iter(q!(vec![5, 6, 7, 8]))
697    ///   .batch(&tick, nondet!(/** test */))
698    ///   .defer_tick(); // appears on the second tick
699    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
700    /// batch_first_tick.chain(batch_second_tick)
701    ///   .filter_if_some(some_on_first_tick)
702    ///   .all_ticks()
703    /// # }, |mut stream| async move {
704    /// // [1, 2, 3, 4]
705    /// # for w in vec![1, 2, 3, 4] {
706    /// #     assert_eq!(stream.next().await.unwrap(), w);
707    /// # }
708    /// # }));
709    /// # }
710    /// ```
711    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
712        self.cross_singleton(signal.map(q!(|_u| ())))
713            .map(q!(|(d, _signal)| d))
714    }
715
716    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
717    ///
718    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
719    /// some local state.
720    ///
721    /// # Example
722    /// ```rust
723    /// # #[cfg(feature = "deploy")] {
724    /// # use hydro_lang::prelude::*;
725    /// # use futures::StreamExt;
726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727    /// let tick = process.tick();
728    /// // ticks are lazy by default, forces the second tick to run
729    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730    ///
731    /// let batch_first_tick = process
732    ///   .source_iter(q!(vec![1, 2, 3, 4]))
733    ///   .batch(&tick, nondet!(/** test */));
734    /// let batch_second_tick = process
735    ///   .source_iter(q!(vec![5, 6, 7, 8]))
736    ///   .batch(&tick, nondet!(/** test */))
737    ///   .defer_tick(); // appears on the second tick
738    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
739    /// batch_first_tick.chain(batch_second_tick)
740    ///   .filter_if_none(some_on_first_tick)
741    ///   .all_ticks()
742    /// # }, |mut stream| async move {
743    /// // [5, 6, 7, 8]
744    /// # for w in vec![5, 6, 7, 8] {
745    /// #     assert_eq!(stream.next().await.unwrap(), w);
746    /// # }
747    /// # }));
748    /// # }
749    /// ```
750    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
751        self.filter_if_some(
752            other
753                .map(q!(|_| ()))
754                .into_singleton()
755                .filter(q!(|o| o.is_none())),
756        )
757    }
758
759    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
760    /// tupled pairs in a non-deterministic order.
761    ///
762    /// # Example
763    /// ```rust
764    /// # #[cfg(feature = "deploy")] {
765    /// # use hydro_lang::prelude::*;
766    /// # use std::collections::HashSet;
767    /// # use futures::StreamExt;
768    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769    /// let tick = process.tick();
770    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
771    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
772    /// stream1.cross_product(stream2)
773    /// # }, |mut stream| async move {
774    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
775    /// # stream.map(|i| assert!(expected.contains(&i)));
776    /// # }));
777    /// # }
778    /// ```
779    pub fn cross_product<T2, O2: Ordering>(
780        self,
781        other: Stream<T2, L, B, O2, R>,
782    ) -> Stream<(T, T2), L, B, NoOrder, R>
783    where
784        T: Clone,
785        T2: Clone,
786    {
787        check_matching_location(&self.location, &other.location);
788
789        Stream::new(
790            self.location.clone(),
791            HydroNode::CrossProduct {
792                left: Box::new(self.ir_node.into_inner()),
793                right: Box::new(other.ir_node.into_inner()),
794                metadata: self
795                    .location
796                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
797            },
798        )
799    }
800
801    /// Takes one stream as input and filters out any duplicate occurrences. The output
802    /// contains all unique values from the input.
803    ///
804    /// # Example
805    /// ```rust
806    /// # #[cfg(feature = "deploy")] {
807    /// # use hydro_lang::prelude::*;
808    /// # use futures::StreamExt;
809    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810    /// let tick = process.tick();
811    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
812    /// # }, |mut stream| async move {
813    /// # for w in vec![1, 2, 3, 4] {
814    /// #     assert_eq!(stream.next().await.unwrap(), w);
815    /// # }
816    /// # }));
817    /// # }
818    /// ```
819    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
820    where
821        T: Eq + Hash,
822    {
823        Stream::new(
824            self.location.clone(),
825            HydroNode::Unique {
826                input: Box::new(self.ir_node.into_inner()),
827                metadata: self
828                    .location
829                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
830            },
831        )
832    }
833
834    /// Outputs everything in this stream that is *not* contained in the `other` stream.
835    ///
836    /// The `other` stream must be [`Bounded`], since this function will wait until
837    /// all its elements are available before producing any output.
838    /// # Example
839    /// ```rust
840    /// # #[cfg(feature = "deploy")] {
841    /// # use hydro_lang::prelude::*;
842    /// # use futures::StreamExt;
843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844    /// let tick = process.tick();
845    /// let stream = process
846    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
847    ///   .batch(&tick, nondet!(/** test */));
848    /// let batch = process
849    ///   .source_iter(q!(vec![1, 2]))
850    ///   .batch(&tick, nondet!(/** test */));
851    /// stream.filter_not_in(batch).all_ticks()
852    /// # }, |mut stream| async move {
853    /// # for w in vec![3, 4] {
854    /// #     assert_eq!(stream.next().await.unwrap(), w);
855    /// # }
856    /// # }));
857    /// # }
858    /// ```
859    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
860    where
861        T: Eq + Hash,
862    {
863        check_matching_location(&self.location, &other.location);
864
865        Stream::new(
866            self.location.clone(),
867            HydroNode::Difference {
868                pos: Box::new(self.ir_node.into_inner()),
869                neg: Box::new(other.ir_node.into_inner()),
870                metadata: self
871                    .location
872                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
873            },
874        )
875    }
876
877    /// An operator which allows you to "inspect" each element of a stream without
878    /// modifying it. The closure `f` is called on a reference to each item. This is
879    /// mainly useful for debugging, and should not be used to generate side-effects.
880    ///
881    /// # Example
882    /// ```rust
883    /// # #[cfg(feature = "deploy")] {
884    /// # use hydro_lang::prelude::*;
885    /// # use futures::StreamExt;
886    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887    /// let nums = process.source_iter(q!(vec![1, 2]));
888    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
889    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
890    /// # }, |mut stream| async move {
891    /// # for w in vec![1, 2] {
892    /// #     assert_eq!(stream.next().await.unwrap(), w);
893    /// # }
894    /// # }));
895    /// # }
896    /// ```
897    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
898    where
899        F: Fn(&T) + 'a,
900    {
901        let f = f.splice_fn1_borrow_ctx(&self.location).into();
902
903        Stream::new(
904            self.location.clone(),
905            HydroNode::Inspect {
906                f,
907                input: Box::new(self.ir_node.into_inner()),
908                metadata: self.location.new_node_metadata(Self::collection_kind()),
909            },
910        )
911    }
912
913    /// An operator which allows you to "name" a `HydroNode`.
914    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
915    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
916        {
917            let mut node = self.ir_node.borrow_mut();
918            let metadata = node.metadata_mut();
919            metadata.tag = Some(name.to_string());
920        }
921        self
922    }
923
924    /// Explicitly "casts" the stream to a type with a different ordering
925    /// guarantee. Useful in unsafe code where the ordering cannot be proven
926    /// by the type-system.
927    ///
928    /// # Non-Determinism
929    /// This function is used as an escape hatch, and any mistakes in the
930    /// provided ordering guarantee will propagate into the guarantees
931    /// for the rest of the program.
932    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933        if O::ORDERING_KIND == O2::ORDERING_KIND {
934            Stream::new(self.location, self.ir_node.into_inner())
935        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936            // We can always weaken the ordering guarantee
937            Stream::new(
938                self.location.clone(),
939                HydroNode::Cast {
940                    inner: Box::new(self.ir_node.into_inner()),
941                    metadata: self
942                        .location
943                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944                },
945            )
946        } else {
947            Stream::new(
948                self.location.clone(),
949                HydroNode::ObserveNonDet {
950                    inner: Box::new(self.ir_node.into_inner()),
951                    trusted: false,
952                    metadata: self
953                        .location
954                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955                },
956            )
957        }
958    }
959
960    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
961    // is not observable
962    fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
963        if O::ORDERING_KIND == O2::ORDERING_KIND {
964            Stream::new(self.location, self.ir_node.into_inner())
965        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
966            // We can always weaken the ordering guarantee
967            Stream::new(
968                self.location.clone(),
969                HydroNode::Cast {
970                    inner: Box::new(self.ir_node.into_inner()),
971                    metadata: self
972                        .location
973                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
974                },
975            )
976        } else {
977            Stream::new(
978                self.location.clone(),
979                HydroNode::ObserveNonDet {
980                    inner: Box::new(self.ir_node.into_inner()),
981                    trusted: true,
982                    metadata: self
983                        .location
984                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
985                },
986            )
987        }
988    }
989
990    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
991    /// which is always safe because that is the weakest possible guarantee.
992    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
993        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
994        self.assume_ordering::<NoOrder>(nondet)
995    }
996
997    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
998    /// enforcing that `O2` is weaker than the input ordering guarantee.
999    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1000        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1001        self.assume_ordering::<O2>(nondet)
1002    }
1003
1004    /// Explicitly "casts" the stream to a type with a different retries
1005    /// guarantee. Useful in unsafe code where the lack of retries cannot
1006    /// be proven by the type-system.
1007    ///
1008    /// # Non-Determinism
1009    /// This function is used as an escape hatch, and any mistakes in the
1010    /// provided retries guarantee will propagate into the guarantees
1011    /// for the rest of the program.
1012    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013        if R::RETRIES_KIND == R2::RETRIES_KIND {
1014            Stream::new(self.location, self.ir_node.into_inner())
1015        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016            // We can always weaken the retries guarantee
1017            Stream::new(
1018                self.location.clone(),
1019                HydroNode::Cast {
1020                    inner: Box::new(self.ir_node.into_inner()),
1021                    metadata: self
1022                        .location
1023                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024                },
1025            )
1026        } else {
1027            Stream::new(
1028                self.location.clone(),
1029                HydroNode::ObserveNonDet {
1030                    inner: Box::new(self.ir_node.into_inner()),
1031                    trusted: false,
1032                    metadata: self
1033                        .location
1034                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035                },
1036            )
1037        }
1038    }
1039
1040    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1041    // is not observable
1042    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1043        if R::RETRIES_KIND == R2::RETRIES_KIND {
1044            Stream::new(self.location, self.ir_node.into_inner())
1045        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1046            // We can always weaken the retries guarantee
1047            Stream::new(
1048                self.location.clone(),
1049                HydroNode::Cast {
1050                    inner: Box::new(self.ir_node.into_inner()),
1051                    metadata: self
1052                        .location
1053                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1054                },
1055            )
1056        } else {
1057            Stream::new(
1058                self.location.clone(),
1059                HydroNode::ObserveNonDet {
1060                    inner: Box::new(self.ir_node.into_inner()),
1061                    trusted: true,
1062                    metadata: self
1063                        .location
1064                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1065                },
1066            )
1067        }
1068    }
1069
1070    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1071    /// which is always safe because that is the weakest possible guarantee.
1072    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1073        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1074        self.assume_retries::<AtLeastOnce>(nondet)
1075    }
1076
1077    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1078    /// enforcing that `R2` is weaker than the input retries guarantee.
1079    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1080        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1081        self.assume_retries::<R2>(nondet)
1082    }
1083}
1084
1085impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1086where
1087    L: Location<'a>,
1088{
1089    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1090    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1091    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1092        self.assume_retries(
1093            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1094        )
1095    }
1096}
1097
1098impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1099where
1100    L: Location<'a>,
1101{
1102    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::prelude::*;
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1111    /// # }, |mut stream| async move {
1112    /// // 1, 2, 3
1113    /// # for w in vec![1, 2, 3] {
1114    /// #     assert_eq!(stream.next().await.unwrap(), w);
1115    /// # }
1116    /// # }));
1117    /// # }
1118    /// ```
1119    pub fn cloned(self) -> Stream<T, L, B, O, R>
1120    where
1121        T: Clone,
1122    {
1123        self.map(q!(|d| d.clone()))
1124    }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1128where
1129    L: Location<'a>,
1130{
1131    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134    ///
1135    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1136    /// and there may be duplicates.
1137    ///
1138    /// # Example
1139    /// ```rust
1140    /// # #[cfg(feature = "deploy")] {
1141    /// # use hydro_lang::prelude::*;
1142    /// # use futures::StreamExt;
1143    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144    /// let tick = process.tick();
1145    /// let bools = process.source_iter(q!(vec![false, true, false]));
1146    /// let batch = bools.batch(&tick, nondet!(/** test */));
1147    /// batch
1148    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1149    ///     .all_ticks()
1150    /// # }, |mut stream| async move {
1151    /// // true
1152    /// # assert_eq!(stream.next().await.unwrap(), true);
1153    /// # }));
1154    /// # }
1155    /// ```
1156    pub fn fold_commutative_idempotent<A, I, F>(
1157        self,
1158        init: impl IntoQuotedMut<'a, I, L>,
1159        comb: impl IntoQuotedMut<'a, F, L>,
1160    ) -> Singleton<A, L, B>
1161    where
1162        I: Fn() -> A + 'a,
1163        F: Fn(&mut A, T),
1164    {
1165        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1166        self.assume_ordering(nondet)
1167            .assume_retries(nondet)
1168            .fold(init, comb)
1169    }
1170
1171    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1172    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1173    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1174    /// reference, so that it can be modified in place.
1175    ///
1176    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1177    /// and there may be duplicates.
1178    ///
1179    /// # Example
1180    /// ```rust
1181    /// # #[cfg(feature = "deploy")] {
1182    /// # use hydro_lang::prelude::*;
1183    /// # use futures::StreamExt;
1184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185    /// let tick = process.tick();
1186    /// let bools = process.source_iter(q!(vec![false, true, false]));
1187    /// let batch = bools.batch(&tick, nondet!(/** test */));
1188    /// batch
1189    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1190    ///     .all_ticks()
1191    /// # }, |mut stream| async move {
1192    /// // true
1193    /// # assert_eq!(stream.next().await.unwrap(), true);
1194    /// # }));
1195    /// # }
1196    /// ```
1197    pub fn reduce_commutative_idempotent<F>(
1198        self,
1199        comb: impl IntoQuotedMut<'a, F, L>,
1200    ) -> Optional<T, L, B>
1201    where
1202        F: Fn(&mut T, T) + 'a,
1203    {
1204        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205        self.assume_retries(nondet).reduce_commutative(comb)
1206    }
1207
1208    // only for internal APIs that have been carefully vetted, will eventually be removed once we
1209    // have algebraic verification of these properties
1210    fn reduce_commutative_idempotent_trusted<F>(
1211        self,
1212        comb: impl IntoQuotedMut<'a, F, L>,
1213    ) -> Optional<T, L, B>
1214    where
1215        F: Fn(&mut T, T) + 'a,
1216    {
1217        self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1218            .reduce_commutative_trusted(comb)
1219    }
1220
1221    /// Computes the maximum element in the stream as an [`Optional`], which
1222    /// will be empty until the first element in the input arrives.
1223    ///
1224    /// # Example
1225    /// ```rust
1226    /// # #[cfg(feature = "deploy")] {
1227    /// # use hydro_lang::prelude::*;
1228    /// # use futures::StreamExt;
1229    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230    /// let tick = process.tick();
1231    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1232    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233    /// batch.max().all_ticks()
1234    /// # }, |mut stream| async move {
1235    /// // 4
1236    /// # assert_eq!(stream.next().await.unwrap(), 4);
1237    /// # }));
1238    /// # }
1239    /// ```
1240    pub fn max(self) -> Optional<T, L, B>
1241    where
1242        T: Ord,
1243    {
1244        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1245            if new > *curr {
1246                *curr = new;
1247            }
1248        }))
1249    }
1250
1251    /// Computes the minimum element in the stream as an [`Optional`], which
1252    /// will be empty until the first element in the input arrives.
1253    ///
1254    /// # Example
1255    /// ```rust
1256    /// # #[cfg(feature = "deploy")] {
1257    /// # use hydro_lang::prelude::*;
1258    /// # use futures::StreamExt;
1259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260    /// let tick = process.tick();
1261    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1262    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1263    /// batch.min().all_ticks()
1264    /// # }, |mut stream| async move {
1265    /// // 1
1266    /// # assert_eq!(stream.next().await.unwrap(), 1);
1267    /// # }));
1268    /// # }
1269    /// ```
1270    pub fn min(self) -> Optional<T, L, B>
1271    where
1272        T: Ord,
1273    {
1274        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1275            if new < *curr {
1276                *curr = new;
1277            }
1278        }))
1279    }
1280}
1281
1282impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1283where
1284    L: Location<'a>,
1285{
1286    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1287    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1288    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1289    ///
1290    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1291    ///
1292    /// # Example
1293    /// ```rust
1294    /// # #[cfg(feature = "deploy")] {
1295    /// # use hydro_lang::prelude::*;
1296    /// # use futures::StreamExt;
1297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298    /// let tick = process.tick();
1299    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1300    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301    /// batch
1302    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1303    ///     .all_ticks()
1304    /// # }, |mut stream| async move {
1305    /// // 10
1306    /// # assert_eq!(stream.next().await.unwrap(), 10);
1307    /// # }));
1308    /// # }
1309    /// ```
1310    pub fn fold_commutative<A, I, F>(
1311        self,
1312        init: impl IntoQuotedMut<'a, I, L>,
1313        comb: impl IntoQuotedMut<'a, F, L>,
1314    ) -> Singleton<A, L, B>
1315    where
1316        I: Fn() -> A + 'a,
1317        F: Fn(&mut A, T),
1318    {
1319        let nondet = nondet!(/** the combinator function is commutative */);
1320        self.assume_ordering(nondet).fold(init, comb)
1321    }
1322
1323    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1324    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1325    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1326    /// reference, so that it can be modified in place.
1327    ///
1328    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1329    ///
1330    /// # Example
1331    /// ```rust
1332    /// # #[cfg(feature = "deploy")] {
1333    /// # use hydro_lang::prelude::*;
1334    /// # use futures::StreamExt;
1335    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336    /// let tick = process.tick();
1337    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1338    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1339    /// batch
1340    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1341    ///     .all_ticks()
1342    /// # }, |mut stream| async move {
1343    /// // 10
1344    /// # assert_eq!(stream.next().await.unwrap(), 10);
1345    /// # }));
1346    /// # }
1347    /// ```
1348    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1349    where
1350        F: Fn(&mut T, T) + 'a,
1351    {
1352        let nondet = nondet!(/** the combinator function is commutative */);
1353        self.assume_ordering(nondet).reduce(comb)
1354    }
1355
1356    fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1357    where
1358        F: Fn(&mut T, T) + 'a,
1359    {
1360        let ordered = if B::BOUNDED {
1361            self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1362        } else {
1363            self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1364        };
1365
1366        ordered.reduce(comb)
1367    }
1368
1369    /// Computes the number of elements in the stream as a [`Singleton`].
1370    ///
1371    /// # Example
1372    /// ```rust
1373    /// # #[cfg(feature = "deploy")] {
1374    /// # use hydro_lang::prelude::*;
1375    /// # use futures::StreamExt;
1376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1377    /// let tick = process.tick();
1378    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1379    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1380    /// batch.count().all_ticks()
1381    /// # }, |mut stream| async move {
1382    /// // 4
1383    /// # assert_eq!(stream.next().await.unwrap(), 4);
1384    /// # }));
1385    /// # }
1386    /// ```
1387    pub fn count(self) -> Singleton<usize, L, B> {
1388        self.assume_ordering_trusted(nondet!(
1389            /// Order does not affect eventual count, and also does not affect intermediate states.
1390        ))
1391        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1392    }
1393}
1394
1395impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1396where
1397    L: Location<'a>,
1398{
1399    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1400    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1401    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1402    ///
1403    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1404    ///
1405    /// # Example
1406    /// ```rust
1407    /// # #[cfg(feature = "deploy")] {
1408    /// # use hydro_lang::prelude::*;
1409    /// # use futures::StreamExt;
1410    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1411    /// let tick = process.tick();
1412    /// let bools = process.source_iter(q!(vec![false, true, false]));
1413    /// let batch = bools.batch(&tick, nondet!(/** test */));
1414    /// batch
1415    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1416    ///     .all_ticks()
1417    /// # }, |mut stream| async move {
1418    /// // true
1419    /// # assert_eq!(stream.next().await.unwrap(), true);
1420    /// # }));
1421    /// # }
1422    /// ```
1423    pub fn fold_idempotent<A, I, F>(
1424        self,
1425        init: impl IntoQuotedMut<'a, I, L>,
1426        comb: impl IntoQuotedMut<'a, F, L>,
1427    ) -> Singleton<A, L, B>
1428    where
1429        I: Fn() -> A + 'a,
1430        F: Fn(&mut A, T),
1431    {
1432        let nondet = nondet!(/** the combinator function is idempotent */);
1433        self.assume_retries(nondet).fold(init, comb)
1434    }
1435
1436    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1437    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1438    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1439    /// reference, so that it can be modified in place.
1440    ///
1441    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1442    ///
1443    /// # Example
1444    /// ```rust
1445    /// # #[cfg(feature = "deploy")] {
1446    /// # use hydro_lang::prelude::*;
1447    /// # use futures::StreamExt;
1448    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1449    /// let tick = process.tick();
1450    /// let bools = process.source_iter(q!(vec![false, true, false]));
1451    /// let batch = bools.batch(&tick, nondet!(/** test */));
1452    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1453    /// # }, |mut stream| async move {
1454    /// // true
1455    /// # assert_eq!(stream.next().await.unwrap(), true);
1456    /// # }));
1457    /// # }
1458    /// ```
1459    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1460    where
1461        F: Fn(&mut T, T) + 'a,
1462    {
1463        let nondet = nondet!(/** the combinator function is idempotent */);
1464        self.assume_retries(nondet).reduce(comb)
1465    }
1466
1467    /// Computes the first element in the stream as an [`Optional`], which
1468    /// will be empty until the first element in the input arrives.
1469    ///
1470    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1471    /// re-ordering of elements may cause the first element to change.
1472    ///
1473    /// # Example
1474    /// ```rust
1475    /// # #[cfg(feature = "deploy")] {
1476    /// # use hydro_lang::prelude::*;
1477    /// # use futures::StreamExt;
1478    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1479    /// let tick = process.tick();
1480    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1481    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482    /// batch.first().all_ticks()
1483    /// # }, |mut stream| async move {
1484    /// // 1
1485    /// # assert_eq!(stream.next().await.unwrap(), 1);
1486    /// # }));
1487    /// # }
1488    /// ```
1489    pub fn first(self) -> Optional<T, L, B> {
1490        self.reduce_idempotent(q!(|_, _| {}))
1491    }
1492
1493    /// Computes the last element in the stream as an [`Optional`], which
1494    /// will be empty until an element in the input arrives.
1495    ///
1496    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1497    /// re-ordering of elements may cause the last element to change.
1498    ///
1499    /// # Example
1500    /// ```rust
1501    /// # #[cfg(feature = "deploy")] {
1502    /// # use hydro_lang::prelude::*;
1503    /// # use futures::StreamExt;
1504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1505    /// let tick = process.tick();
1506    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1507    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1508    /// batch.last().all_ticks()
1509    /// # }, |mut stream| async move {
1510    /// // 4
1511    /// # assert_eq!(stream.next().await.unwrap(), 4);
1512    /// # }));
1513    /// # }
1514    /// ```
1515    pub fn last(self) -> Optional<T, L, B> {
1516        self.reduce_idempotent(q!(|curr, new| *curr = new))
1517    }
1518}
1519
1520impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1521where
1522    L: Location<'a>,
1523{
1524    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1525    ///
1526    /// # Example
1527    /// ```rust
1528    /// # #[cfg(feature = "deploy")] {
1529    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1530    /// # use futures::StreamExt;
1531    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1532    /// let tick = process.tick();
1533    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1534    /// numbers.enumerate()
1535    /// # }, |mut stream| async move {
1536    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1537    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1538    /// #     assert_eq!(stream.next().await.unwrap(), w);
1539    /// # }
1540    /// # }));
1541    /// # }
1542    /// ```
1543    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1544        Stream::new(
1545            self.location.clone(),
1546            HydroNode::Enumerate {
1547                input: Box::new(self.ir_node.into_inner()),
1548                metadata: self.location.new_node_metadata(Stream::<
1549                    (usize, T),
1550                    L,
1551                    B,
1552                    TotalOrder,
1553                    ExactlyOnce,
1554                >::collection_kind()),
1555            },
1556        )
1557    }
1558
1559    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1560    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1561    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1562    ///
1563    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1564    /// to depend on the order of elements in the stream.
1565    ///
1566    /// # Example
1567    /// ```rust
1568    /// # #[cfg(feature = "deploy")] {
1569    /// # use hydro_lang::prelude::*;
1570    /// # use futures::StreamExt;
1571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1572    /// let tick = process.tick();
1573    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1574    /// let batch = words.batch(&tick, nondet!(/** test */));
1575    /// batch
1576    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1577    ///     .all_ticks()
1578    /// # }, |mut stream| async move {
1579    /// // "HELLOWORLD"
1580    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1581    /// # }));
1582    /// # }
1583    /// ```
1584    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1585        self,
1586        init: impl IntoQuotedMut<'a, I, L>,
1587        comb: impl IntoQuotedMut<'a, F, L>,
1588    ) -> Singleton<A, L, B> {
1589        let init = init.splice_fn0_ctx(&self.location).into();
1590        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1591
1592        let core = HydroNode::Fold {
1593            init,
1594            acc: comb,
1595            input: Box::new(self.ir_node.into_inner()),
1596            metadata: self
1597                .location
1598                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1599        };
1600
1601        Singleton::new(self.location, core)
1602    }
1603
1604    /// Collects all the elements of this stream into a single [`Vec`] element.
1605    ///
1606    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1607    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1608    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1609    /// the vector at an arbitrary point in time.
1610    ///
1611    /// # Example
1612    /// ```rust
1613    /// # #[cfg(feature = "deploy")] {
1614    /// # use hydro_lang::prelude::*;
1615    /// # use futures::StreamExt;
1616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617    /// let tick = process.tick();
1618    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1619    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1620    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1621    /// # }, |mut stream| async move {
1622    /// // [ vec![1, 2, 3, 4] ]
1623    /// # for w in vec![vec![1, 2, 3, 4]] {
1624    /// #     assert_eq!(stream.next().await.unwrap(), w);
1625    /// # }
1626    /// # }));
1627    /// # }
1628    /// ```
1629    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1630        self.fold(
1631            q!(|| vec![]),
1632            q!(|acc, v| {
1633                acc.push(v);
1634            }),
1635        )
1636    }
1637
1638    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1639    /// and emitting each intermediate result.
1640    ///
1641    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1642    /// containing all intermediate accumulated values. The scan operation can also terminate early
1643    /// by returning `None`.
1644    ///
1645    /// The function takes a mutable reference to the accumulator and the current element, and returns
1646    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1647    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1648    ///
1649    /// # Examples
1650    ///
1651    /// Basic usage - running sum:
1652    /// ```rust
1653    /// # #[cfg(feature = "deploy")] {
1654    /// # use hydro_lang::prelude::*;
1655    /// # use futures::StreamExt;
1656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1658    ///     q!(|| 0),
1659    ///     q!(|acc, x| {
1660    ///         *acc += x;
1661    ///         Some(*acc)
1662    ///     }),
1663    /// )
1664    /// # }, |mut stream| async move {
1665    /// // Output: 1, 3, 6, 10
1666    /// # for w in vec![1, 3, 6, 10] {
1667    /// #     assert_eq!(stream.next().await.unwrap(), w);
1668    /// # }
1669    /// # }));
1670    /// # }
1671    /// ```
1672    ///
1673    /// Early termination example:
1674    /// ```rust
1675    /// # #[cfg(feature = "deploy")] {
1676    /// # use hydro_lang::prelude::*;
1677    /// # use futures::StreamExt;
1678    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1680    ///     q!(|| 1),
1681    ///     q!(|state, x| {
1682    ///         *state = *state * x;
1683    ///         if *state > 6 {
1684    ///             None // Terminate the stream
1685    ///         } else {
1686    ///             Some(-*state)
1687    ///         }
1688    ///     }),
1689    /// )
1690    /// # }, |mut stream| async move {
1691    /// // Output: -1, -2, -6
1692    /// # for w in vec![-1, -2, -6] {
1693    /// #     assert_eq!(stream.next().await.unwrap(), w);
1694    /// # }
1695    /// # }));
1696    /// # }
1697    /// ```
1698    pub fn scan<A, U, I, F>(
1699        self,
1700        init: impl IntoQuotedMut<'a, I, L>,
1701        f: impl IntoQuotedMut<'a, F, L>,
1702    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1703    where
1704        I: Fn() -> A + 'a,
1705        F: Fn(&mut A, T) -> Option<U> + 'a,
1706    {
1707        let init = init.splice_fn0_ctx(&self.location).into();
1708        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1709
1710        Stream::new(
1711            self.location.clone(),
1712            HydroNode::Scan {
1713                init,
1714                acc: f,
1715                input: Box::new(self.ir_node.into_inner()),
1716                metadata: self.location.new_node_metadata(
1717                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1718                ),
1719            },
1720        )
1721    }
1722
1723    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1724    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1725    /// until the first element in the input arrives.
1726    ///
1727    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1728    /// to depend on the order of elements in the stream.
1729    ///
1730    /// # Example
1731    /// ```rust
1732    /// # #[cfg(feature = "deploy")] {
1733    /// # use hydro_lang::prelude::*;
1734    /// # use futures::StreamExt;
1735    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736    /// let tick = process.tick();
1737    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1738    /// let batch = words.batch(&tick, nondet!(/** test */));
1739    /// batch
1740    ///     .map(q!(|x| x.to_string()))
1741    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1742    ///     .all_ticks()
1743    /// # }, |mut stream| async move {
1744    /// // "HELLOWORLD"
1745    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1746    /// # }));
1747    /// # }
1748    /// ```
1749    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1750        self,
1751        comb: impl IntoQuotedMut<'a, F, L>,
1752    ) -> Optional<T, L, B> {
1753        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1754        let core = HydroNode::Reduce {
1755            f,
1756            input: Box::new(self.ir_node.into_inner()),
1757            metadata: self
1758                .location
1759                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1760        };
1761
1762        Optional::new(self.location, core)
1763    }
1764}
1765
1766impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1767    /// Produces a new stream that interleaves the elements of the two input streams.
1768    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1769    ///
1770    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1771    /// [`Bounded`], you can use [`Stream::chain`] instead.
1772    ///
1773    /// # Example
1774    /// ```rust
1775    /// # #[cfg(feature = "deploy")] {
1776    /// # use hydro_lang::prelude::*;
1777    /// # use futures::StreamExt;
1778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1779    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1780    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1781    /// # }, |mut stream| async move {
1782    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1783    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1784    /// #     assert_eq!(stream.next().await.unwrap(), w);
1785    /// # }
1786    /// # }));
1787    /// # }
1788    /// ```
1789    pub fn interleave<O2: Ordering, R2: Retries>(
1790        self,
1791        other: Stream<T, L, Unbounded, O2, R2>,
1792    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1793    where
1794        R: MinRetries<R2>,
1795    {
1796        Stream::new(
1797            self.location.clone(),
1798            HydroNode::Chain {
1799                first: Box::new(self.ir_node.into_inner()),
1800                second: Box::new(other.ir_node.into_inner()),
1801                metadata: self.location.new_node_metadata(Stream::<
1802                    T,
1803                    L,
1804                    Unbounded,
1805                    NoOrder,
1806                    <R as MinRetries<R2>>::Min,
1807                >::collection_kind()),
1808            },
1809        )
1810    }
1811}
1812
1813impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1814where
1815    L: Location<'a>,
1816{
1817    /// Produces a new stream that emits the input elements in sorted order.
1818    ///
1819    /// The input stream can have any ordering guarantee, but the output stream
1820    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1821    /// elements in the input stream are available, so it requires the input stream
1822    /// to be [`Bounded`].
1823    ///
1824    /// # Example
1825    /// ```rust
1826    /// # #[cfg(feature = "deploy")] {
1827    /// # use hydro_lang::prelude::*;
1828    /// # use futures::StreamExt;
1829    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1830    /// let tick = process.tick();
1831    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1832    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1833    /// batch.sort().all_ticks()
1834    /// # }, |mut stream| async move {
1835    /// // 1, 2, 3, 4
1836    /// # for w in (1..5) {
1837    /// #     assert_eq!(stream.next().await.unwrap(), w);
1838    /// # }
1839    /// # }));
1840    /// # }
1841    /// ```
1842    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1843    where
1844        T: Ord,
1845    {
1846        Stream::new(
1847            self.location.clone(),
1848            HydroNode::Sort {
1849                input: Box::new(self.ir_node.into_inner()),
1850                metadata: self
1851                    .location
1852                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1853            },
1854        )
1855    }
1856
1857    /// Produces a new stream that first emits the elements of the `self` stream,
1858    /// and then emits the elements of the `other` stream. The output stream has
1859    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1860    /// [`TotalOrder`] guarantee.
1861    ///
1862    /// Currently, both input streams must be [`Bounded`]. This operator will block
1863    /// on the first stream until all its elements are available. In a future version,
1864    /// we will relax the requirement on the `other` stream.
1865    ///
1866    /// # Example
1867    /// ```rust
1868    /// # #[cfg(feature = "deploy")] {
1869    /// # use hydro_lang::prelude::*;
1870    /// # use futures::StreamExt;
1871    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1872    /// let tick = process.tick();
1873    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1874    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1875    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1876    /// # }, |mut stream| async move {
1877    /// // 2, 3, 4, 5, 1, 2, 3, 4
1878    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1879    /// #     assert_eq!(stream.next().await.unwrap(), w);
1880    /// # }
1881    /// # }));
1882    /// # }
1883    /// ```
1884    pub fn chain<O2: Ordering, R2: Retries>(
1885        self,
1886        other: Stream<T, L, Bounded, O2, R2>,
1887    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1888    where
1889        O: MinOrder<O2>,
1890        R: MinRetries<R2>,
1891    {
1892        check_matching_location(&self.location, &other.location);
1893
1894        Stream::new(
1895            self.location.clone(),
1896            HydroNode::Chain {
1897                first: Box::new(self.ir_node.into_inner()),
1898                second: Box::new(other.ir_node.into_inner()),
1899                metadata: self.location.new_node_metadata(Stream::<
1900                    T,
1901                    L,
1902                    Bounded,
1903                    <O as MinOrder<O2>>::Min,
1904                    <R as MinRetries<R2>>::Min,
1905                >::collection_kind()),
1906            },
1907        )
1908    }
1909
1910    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1911    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1912    /// because this is compiled into a nested loop.
1913    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1914        self,
1915        other: Stream<T2, L, Bounded, O2, R>,
1916    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1917    where
1918        T: Clone,
1919        T2: Clone,
1920    {
1921        check_matching_location(&self.location, &other.location);
1922
1923        Stream::new(
1924            self.location.clone(),
1925            HydroNode::CrossProduct {
1926                left: Box::new(self.ir_node.into_inner()),
1927                right: Box::new(other.ir_node.into_inner()),
1928                metadata: self.location.new_node_metadata(Stream::<
1929                    (T, T2),
1930                    L,
1931                    Bounded,
1932                    <O2 as MinOrder<O>>::Min,
1933                    R,
1934                >::collection_kind()),
1935            },
1936        )
1937    }
1938
1939    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1940    /// `self` used as the values for *each* key.
1941    ///
1942    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1943    /// values. For example, it can be used to send the same set of elements to several cluster
1944    /// members, if the membership information is available as a [`KeyedSingleton`].
1945    ///
1946    /// # Example
1947    /// ```rust
1948    /// # #[cfg(feature = "deploy")] {
1949    /// # use hydro_lang::prelude::*;
1950    /// # use futures::StreamExt;
1951    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1952    /// # let tick = process.tick();
1953    /// let keyed_singleton = // { 1: (), 2: () }
1954    /// # process
1955    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1956    /// #     .into_keyed()
1957    /// #     .batch(&tick, nondet!(/** test */))
1958    /// #     .first();
1959    /// let stream = // [ "a", "b" ]
1960    /// # process
1961    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1962    /// #     .batch(&tick, nondet!(/** test */));
1963    /// stream.repeat_with_keys(keyed_singleton)
1964    /// # .entries().all_ticks()
1965    /// # }, |mut stream| async move {
1966    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1967    /// # let mut results = Vec::new();
1968    /// # for _ in 0..4 {
1969    /// #     results.push(stream.next().await.unwrap());
1970    /// # }
1971    /// # results.sort();
1972    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1973    /// # }));
1974    /// # }
1975    /// ```
1976    pub fn repeat_with_keys<K, V2>(
1977        self,
1978        keys: KeyedSingleton<K, V2, L, Bounded>,
1979    ) -> KeyedStream<K, T, L, Bounded, O, R>
1980    where
1981        K: Clone,
1982        T: Clone,
1983    {
1984        keys.keys()
1985            .weaken_retries()
1986            .assume_ordering_trusted::<TotalOrder>(
1987                nondet!(/** keyed stream does not depend on ordering of keys */),
1988            )
1989            .cross_product_nested_loop(self)
1990            .into_keyed()
1991    }
1992}
1993
1994impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1995where
1996    L: Location<'a>,
1997{
1998    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1999    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2000    /// by equi-joining the two streams on the key attribute `K`.
2001    ///
2002    /// # Example
2003    /// ```rust
2004    /// # #[cfg(feature = "deploy")] {
2005    /// # use hydro_lang::prelude::*;
2006    /// # use std::collections::HashSet;
2007    /// # use futures::StreamExt;
2008    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2009    /// let tick = process.tick();
2010    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2011    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2012    /// stream1.join(stream2)
2013    /// # }, |mut stream| async move {
2014    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2015    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2016    /// # stream.map(|i| assert!(expected.contains(&i)));
2017    /// # }));
2018    /// # }
2019    pub fn join<V2, O2: Ordering, R2: Retries>(
2020        self,
2021        n: Stream<(K, V2), L, B, O2, R2>,
2022    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2023    where
2024        K: Eq + Hash,
2025        R: MinRetries<R2>,
2026    {
2027        check_matching_location(&self.location, &n.location);
2028
2029        Stream::new(
2030            self.location.clone(),
2031            HydroNode::Join {
2032                left: Box::new(self.ir_node.into_inner()),
2033                right: Box::new(n.ir_node.into_inner()),
2034                metadata: self.location.new_node_metadata(Stream::<
2035                    (K, (V1, V2)),
2036                    L,
2037                    B,
2038                    NoOrder,
2039                    <R as MinRetries<R2>>::Min,
2040                >::collection_kind()),
2041            },
2042        )
2043    }
2044
2045    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2046    /// computes the anti-join of the items in the input -- i.e. returns
2047    /// unique items in the first input that do not have a matching key
2048    /// in the second input.
2049    ///
2050    /// # Example
2051    /// ```rust
2052    /// # #[cfg(feature = "deploy")] {
2053    /// # use hydro_lang::prelude::*;
2054    /// # use futures::StreamExt;
2055    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2056    /// let tick = process.tick();
2057    /// let stream = process
2058    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2059    ///   .batch(&tick, nondet!(/** test */));
2060    /// let batch = process
2061    ///   .source_iter(q!(vec![1, 2]))
2062    ///   .batch(&tick, nondet!(/** test */));
2063    /// stream.anti_join(batch).all_ticks()
2064    /// # }, |mut stream| async move {
2065    /// # for w in vec![(3, 'c'), (4, 'd')] {
2066    /// #     assert_eq!(stream.next().await.unwrap(), w);
2067    /// # }
2068    /// # }));
2069    /// # }
2070    pub fn anti_join<O2: Ordering, R2: Retries>(
2071        self,
2072        n: Stream<K, L, Bounded, O2, R2>,
2073    ) -> Stream<(K, V1), L, B, O, R>
2074    where
2075        K: Eq + Hash,
2076    {
2077        check_matching_location(&self.location, &n.location);
2078
2079        Stream::new(
2080            self.location.clone(),
2081            HydroNode::AntiJoin {
2082                pos: Box::new(self.ir_node.into_inner()),
2083                neg: Box::new(n.ir_node.into_inner()),
2084                metadata: self
2085                    .location
2086                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2087            },
2088        )
2089    }
2090}
2091
2092impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2093    Stream<(K, V), L, B, O, R>
2094{
2095    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2096    /// is used as the key and the second element is added to the entries associated with that key.
2097    ///
2098    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2099    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2100    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2101    /// total ordering _within_ each group but no ordering _across_ groups.
2102    ///
2103    /// # Example
2104    /// ```rust
2105    /// # #[cfg(feature = "deploy")] {
2106    /// # use hydro_lang::prelude::*;
2107    /// # use futures::StreamExt;
2108    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2109    /// process
2110    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2111    ///     .into_keyed()
2112    /// #   .entries()
2113    /// # }, |mut stream| async move {
2114    /// // { 1: [2, 3], 2: [4] }
2115    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2116    /// #     assert_eq!(stream.next().await.unwrap(), w);
2117    /// # }
2118    /// # }));
2119    /// # }
2120    /// ```
2121    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2122        KeyedStream::new(
2123            self.location.clone(),
2124            HydroNode::Cast {
2125                inner: Box::new(self.ir_node.into_inner()),
2126                metadata: self
2127                    .location
2128                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2129            },
2130        )
2131    }
2132}
2133
2134impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2135where
2136    K: Eq + Hash,
2137    L: Location<'a>,
2138{
2139    #[deprecated = "use .into_keyed().fold(...) instead"]
2140    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2141    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2142    /// in the second element are accumulated via the `comb` closure.
2143    ///
2144    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2145    /// to depend on the order of elements in the stream.
2146    ///
2147    /// If the input and output value types are the same and do not require initialization then use
2148    /// [`Stream::reduce_keyed`].
2149    ///
2150    /// # Example
2151    /// ```rust
2152    /// # #[cfg(feature = "deploy")] {
2153    /// # use hydro_lang::prelude::*;
2154    /// # use futures::StreamExt;
2155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2156    /// let tick = process.tick();
2157    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2158    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2159    /// batch
2160    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2161    ///     .all_ticks()
2162    /// # }, |mut stream| async move {
2163    /// // (1, 5), (2, 7)
2164    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2165    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2166    /// # }));
2167    /// # }
2168    /// ```
2169    pub fn fold_keyed<A, I, F>(
2170        self,
2171        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2172        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2173    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2174    where
2175        I: Fn() -> A + 'a,
2176        F: Fn(&mut A, V) + 'a,
2177    {
2178        self.into_keyed().fold(init, comb).entries()
2179    }
2180
2181    #[deprecated = "use .into_keyed().reduce(...) instead"]
2182    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2183    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2184    /// in the second element are accumulated via the `comb` closure.
2185    ///
2186    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2187    /// to depend on the order of elements in the stream.
2188    ///
2189    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2190    ///
2191    /// # Example
2192    /// ```rust
2193    /// # #[cfg(feature = "deploy")] {
2194    /// # use hydro_lang::prelude::*;
2195    /// # use futures::StreamExt;
2196    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2197    /// let tick = process.tick();
2198    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2199    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2200    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2201    /// # }, |mut stream| async move {
2202    /// // (1, 5), (2, 7)
2203    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2204    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2205    /// # }));
2206    /// # }
2207    /// ```
2208    pub fn reduce_keyed<F>(
2209        self,
2210        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2211    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2212    where
2213        F: Fn(&mut V, V) + 'a,
2214    {
2215        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2216
2217        Stream::new(
2218            self.location.clone(),
2219            HydroNode::ReduceKeyed {
2220                f,
2221                input: Box::new(self.ir_node.into_inner()),
2222                metadata: self.location.new_node_metadata(Stream::<
2223                    (K, V),
2224                    Tick<L>,
2225                    Bounded,
2226                    NoOrder,
2227                    ExactlyOnce,
2228                >::collection_kind()),
2229            },
2230        )
2231    }
2232}
2233
2234impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2235where
2236    K: Eq + Hash,
2237    L: Location<'a>,
2238{
2239    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2240    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2241    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2242    /// in the second element are accumulated via the `comb` closure.
2243    ///
2244    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2245    /// as there may be non-deterministic duplicates.
2246    ///
2247    /// If the input and output value types are the same and do not require initialization then use
2248    /// [`Stream::reduce_keyed_commutative_idempotent`].
2249    ///
2250    /// # Example
2251    /// ```rust
2252    /// # #[cfg(feature = "deploy")] {
2253    /// # use hydro_lang::prelude::*;
2254    /// # use futures::StreamExt;
2255    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2256    /// let tick = process.tick();
2257    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2258    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2259    /// batch
2260    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2261    ///     .all_ticks()
2262    /// # }, |mut stream| async move {
2263    /// // (1, false), (2, true)
2264    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2265    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2266    /// # }));
2267    /// # }
2268    /// ```
2269    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2270        self,
2271        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2272        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2273    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2274    where
2275        I: Fn() -> A + 'a,
2276        F: Fn(&mut A, V) + 'a,
2277    {
2278        self.into_keyed()
2279            .fold_commutative_idempotent(init, comb)
2280            .entries()
2281    }
2282
2283    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2284    /// # Example
2285    /// ```rust
2286    /// # #[cfg(feature = "deploy")] {
2287    /// # use hydro_lang::prelude::*;
2288    /// # use futures::StreamExt;
2289    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2290    /// let tick = process.tick();
2291    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2292    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2293    /// batch.keys().all_ticks()
2294    /// # }, |mut stream| async move {
2295    /// // 1, 2
2296    /// # assert_eq!(stream.next().await.unwrap(), 1);
2297    /// # assert_eq!(stream.next().await.unwrap(), 2);
2298    /// # }));
2299    /// # }
2300    /// ```
2301    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2302        self.into_keyed()
2303            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2304            .keys()
2305    }
2306
2307    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2308    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2309    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2310    /// in the second element are accumulated via the `comb` closure.
2311    ///
2312    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2313    /// as there may be non-deterministic duplicates.
2314    ///
2315    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2316    ///
2317    /// # Example
2318    /// ```rust
2319    /// # #[cfg(feature = "deploy")] {
2320    /// # use hydro_lang::prelude::*;
2321    /// # use futures::StreamExt;
2322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2323    /// let tick = process.tick();
2324    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2325    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2326    /// batch
2327    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2328    ///     .all_ticks()
2329    /// # }, |mut stream| async move {
2330    /// // (1, false), (2, true)
2331    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2332    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2333    /// # }));
2334    /// # }
2335    /// ```
2336    pub fn reduce_keyed_commutative_idempotent<F>(
2337        self,
2338        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2339    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2340    where
2341        F: Fn(&mut V, V) + 'a,
2342    {
2343        self.into_keyed()
2344            .reduce_commutative_idempotent(comb)
2345            .entries()
2346    }
2347}
2348
2349impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2350where
2351    K: Eq + Hash,
2352    L: Location<'a>,
2353{
2354    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2355    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2356    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2357    /// in the second element are accumulated via the `comb` closure.
2358    ///
2359    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2360    ///
2361    /// If the input and output value types are the same and do not require initialization then use
2362    /// [`Stream::reduce_keyed_commutative`].
2363    ///
2364    /// # Example
2365    /// ```rust
2366    /// # #[cfg(feature = "deploy")] {
2367    /// # use hydro_lang::prelude::*;
2368    /// # use futures::StreamExt;
2369    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2370    /// let tick = process.tick();
2371    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2372    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2373    /// batch
2374    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2375    ///     .all_ticks()
2376    /// # }, |mut stream| async move {
2377    /// // (1, 5), (2, 7)
2378    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2379    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2380    /// # }));
2381    /// # }
2382    /// ```
2383    pub fn fold_keyed_commutative<A, I, F>(
2384        self,
2385        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2386        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2387    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2388    where
2389        I: Fn() -> A + 'a,
2390        F: Fn(&mut A, V) + 'a,
2391    {
2392        self.into_keyed().fold_commutative(init, comb).entries()
2393    }
2394
2395    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2396    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2397    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2398    /// in the second element are accumulated via the `comb` closure.
2399    ///
2400    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2401    ///
2402    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2403    ///
2404    /// # Example
2405    /// ```rust
2406    /// # #[cfg(feature = "deploy")] {
2407    /// # use hydro_lang::prelude::*;
2408    /// # use futures::StreamExt;
2409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2410    /// let tick = process.tick();
2411    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2412    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2413    /// batch
2414    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2415    ///     .all_ticks()
2416    /// # }, |mut stream| async move {
2417    /// // (1, 5), (2, 7)
2418    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2419    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2420    /// # }));
2421    /// # }
2422    /// ```
2423    pub fn reduce_keyed_commutative<F>(
2424        self,
2425        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2426    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2427    where
2428        F: Fn(&mut V, V) + 'a,
2429    {
2430        self.into_keyed().reduce_commutative(comb).entries()
2431    }
2432}
2433
2434impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2435where
2436    K: Eq + Hash,
2437    L: Location<'a>,
2438{
2439    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2440    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2441    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2442    /// in the second element are accumulated via the `comb` closure.
2443    ///
2444    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2445    ///
2446    /// If the input and output value types are the same and do not require initialization then use
2447    /// [`Stream::reduce_keyed_idempotent`].
2448    ///
2449    /// # Example
2450    /// ```rust
2451    /// # #[cfg(feature = "deploy")] {
2452    /// # use hydro_lang::prelude::*;
2453    /// # use futures::StreamExt;
2454    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2455    /// let tick = process.tick();
2456    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2457    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2458    /// batch
2459    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2460    ///     .all_ticks()
2461    /// # }, |mut stream| async move {
2462    /// // (1, false), (2, true)
2463    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2464    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2465    /// # }));
2466    /// # }
2467    /// ```
2468    pub fn fold_keyed_idempotent<A, I, F>(
2469        self,
2470        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2471        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2472    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2473    where
2474        I: Fn() -> A + 'a,
2475        F: Fn(&mut A, V) + 'a,
2476    {
2477        self.into_keyed().fold_idempotent(init, comb).entries()
2478    }
2479
2480    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2481    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2482    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2483    /// in the second element are accumulated via the `comb` closure.
2484    ///
2485    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2486    ///
2487    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2488    ///
2489    /// # Example
2490    /// ```rust
2491    /// # #[cfg(feature = "deploy")] {
2492    /// # use hydro_lang::prelude::*;
2493    /// # use futures::StreamExt;
2494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2495    /// let tick = process.tick();
2496    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2497    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2498    /// batch
2499    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2500    ///     .all_ticks()
2501    /// # }, |mut stream| async move {
2502    /// // (1, false), (2, true)
2503    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2504    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2505    /// # }));
2506    /// # }
2507    /// ```
2508    pub fn reduce_keyed_idempotent<F>(
2509        self,
2510        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2511    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2512    where
2513        F: Fn(&mut V, V) + 'a,
2514    {
2515        self.into_keyed().reduce_idempotent(comb).entries()
2516    }
2517}
2518
2519impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2520where
2521    L: Location<'a> + NoTick,
2522{
2523    /// Returns a stream corresponding to the latest batch of elements being atomically
2524    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2525    /// the order of the input.
2526    ///
2527    /// # Non-Determinism
2528    /// The batch boundaries are non-deterministic and may change across executions.
2529    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2530        Stream::new(
2531            self.location.clone().tick,
2532            HydroNode::Batch {
2533                inner: Box::new(self.ir_node.into_inner()),
2534                metadata: self
2535                    .location
2536                    .tick
2537                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2538            },
2539        )
2540    }
2541
2542    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2543    /// See [`Stream::atomic`] for more details.
2544    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2545        Stream::new(
2546            self.location.tick.l.clone(),
2547            HydroNode::EndAtomic {
2548                inner: Box::new(self.ir_node.into_inner()),
2549                metadata: self
2550                    .location
2551                    .tick
2552                    .l
2553                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2554            },
2555        )
2556    }
2557}
2558
2559impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2560where
2561    L: Location<'a>,
2562{
2563    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2564    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2565    ///
2566    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2567    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2568    /// argument that declares where the stream will be atomically processed. Batching a stream into
2569    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2570    /// [`Tick`] will introduce asynchrony.
2571    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2572        let out_location = Atomic { tick: tick.clone() };
2573        Stream::new(
2574            out_location.clone(),
2575            HydroNode::BeginAtomic {
2576                inner: Box::new(self.ir_node.into_inner()),
2577                metadata: out_location
2578                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2579            },
2580        )
2581    }
2582
2583    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2584    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2585    /// the order of the input. The output stream will execute in the [`Tick`] that was
2586    /// used to create the atomic section.
2587    ///
2588    /// # Non-Determinism
2589    /// The batch boundaries are non-deterministic and may change across executions.
2590    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2591        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2592        Stream::new(
2593            tick.clone(),
2594            HydroNode::Batch {
2595                inner: Box::new(self.ir_node.into_inner()),
2596                metadata: tick
2597                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2598            },
2599        )
2600    }
2601
2602    /// Given a time interval, returns a stream corresponding to samples taken from the
2603    /// stream roughly at that interval. The output will have elements in the same order
2604    /// as the input, but with arbitrary elements skipped between samples. There is also
2605    /// no guarantee on the exact timing of the samples.
2606    ///
2607    /// # Non-Determinism
2608    /// The output stream is non-deterministic in which elements are sampled, since this
2609    /// is controlled by a clock.
2610    pub fn sample_every(
2611        self,
2612        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2613        nondet: NonDet,
2614    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2615    where
2616        L: NoTick + NoAtomic,
2617    {
2618        let samples = self.location.source_interval(interval, nondet);
2619
2620        let tick = self.location.tick();
2621        self.batch(&tick, nondet)
2622            .filter_if_some(samples.batch(&tick, nondet).first())
2623            .all_ticks()
2624            .weakest_retries()
2625    }
2626
2627    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2628    /// stream has not emitted a value since that duration.
2629    ///
2630    /// # Non-Determinism
2631    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2632    /// samples take place, timeouts may be non-deterministically generated or missed,
2633    /// and the notification of the timeout may be delayed as well. There is also no
2634    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2635    /// detected based on when the next sample is taken.
2636    pub fn timeout(
2637        self,
2638        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2639        nondet: NonDet,
2640    ) -> Optional<(), L, Unbounded>
2641    where
2642        L: NoTick + NoAtomic,
2643    {
2644        let tick = self.location.tick();
2645
2646        let latest_received = self.assume_retries(nondet).fold_commutative(
2647            q!(|| None),
2648            q!(|latest, _| {
2649                *latest = Some(Instant::now());
2650            }),
2651        );
2652
2653        latest_received
2654            .snapshot(&tick, nondet)
2655            .filter_map(q!(move |latest_received| {
2656                if let Some(latest_received) = latest_received {
2657                    if Instant::now().duration_since(latest_received) > duration {
2658                        Some(())
2659                    } else {
2660                        None
2661                    }
2662                } else {
2663                    Some(())
2664                }
2665            }))
2666            .latest()
2667    }
2668}
2669
2670impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2671where
2672    L: Location<'a> + NoTick + NoAtomic,
2673    F: Future<Output = T>,
2674{
2675    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2676    /// Future outputs are produced as available, regardless of input arrival order.
2677    ///
2678    /// # Example
2679    /// ```rust
2680    /// # #[cfg(feature = "deploy")] {
2681    /// # use std::collections::HashSet;
2682    /// # use futures::StreamExt;
2683    /// # use hydro_lang::prelude::*;
2684    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2685    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2686    ///     .map(q!(|x| async move {
2687    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2688    ///         x
2689    ///     }))
2690    ///     .resolve_futures()
2691    /// #   },
2692    /// #   |mut stream| async move {
2693    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2694    /// #       let mut output = HashSet::new();
2695    /// #       for _ in 1..10 {
2696    /// #           output.insert(stream.next().await.unwrap());
2697    /// #       }
2698    /// #       assert_eq!(
2699    /// #           output,
2700    /// #           HashSet::<i32>::from_iter(1..10)
2701    /// #       );
2702    /// #   },
2703    /// # ));
2704    /// # }
2705    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2706        Stream::new(
2707            self.location.clone(),
2708            HydroNode::ResolveFutures {
2709                input: Box::new(self.ir_node.into_inner()),
2710                metadata: self
2711                    .location
2712                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2713            },
2714        )
2715    }
2716
2717    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2718    /// Future outputs are produced in the same order as the input stream.
2719    ///
2720    /// # Example
2721    /// ```rust
2722    /// # #[cfg(feature = "deploy")] {
2723    /// # use std::collections::HashSet;
2724    /// # use futures::StreamExt;
2725    /// # use hydro_lang::prelude::*;
2726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2727    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2728    ///     .map(q!(|x| async move {
2729    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2730    ///         x
2731    ///     }))
2732    ///     .resolve_futures_ordered()
2733    /// #   },
2734    /// #   |mut stream| async move {
2735    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2736    /// #       let mut output = Vec::new();
2737    /// #       for _ in 1..10 {
2738    /// #           output.push(stream.next().await.unwrap());
2739    /// #       }
2740    /// #       assert_eq!(
2741    /// #           output,
2742    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2743    /// #       );
2744    /// #   },
2745    /// # ));
2746    /// # }
2747    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2748        Stream::new(
2749            self.location.clone(),
2750            HydroNode::ResolveFuturesOrdered {
2751                input: Box::new(self.ir_node.into_inner()),
2752                metadata: self
2753                    .location
2754                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2755            },
2756        )
2757    }
2758}
2759
2760impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2761where
2762    L: Location<'a> + NoTick,
2763{
2764    /// Executes the provided closure for every element in this stream.
2765    ///
2766    /// Because the closure may have side effects, the stream must have deterministic order
2767    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2768    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2769    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2770    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2771        let f = f.splice_fn1_ctx(&self.location).into();
2772        self.location
2773            .flow_state()
2774            .borrow_mut()
2775            .push_root(HydroRoot::ForEach {
2776                input: Box::new(self.ir_node.into_inner()),
2777                f,
2778                op_metadata: HydroIrOpMetadata::new(),
2779            });
2780    }
2781
2782    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2783    /// TCP socket to some other server. You should _not_ use this API for interacting with
2784    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2785    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2786    /// interaction with asynchronous sinks.
2787    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2788    where
2789        S: 'a + futures::Sink<T> + Unpin,
2790    {
2791        self.location
2792            .flow_state()
2793            .borrow_mut()
2794            .push_root(HydroRoot::DestSink {
2795                sink: sink.splice_typed_ctx(&self.location).into(),
2796                input: Box::new(self.ir_node.into_inner()),
2797                op_metadata: HydroIrOpMetadata::new(),
2798            });
2799    }
2800}
2801
2802impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2803where
2804    L: Location<'a>,
2805{
2806    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2807    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2808    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2809        Stream::new(
2810            self.location.outer().clone(),
2811            HydroNode::YieldConcat {
2812                inner: Box::new(self.ir_node.into_inner()),
2813                metadata: self
2814                    .location
2815                    .outer()
2816                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2817            },
2818        )
2819    }
2820
2821    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2822    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2823    ///
2824    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2825    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2826    /// stream's [`Tick`] context.
2827    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2828        let out_location = Atomic {
2829            tick: self.location.clone(),
2830        };
2831
2832        Stream::new(
2833            out_location.clone(),
2834            HydroNode::YieldConcat {
2835                inner: Box::new(self.ir_node.into_inner()),
2836                metadata: out_location
2837                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2838            },
2839        )
2840    }
2841
2842    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2843    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2844    /// input.
2845    ///
2846    /// This API is particularly useful for stateful computation on batches of data, such as
2847    /// maintaining an accumulated state that is up to date with the current batch.
2848    ///
2849    /// # Example
2850    /// ```rust
2851    /// # #[cfg(feature = "deploy")] {
2852    /// # use hydro_lang::prelude::*;
2853    /// # use futures::StreamExt;
2854    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2855    /// let tick = process.tick();
2856    /// # // ticks are lazy by default, forces the second tick to run
2857    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2858    /// # let batch_first_tick = process
2859    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2860    /// #  .batch(&tick, nondet!(/** test */));
2861    /// # let batch_second_tick = process
2862    /// #   .source_iter(q!(vec![5, 6, 7]))
2863    /// #   .batch(&tick, nondet!(/** test */))
2864    /// #   .defer_tick(); // appears on the second tick
2865    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2866    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2867    ///
2868    /// input.batch(&tick, nondet!(/** test */))
2869    ///     .across_ticks(|s| s.count()).all_ticks()
2870    /// # }, |mut stream| async move {
2871    /// // [4, 7]
2872    /// assert_eq!(stream.next().await.unwrap(), 4);
2873    /// assert_eq!(stream.next().await.unwrap(), 7);
2874    /// # }));
2875    /// # }
2876    /// ```
2877    pub fn across_ticks<Out: BatchAtomic>(
2878        self,
2879        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2880    ) -> Out::Batched {
2881        thunk(self.all_ticks_atomic()).batched_atomic()
2882    }
2883
2884    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2885    /// always has the elements of `self` at tick `T - 1`.
2886    ///
2887    /// At tick `0`, the output stream is empty, since there is no previous tick.
2888    ///
2889    /// This operator enables stateful iterative processing with ticks, by sending data from one
2890    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2891    ///
2892    /// # Example
2893    /// ```rust
2894    /// # #[cfg(feature = "deploy")] {
2895    /// # use hydro_lang::prelude::*;
2896    /// # use futures::StreamExt;
2897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2898    /// let tick = process.tick();
2899    /// // ticks are lazy by default, forces the second tick to run
2900    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2901    ///
2902    /// let batch_first_tick = process
2903    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2904    ///   .batch(&tick, nondet!(/** test */));
2905    /// let batch_second_tick = process
2906    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2907    ///   .batch(&tick, nondet!(/** test */))
2908    ///   .defer_tick(); // appears on the second tick
2909    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2910    ///
2911    /// changes_across_ticks.clone().filter_not_in(
2912    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2913    /// ).all_ticks()
2914    /// # }, |mut stream| async move {
2915    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2916    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2917    /// #     assert_eq!(stream.next().await.unwrap(), w);
2918    /// # }
2919    /// # }));
2920    /// # }
2921    /// ```
2922    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2923        Stream::new(
2924            self.location.clone(),
2925            HydroNode::DeferTick {
2926                input: Box::new(self.ir_node.into_inner()),
2927                metadata: self
2928                    .location
2929                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2930            },
2931        )
2932    }
2933}
2934
2935#[cfg(test)]
2936mod tests {
2937    #[cfg(feature = "deploy")]
2938    use futures::{SinkExt, StreamExt};
2939    #[cfg(feature = "deploy")]
2940    use hydro_deploy::Deployment;
2941    #[cfg(feature = "deploy")]
2942    use serde::{Deserialize, Serialize};
2943    #[cfg(feature = "deploy")]
2944    use stageleft::q;
2945
2946    #[cfg(any(feature = "deploy", feature = "sim"))]
2947    use crate::compile::builder::FlowBuilder;
2948    #[cfg(feature = "deploy")]
2949    use crate::live_collections::stream::ExactlyOnce;
2950    #[cfg(feature = "sim")]
2951    use crate::live_collections::stream::NoOrder;
2952    #[cfg(any(feature = "deploy", feature = "sim"))]
2953    use crate::live_collections::stream::TotalOrder;
2954    #[cfg(any(feature = "deploy", feature = "sim"))]
2955    use crate::location::Location;
2956    #[cfg(any(feature = "deploy", feature = "sim"))]
2957    use crate::nondet::nondet;
2958
2959    mod backtrace_chained_ops;
2960
2961    #[cfg(feature = "deploy")]
2962    struct P1 {}
2963    #[cfg(feature = "deploy")]
2964    struct P2 {}
2965
2966    #[cfg(feature = "deploy")]
2967    #[derive(Serialize, Deserialize, Debug)]
2968    struct SendOverNetwork {
2969        n: u32,
2970    }
2971
2972    #[cfg(feature = "deploy")]
2973    #[tokio::test]
2974    async fn first_ten_distributed() {
2975        let mut deployment = Deployment::new();
2976
2977        let flow = FlowBuilder::new();
2978        let first_node = flow.process::<P1>();
2979        let second_node = flow.process::<P2>();
2980        let external = flow.external::<P2>();
2981
2982        let numbers = first_node.source_iter(q!(0..10));
2983        let out_port = numbers
2984            .map(q!(|n| SendOverNetwork { n }))
2985            .send_bincode(&second_node)
2986            .send_bincode_external(&external);
2987
2988        let nodes = flow
2989            .with_process(&first_node, deployment.Localhost())
2990            .with_process(&second_node, deployment.Localhost())
2991            .with_external(&external, deployment.Localhost())
2992            .deploy(&mut deployment);
2993
2994        deployment.deploy().await.unwrap();
2995
2996        let mut external_out = nodes.connect(out_port).await;
2997
2998        deployment.start().await.unwrap();
2999
3000        for i in 0..10 {
3001            assert_eq!(external_out.next().await.unwrap().n, i);
3002        }
3003    }
3004
3005    #[cfg(feature = "deploy")]
3006    #[tokio::test]
3007    async fn first_cardinality() {
3008        let mut deployment = Deployment::new();
3009
3010        let flow = FlowBuilder::new();
3011        let node = flow.process::<()>();
3012        let external = flow.external::<()>();
3013
3014        let node_tick = node.tick();
3015        let count = node_tick
3016            .singleton(q!([1, 2, 3]))
3017            .into_stream()
3018            .flatten_ordered()
3019            .first()
3020            .into_stream()
3021            .count()
3022            .all_ticks()
3023            .send_bincode_external(&external);
3024
3025        let nodes = flow
3026            .with_process(&node, deployment.Localhost())
3027            .with_external(&external, deployment.Localhost())
3028            .deploy(&mut deployment);
3029
3030        deployment.deploy().await.unwrap();
3031
3032        let mut external_out = nodes.connect(count).await;
3033
3034        deployment.start().await.unwrap();
3035
3036        assert_eq!(external_out.next().await.unwrap(), 1);
3037    }
3038
3039    #[cfg(feature = "deploy")]
3040    #[tokio::test]
3041    async fn unbounded_reduce_remembers_state() {
3042        let mut deployment = Deployment::new();
3043
3044        let flow = FlowBuilder::new();
3045        let node = flow.process::<()>();
3046        let external = flow.external::<()>();
3047
3048        let (input_port, input) = node.source_external_bincode(&external);
3049        let out = input
3050            .reduce(q!(|acc, v| *acc += v))
3051            .sample_eager(nondet!(/** test */))
3052            .send_bincode_external(&external);
3053
3054        let nodes = flow
3055            .with_process(&node, deployment.Localhost())
3056            .with_external(&external, deployment.Localhost())
3057            .deploy(&mut deployment);
3058
3059        deployment.deploy().await.unwrap();
3060
3061        let mut external_in = nodes.connect(input_port).await;
3062        let mut external_out = nodes.connect(out).await;
3063
3064        deployment.start().await.unwrap();
3065
3066        external_in.send(1).await.unwrap();
3067        assert_eq!(external_out.next().await.unwrap(), 1);
3068
3069        external_in.send(2).await.unwrap();
3070        assert_eq!(external_out.next().await.unwrap(), 3);
3071    }
3072
3073    #[cfg(feature = "deploy")]
3074    #[tokio::test]
3075    async fn atomic_fold_replays_each_tick() {
3076        let mut deployment = Deployment::new();
3077
3078        let flow = FlowBuilder::new();
3079        let node = flow.process::<()>();
3080        let external = flow.external::<()>();
3081
3082        let (input_port, input) =
3083            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3084        let tick = node.tick();
3085
3086        let out = input
3087            .batch(&tick, nondet!(/** test */))
3088            .cross_singleton(
3089                node.source_iter(q!(vec![1, 2, 3]))
3090                    .atomic(&tick)
3091                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3092                    .snapshot_atomic(nondet!(/** test */)),
3093            )
3094            .all_ticks()
3095            .send_bincode_external(&external);
3096
3097        let nodes = flow
3098            .with_process(&node, deployment.Localhost())
3099            .with_external(&external, deployment.Localhost())
3100            .deploy(&mut deployment);
3101
3102        deployment.deploy().await.unwrap();
3103
3104        let mut external_in = nodes.connect(input_port).await;
3105        let mut external_out = nodes.connect(out).await;
3106
3107        deployment.start().await.unwrap();
3108
3109        external_in.send(1).await.unwrap();
3110        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3111
3112        external_in.send(2).await.unwrap();
3113        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3114    }
3115
3116    #[cfg(feature = "deploy")]
3117    #[tokio::test]
3118    async fn unbounded_scan_remembers_state() {
3119        let mut deployment = Deployment::new();
3120
3121        let flow = FlowBuilder::new();
3122        let node = flow.process::<()>();
3123        let external = flow.external::<()>();
3124
3125        let (input_port, input) = node.source_external_bincode(&external);
3126        let out = input
3127            .scan(
3128                q!(|| 0),
3129                q!(|acc, v| {
3130                    *acc += v;
3131                    Some(*acc)
3132                }),
3133            )
3134            .send_bincode_external(&external);
3135
3136        let nodes = flow
3137            .with_process(&node, deployment.Localhost())
3138            .with_external(&external, deployment.Localhost())
3139            .deploy(&mut deployment);
3140
3141        deployment.deploy().await.unwrap();
3142
3143        let mut external_in = nodes.connect(input_port).await;
3144        let mut external_out = nodes.connect(out).await;
3145
3146        deployment.start().await.unwrap();
3147
3148        external_in.send(1).await.unwrap();
3149        assert_eq!(external_out.next().await.unwrap(), 1);
3150
3151        external_in.send(2).await.unwrap();
3152        assert_eq!(external_out.next().await.unwrap(), 3);
3153    }
3154
3155    #[cfg(feature = "deploy")]
3156    #[tokio::test]
3157    async fn unbounded_enumerate_remembers_state() {
3158        let mut deployment = Deployment::new();
3159
3160        let flow = FlowBuilder::new();
3161        let node = flow.process::<()>();
3162        let external = flow.external::<()>();
3163
3164        let (input_port, input) = node.source_external_bincode(&external);
3165        let out = input.enumerate().send_bincode_external(&external);
3166
3167        let nodes = flow
3168            .with_process(&node, deployment.Localhost())
3169            .with_external(&external, deployment.Localhost())
3170            .deploy(&mut deployment);
3171
3172        deployment.deploy().await.unwrap();
3173
3174        let mut external_in = nodes.connect(input_port).await;
3175        let mut external_out = nodes.connect(out).await;
3176
3177        deployment.start().await.unwrap();
3178
3179        external_in.send(1).await.unwrap();
3180        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3181
3182        external_in.send(2).await.unwrap();
3183        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3184    }
3185
3186    #[cfg(feature = "deploy")]
3187    #[tokio::test]
3188    async fn unbounded_unique_remembers_state() {
3189        let mut deployment = Deployment::new();
3190
3191        let flow = FlowBuilder::new();
3192        let node = flow.process::<()>();
3193        let external = flow.external::<()>();
3194
3195        let (input_port, input) =
3196            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3197        let out = input.unique().send_bincode_external(&external);
3198
3199        let nodes = flow
3200            .with_process(&node, deployment.Localhost())
3201            .with_external(&external, deployment.Localhost())
3202            .deploy(&mut deployment);
3203
3204        deployment.deploy().await.unwrap();
3205
3206        let mut external_in = nodes.connect(input_port).await;
3207        let mut external_out = nodes.connect(out).await;
3208
3209        deployment.start().await.unwrap();
3210
3211        external_in.send(1).await.unwrap();
3212        assert_eq!(external_out.next().await.unwrap(), 1);
3213
3214        external_in.send(2).await.unwrap();
3215        assert_eq!(external_out.next().await.unwrap(), 2);
3216
3217        external_in.send(1).await.unwrap();
3218        external_in.send(3).await.unwrap();
3219        assert_eq!(external_out.next().await.unwrap(), 3);
3220    }
3221
3222    #[cfg(feature = "sim")]
3223    #[test]
3224    #[should_panic]
3225    fn sim_batch_nondet_size() {
3226        let flow = FlowBuilder::new();
3227        let node = flow.process::<()>();
3228
3229        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3230
3231        let tick = node.tick();
3232        let out_recv = input
3233            .batch(&tick, nondet!(/** test */))
3234            .count()
3235            .all_ticks()
3236            .sim_output();
3237
3238        flow.sim().exhaustive(async || {
3239            in_send.send(());
3240            in_send.send(());
3241            in_send.send(());
3242
3243            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3244        });
3245    }
3246
3247    #[cfg(feature = "sim")]
3248    #[test]
3249    fn sim_batch_preserves_order() {
3250        let flow = FlowBuilder::new();
3251        let node = flow.process::<()>();
3252
3253        let (in_send, input) = node.sim_input();
3254
3255        let tick = node.tick();
3256        let out_recv = input
3257            .batch(&tick, nondet!(/** test */))
3258            .all_ticks()
3259            .sim_output();
3260
3261        flow.sim().exhaustive(async || {
3262            in_send.send(1);
3263            in_send.send(2);
3264            in_send.send(3);
3265
3266            out_recv.assert_yields_only([1, 2, 3]).await;
3267        });
3268    }
3269
3270    #[cfg(feature = "sim")]
3271    #[test]
3272    #[should_panic]
3273    fn sim_batch_unordered_shuffles() {
3274        let flow = FlowBuilder::new();
3275        let node = flow.process::<()>();
3276
3277        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3278
3279        let tick = node.tick();
3280        let batch = input.batch(&tick, nondet!(/** test */));
3281        let out_recv = batch
3282            .clone()
3283            .min()
3284            .zip(batch.max())
3285            .all_ticks()
3286            .sim_output();
3287
3288        flow.sim().exhaustive(async || {
3289            in_send.send_many_unordered([1, 2, 3]);
3290
3291            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3292                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3293            }
3294        });
3295    }
3296
3297    #[cfg(feature = "sim")]
3298    #[test]
3299    fn sim_batch_unordered_shuffles_count() {
3300        let flow = FlowBuilder::new();
3301        let node = flow.process::<()>();
3302
3303        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3304
3305        let tick = node.tick();
3306        let batch = input.batch(&tick, nondet!(/** test */));
3307        let out_recv = batch.all_ticks().sim_output();
3308
3309        let instance_count = flow.sim().exhaustive(async || {
3310            in_send.send_many_unordered([1, 2, 3, 4]);
3311            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3312        });
3313
3314        assert_eq!(
3315            instance_count,
3316            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3317        )
3318    }
3319
3320    #[cfg(feature = "sim")]
3321    #[test]
3322    #[should_panic]
3323    fn sim_observe_order_batched() {
3324        let flow = FlowBuilder::new();
3325        let node = flow.process::<()>();
3326
3327        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3328
3329        let tick = node.tick();
3330        let batch = input.batch(&tick, nondet!(/** test */));
3331        let out_recv = batch
3332            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3333            .all_ticks()
3334            .sim_output();
3335
3336        flow.sim().exhaustive(async || {
3337            in_send.send_many_unordered([1, 2, 3, 4]);
3338            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3339        });
3340    }
3341
3342    #[cfg(feature = "sim")]
3343    #[test]
3344    fn sim_observe_order_batched_count() {
3345        let flow = FlowBuilder::new();
3346        let node = flow.process::<()>();
3347
3348        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3349
3350        let tick = node.tick();
3351        let batch = input.batch(&tick, nondet!(/** test */));
3352        let out_recv = batch
3353            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3354            .all_ticks()
3355            .sim_output();
3356
3357        let instance_count = flow.sim().exhaustive(async || {
3358            in_send.send_many_unordered([1, 2, 3, 4]);
3359            let _ = out_recv.collect::<Vec<_>>().await;
3360        });
3361
3362        assert_eq!(
3363            instance_count,
3364            192 // 4! * 2^{4 - 1}
3365        )
3366    }
3367
3368    #[cfg(feature = "sim")]
3369    #[test]
3370    fn sim_unordered_count_instance_count() {
3371        let flow = FlowBuilder::new();
3372        let node = flow.process::<()>();
3373
3374        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3375
3376        let tick = node.tick();
3377        let out_recv = input
3378            .count()
3379            .snapshot(&tick, nondet!(/** test */))
3380            .all_ticks()
3381            .sim_output();
3382
3383        let instance_count = flow.sim().exhaustive(async || {
3384            in_send.send_many_unordered([1, 2, 3, 4]);
3385            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3386        });
3387
3388        assert_eq!(
3389            instance_count,
3390            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3391        )
3392    }
3393}