DFIR Operators
In our previous examples we made use of some of DFIR's operators. Here we document each operator in more detail. Many of these operators are based on their Rust iterator equivalents; see the Rust Iterator documentation.
Maps:
_counter, enumerate, identity, inspect, map, resolve_futures, resolve_futures_blocking, resolve_futures_blocking_ordered, resolve_futures_ordered
Simple one-in-one-out operators.
Filters:
filter, filter_map
One-in zero-or-one-out operators.
Flattens:
flat_map, flatten
One-in multiple-out operators.
Folds:
fold, prefix, reduce, scan
Operators which accumulate elements together.
Keyed Folds:
fold_keyed, reduce_keyed
Operators which accumulate elements together by key.
Lattice Folds:
lattice_fold, lattice_reduce
Folds based on lattice-merge.
Persistent Operators:
multiset_delta, defer_signal, persist, persist_mut, persist_mut_keyed, sort, sort_by_key, state, state_by, unique
Persistent (stateful) operators.
Multi-Input Operators:
anti_join, chain, chain_first_n, cross_join, cross_join_multiset, cross_singleton, difference, join, join_fused, join_fused_lhs, join_fused_rhs, join_multiset, lattice_bimorphism, union, zip, zip_longest
Operators with multiple inputs.
Multi-Output Operators:
demux_enum, partition, tee, unzip
Operators with multiple outputs.
Sources:
initialize, null, spin, source_file, source_interval, source_iter, source_json, source_stdin, source_stream, source_stream_serde
Operators which produce output elements (and consume no inputs).
Sinks:
dest_file, dest_sink, dest_sink_serde, for_each, null
Operators which consume input elements (and produce no outputs).
Control Flow Operators:
assert, assert_eq, next_iteration, next_stratum, defer_tick, defer_tick_lazy
Operators which affect control flow/scheduling.
Compiler Fusion Operators:
_lattice_fold_batch, _lattice_join_fused_join
Operators which are necessary to implement certain optimizations and rewrite rules.
Windowing Operators:
all_once, batch, repeat_n, prefix
Operators for windowing loop inputs.
Un-Windowing Operators:
all_iterations
Operators for collecting loop outputs.
all_iterations
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> all_iterations() -> | exactly 1 | Streaming |
all_once
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> all_once() -> | exactly 1 | Streaming |
Given a bounded input stream, emits the entire stream in the first loop iteration.
Never causes additional loop iterations.
anti_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]anti_join() -> | exactly 1 | Blocking |
Input port names:
pos(streaming),neg(blocking)
2 input streams: the first of type (K, T), the second of type K; 1 output stream of type (K, T)
For a given tick, computes the anti-join of the items in the input streams, returning items in the pos input that do not have matching keys in the neg input. Note: this uses multiset semantics only on the positive side, so a duplicated positive input will appear in the output either 0 times (if matched in neg) or as many times as it appears in the input (if not matched in neg).
source_iter(vec![("cat", 2), ("cat", 2), ("elephant", 3), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join() -> assert_eq([("elephant", 3), ("elephant", 3)]);
assert
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> assert(A) | at least 0 and at most 1 | Streaming |
1 input stream, 1 optional output stream. Arguments: a predicate function that will be applied to each item in the stream.
If the predicate returns false for any input item then the operator will panic at runtime.
source_iter([1, 2, 3])
-> assert(|x| *x > 0)
-> assert_eq([1, 2, 3]);
assert_eq
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> assert_eq(A) | at least 0 and at most 1 | Streaming |
1 input stream, 1 optional output stream. Arguments: a vector, slice, or array containing objects that will be compared to the input stream.
The input stream will be compared with the provided argument, element by element. If any elements do not match, assert_eq will panic.
If the input stream produces more elements than are in the provided argument, assert_eq will panic.
The input stream is passed through assert_eq unchanged to the output stream.
assert_eq is mainly useful for testing and documenting the behavior of dfir code inline.
assert_eq remembers the stream position across ticks; see the example below.
unioned = union();
source_iter([1]) -> assert_eq([1]) -> unioned;
source_iter([2]) -> defer_tick() -> assert_eq([2]) -> unioned;
unioned -> assert_eq([1, 2]);
batch
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> batch() -> | exactly 1 | Streaming |
Given an unbounded input stream, emits its values in order, arbitrarily split into batches over multiple loop iterations.
Will cause additional loop iterations as long as new values arrive.
chain
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]chain() -> | exactly 1 | Streaming |
2 input streams of the same type, 1 output stream of the same type
Chains together a pair of streams, with all the elements of the first emitted before the second.
Since chain has multiple input streams, it needs to be assigned to
a variable to reference its multiple input ports across statements.
source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]);
chain_first_n
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| at least 2 | -> [<input_port>]chain_first_n(A) -> | exactly 1 | Streaming |
2 input streams of the same type, 1 output stream of the same type
Chains together a pair of streams, with all the elements of the first emitted before the second,
emitting up to N elements where N is passed as an argument.
Since chain_first_n has multiple input streams, it needs to be assigned to
a variable to reference its multiple input ports across statements.
source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain_first_n(3)
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY"]);
cross_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]cross_join() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
2 input streams of type S and T, 1 output stream of type (S, T)
Forms the cross-join (Cartesian product) of the items in the input streams, returning all tupled pairs.
source_iter(vec!["happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat"]) -> [1]my_join;
my_join = cross_join() -> assert_eq([("happy", "dog"), ("sad", "dog"), ("happy", "cat"), ("sad", "cat")]);
cross_join can be provided with one or two generic lifetime persistence arguments
in the same way as join, see join's documentation for more info.
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
    my_join = cross_join::<'tick>();
    source_iter(["hello", "bye"]) -> [0]my_join;
    source_stream(input_recv) -> [1]my_join;
    my_join -> for_each(|(s, t)| println!("({}, {})", s, t));
};
input_send.send("oakland").unwrap();
flow.run_tick();
input_send.send("san francisco").unwrap();
flow.run_tick();
Prints only "(hello, oakland)" and "(bye, oakland)". The source_iter is only included in
the first tick, then forgotten, so when "san francisco" arrives on input [1] in the second tick,
there is nothing for it to match with from input [0], and therefore it does appear in the output.
cross_join_multiset
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]cross_join_multiset() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
2 input streams of type S and T, 1 output stream of type (S, T)
Forms the multiset cross-join (Cartesian product) of the (possibly duplicated) items in the input streams, returning all tupled pairs regardless of duplicates.
source_iter(vec!["happy", "happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat", "cat"]) -> [1]my_join;
my_join = cross_join_multiset() -> sort() -> assert_eq([
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "dog"),
("happy", "dog"),
("sad", "cat"),
("sad", "cat"),
("sad", "dog"), ]);
cross_singleton
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]cross_singleton() -> | exactly 1 | Blocking |
Input port names:
input(streaming),single(blocking)
2 input streams, 1 output stream, no arguments.
Operates like cross_join, but treats one of the inputs as a "singleton"-like stream, ignoring everything after its first element. This operator blocks on the singleton input, and if an element is present, joins it with all the elements in the other stream. It is useful when a singleton input must be used to transform elements of a stream, since unlike a full cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional branches, since the operator short-circuits if the singleton input produces no values.
There are two inputs to cross_singleton: input and single.
input is the input data flow, and single is the singleton input.
join = cross_singleton();
source_iter([1, 2, 3]) -> [input]join;
source_iter([0]) -> [single]join;
join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
defer_signal
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]defer_signal() -> | exactly 1 | Blocking |
Input port names:
input(blocking),signal(blocking)
2 input streams, 1 output stream, no arguments.
Defers streaming input and releases it downstream when a signal is delivered. The order of input is preserved. This allows for buffering data and delivering it at a later, chosen, tick.
There are two inputs to defer_signal: input and signal.
input is the input data flow. Data that is delivered on this input is collected, in order, inside the defer_signal operator.
When anything is sent to signal, the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.
gate = defer_signal();
source_iter([1, 2, 3]) -> [input]gate;
source_iter([()]) -> [signal]gate;
gate -> assert_eq([1, 2, 3]);
defer_tick
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> defer_tick() -> | exactly 1 | Blocking |
Buffers all input items and releases them in the next tick.
See the book discussion on time for details on ticks.
A tick may be divided into multiple strata; see the next_stratum() operator.
defer_tick is sometimes needed to separate conflicting data across time, in order to preserve invariants.
Consider the following example, which implements a flip-flop: the invariant is that it emits either true or false in a given tick (but never both!).
pub fn main() {
    let mut df = dfir_rs::dfir_syntax! {
        source_iter(vec!(true))
            -> state;
        state = union()
            -> assert(|x| if context.current_tick().0 % 2 == 0 { *x == true } else { *x == false })
            -> map(|x| !x)
            -> defer_tick()
            -> state;
    };
    for i in 1..100 {
        println!("tick {}", i);
        df.run_tick();
    }
}
defer_tick can also be handy for comparing stream content across ticks.
In the example below, defer_tick() is used alongside difference() to filter out any items that arrive from inp in the current tick which match an item from inp in the previous tick.
// Outputs 1 2 3 4 5 6 (on separate lines).
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
    inp = source_stream(input_recv) -> tee();
    inp -> [pos]diff;
    inp -> defer_tick() -> [neg]diff;
    diff = difference() -> for_each(|x| println!("{}", x));
};
for x in [1, 2, 3, 4] {
    input_send.send(x).unwrap();
}
flow.run_tick();
for x in [3, 4, 5, 6] {
    input_send.send(x).unwrap();
}
flow.run_tick();
You can also supply a type parameter defer_tick::<MyType>() to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.
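For instance, a minimal sketch (not from the upstream docs; the element type usize is purely illustrative):
// sketch: annotate the item type flowing through defer_tick
source_iter(0..3) -> defer_tick::<usize>() -> for_each(|x| println!("{}", x));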
defer_tick_lazy
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> defer_tick_lazy() -> | exactly 1 | Blocking |
See defer_tick
This operator is identical to defer_tick except that it does not eagerly cause a new tick to be scheduled.
demux_enum
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> demux_enum() | any number of | Streaming |
Output port names: Variadic, as specified in arguments.
dest_file
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_file(A, B) | exactly 0 | Streaming |
dest_sink
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_sink(A) | exactly 0 | Streaming |
dest_sink_serde
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_sink_serde(A) | exactly 0 | Streaming |
difference
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]difference() -> | exactly 1 | Blocking |
Input port names:
pos(streaming),neg(blocking)
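No example is given above; the following sketch (mirroring the anti_join example, with illustrative variable names) shows items in pos that do not appear in neg passing through:
source_iter(vec!["cat", "elephant"]) -> [pos]diff;
source_iter(vec!["cat", "gorilla"]) -> [neg]diff;
diff = difference() -> assert_eq(["elephant"]);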
enumerate
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> enumerate() -> | exactly 1 | Streaming |
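A minimal sketch, assuming enumerate behaves like Rust's Iterator::enumerate and pairs each item with its index:
// sketch: each item is paired with a zero-based index
source_iter(vec!["hello", "world"]) -> enumerate() -> assert_eq([(0, "hello"), (1, "world")]);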
filter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> filter(A) -> | exactly 1 | Streaming |
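A small sketch, assuming the predicate receives each item by reference, as with Rust's Iterator::filter:
// sketch: only items for which the predicate returns true pass through
source_iter(vec![1, 2, 3]) -> filter(|&x| x > 1) -> assert_eq([2, 3]);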
filter_map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> filter_map(A) -> | exactly 1 | Streaming |
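A small sketch, assuming the closure maps each item to an Option and only Some values pass through, as with Rust's Iterator::filter_map:
// sketch: unparseable strings are dropped, parsed values pass through
source_iter(vec!["1", "hello", "3"]) -> filter_map(|s| s.parse::<usize>().ok()) -> assert_eq([1, 3]);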
flat_map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> flat_map(A) -> | exactly 1 | Streaming |
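A small sketch, assuming the closure returns an iterable whose elements are emitted individually, as with Rust's Iterator::flat_map:
// sketch: each inner Vec is flattened into the output stream
source_iter(vec![vec![1, 2], vec![3, 4]]) -> flat_map(|v| v) -> assert_eq([1, 2, 3, 4]);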
flatten
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> flatten() -> | exactly 1 | Streaming |
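A small sketch, assuming each input item is itself iterable and its elements are emitted individually:
// sketch: nested collections are flattened into one stream
source_iter(vec![vec![1, 2], vec![3, 4]]) -> flatten() -> assert_eq([1, 2, 3, 4]);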
fold
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> fold(A, B) | at least 0 and at most 1 | Blocking |
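An illustrative sketch, assuming fold takes an initializer closure and an accumulator closure that receives the accumulator by mutable reference, emitting the final value once the (blocking) input is complete:
// sketch: assumed signature fold(init_fn, |acc: &mut _, item| ...)
source_iter(vec![1, 2, 3, 4, 5])
-> fold(|| 0, |acc: &mut i32, x| *acc += x)
-> assert_eq([15]);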
fold_keyed
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> fold_keyed(A, B) -> | exactly 1 | Blocking |
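An illustrative sketch, assuming fold_keyed groups (key, value) pairs by key, folds the values per key, and emits one (key, accumulator) pair per key; output order across keys is not guaranteed, hence the sort:
// sketch: assumed signature fold_keyed(init_fn, |acc: &mut _, value| ...)
source_iter(vec![("a", 1), ("a", 2), ("b", 3)])
-> fold_keyed(|| 0, |acc: &mut i32, v| *acc += v)
-> sort()
-> assert_eq([("a", 3), ("b", 3)]);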
for_each
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> for_each(A) | exactly 0 | Streaming |
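A small sketch; the closure is applied to each item for its side effects, and nothing is emitted downstream:
// sketch: terminal sink, prints each item
source_iter(vec![1, 2, 3]) -> for_each(|x| println!("{}", x));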
identity
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> identity() -> | exactly 1 | Streaming |
initialize
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | initialize() -> | exactly 1 | Streaming |
inspect
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> inspect(A) | at least 0 and at most 1 | Streaming |
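A small sketch, assuming the closure receives each item by reference for side effects while the item passes through unchanged, as with Rust's Iterator::inspect:
// sketch: print each item as it flows by, unchanged
source_iter(vec![1, 2, 3])
-> inspect(|x| println!("saw {}", x))
-> assert_eq([1, 2, 3]);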
join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
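An illustrative sketch, assuming join performs an equi-join on the first (key) element of each pair and emits (K, (V1, V2)) for each match; the variable name my_join is illustrative:
// sketch: matching keys from both inputs are paired up
source_iter(vec![("key", 1)]) -> [0]my_join;
source_iter(vec![("key", "a")]) -> [1]my_join;
my_join = join() -> assert_eq([("key", (1, "a"))]);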
join_fused
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join_fused(A, B) -> | exactly 1 | Blocking |
Input port names:
0(blocking),1(blocking)
join_fused_lhs
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join_fused_lhs(A) -> | exactly 1 | Blocking |
Input port names:
0(blocking),1(streaming)
join_fused_rhs
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join_fused_rhs(A) -> | exactly 1 | Blocking |
Input port names:
0(streaming),1(blocking)
join_multiset
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join_multiset() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
lattice_bimorphism
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]lattice_bimorphism(A, B, C) -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
lattice_fold
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> lattice_fold(A) -> | exactly 1 | Blocking |
lattice_reduce
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> lattice_reduce() -> | exactly 1 | Blocking |
map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> map(A) -> | exactly 1 | Streaming |
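A small sketch, assuming map applies the closure to each item, as with Rust's Iterator::map:
// sketch: one output item per input item
source_iter(vec![1, 2, 3]) -> map(|x| x * 2) -> assert_eq([2, 4, 6]);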
multiset_delta
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> multiset_delta() -> | exactly 1 | Streaming |
next_iteration
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> next_iteration() -> | exactly 1 | Streaming |
next_stratum
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> next_stratum() -> | exactly 1 | Blocking |
null
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| at least 0 and at most 1 | null() | at least 0 and at most 1 | Streaming |
partition
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> partition(A)[<output_port>] -> | at least 2 | Streaming |
Output port names: Variadic, as specified in arguments.
persist
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> persist() -> | exactly 1 | Streaming |
persist_mut
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> persist_mut() -> | exactly 1 | Blocking |
persist_mut_keyed
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> persist_mut_keyed() -> | exactly 1 | Blocking |
prefix
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> prefix() | at least 0 and at most 1 | Streaming |
reduce
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> reduce(A) | at least 0 and at most 1 | Blocking |
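An illustrative sketch, assuming reduce folds the input using the first item as the initial accumulator and a closure that receives the accumulator by mutable reference:
// sketch: assumed signature reduce(|acc: &mut _, item| ...)
source_iter(vec![1, 2, 3])
-> reduce(|acc: &mut i32, x| *acc += x)
-> assert_eq([6]);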
reduce_keyed
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> reduce_keyed(A) -> | exactly 1 | Blocking |
repeat_n
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> repeat_n(A) -> | exactly 1 | Streaming |
resolve_futures
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> resolve_futures() -> | exactly 1 | Streaming |
resolve_futures_blocking
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> resolve_futures_blocking() -> | exactly 1 | Streaming |
resolve_futures_blocking_ordered
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> resolve_futures_blocking_ordered() -> | exactly 1 | Streaming |
resolve_futures_ordered
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> resolve_futures_ordered() -> | exactly 1 | Streaming |
scan
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> scan(A, B) -> | exactly 1 | Streaming |
sort
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> sort() -> | exactly 1 | Blocking |
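A small sketch; sort is blocking, so it buffers the tick's input and emits it in sorted order:
// sketch: output is sorted within the tick
source_iter(vec![3, 1, 2]) -> sort() -> assert_eq([1, 2, 3]);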
sort_by_key
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> sort_by_key(A) -> | exactly 1 | Blocking |
source_file
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_file(A) -> | exactly 1 | Streaming |
source_interval
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_interval(A) -> | exactly 1 | Streaming |
source_iter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_iter(A) -> | exactly 1 | Streaming |
source_json
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_json(A) -> | exactly 1 | Streaming |
source_stdin
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stdin() -> | exactly 1 | Streaming |
source_stream
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stream(A) -> | exactly 1 | Streaming |
source_stream_serde
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stream_serde(A) -> | exactly 1 | Streaming |
spin
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | spin() -> | exactly 1 | Streaming |
state
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> state() | at least 0 and at most 1 | Streaming |
state_by
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> state_by(A, B) | at least 0 and at most 1 | Streaming |
tee
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> tee()[<output_port>] -> | at least 2 | Streaming |
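An illustrative sketch; since tee has multiple output ports, it is assigned to a variable (my_tee is illustrative) so each branch can be referenced in its own statement:
// sketch: each downstream branch receives its own copy of the stream
my_tee = source_iter(vec!["hello", "world"]) -> tee();
my_tee -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x));
my_tee -> for_each(|x| println!("{}", x));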
union
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| at least 2 | -> [<input_port>]union() -> | exactly 1 | Streaming |
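A sketch mirroring the assert_eq example earlier in this section; union merges its inputs into one stream with no ordering guarantee across inputs, hence the sort:
// sketch: both sources feed the same union
my_union = union();
source_iter(vec![1]) -> my_union;
source_iter(vec![2]) -> my_union;
my_union -> sort() -> assert_eq([1, 2]);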
unique
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> unique() -> | exactly 1 | Streaming |
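A small sketch, assuming unique drops duplicates and passes first occurrences through in order:
// sketch: duplicates are filtered out
source_iter(vec![1, 1, 2, 3, 2]) -> unique() -> assert_eq([1, 2, 3]);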
unzip
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> unzip()[<output_port>] -> | exactly 2 | Streaming |
Output port names:
0,1
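An illustrative sketch, assuming unzip splits a stream of pairs into two streams of the pairs' first and second elements on output ports 0 and 1 (my_unzip is illustrative):
// sketch: port [0] carries the first elements, port [1] the second
my_unzip = source_iter(vec![(1, "a"), (2, "b")]) -> unzip();
my_unzip[0] -> assert_eq([1, 2]);
my_unzip[1] -> assert_eq(["a", "b"]);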
zip
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]zip() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
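An illustrative sketch, assuming zip pairs items from its two inputs in order and stops at the shorter input, as with Rust's Iterator::zip (my_zip is illustrative):
// sketch: the unmatched trailing item on input [0] is dropped
source_iter(vec![1, 2, 3]) -> [0]my_zip;
source_iter(vec!["a", "b"]) -> [1]my_zip;
my_zip = zip() -> assert_eq([(1, "a"), (2, "b")]);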
zip_longest
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]zip_longest() -> | exactly 1 | Blocking |
Input port names:
0(blocking),1(blocking)
_counter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> _counter(A, B) | at least 0 and at most 1 | Streaming |
Arguments: a tag string and a Duration for how long to wait between printing. A third optional parameter controls the prefix used for logging (defaults to "_counter").
Counts the number of items passing through and prints to stdout whenever the stream trigger activates.
source_stream(dfir_rs::util::iter_batches_stream(0..=100_000, 1))
-> _counter("nums", std::time::Duration::from_millis(100));
stdout:
_counter(nums): 1
_counter(nums): 6202
_counter(nums): 12540
_counter(nums): 18876
_counter(nums): 25218
_counter(nums): 31557
_counter(nums): 37893
_counter(nums): 44220
_counter(nums): 50576
_counter(nums): 56909
_counter(nums): 63181
_counter(nums): 69549
_counter(nums): 75914
_counter(nums): 82263
_counter(nums): 88638
_counter(nums): 94980
_lattice_fold_batch
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]_lattice_fold_batch() -> | exactly 1 | Blocking |
Input port names:
input(blocking),signal(blocking)
2 input streams, 1 output stream, no arguments.
Batches streaming input and releases it downstream when a signal is delivered. This allows for buffering data and delivering it later while also folding it into a single lattice data structure.
This operator is similar to defer_signal in that it batches input and releases it when a signal is given. It is also similar to lattice_fold in that it folds the input into a single lattice.
So, _lattice_fold_batch is a combination of both defer_signal and lattice_fold. This operator is useful when trying to combine a sequence of defer_signal and lattice_fold operators without unnecessary memory consumption.
There are two inputs to _lattice_fold_batch: input and signal.
input is the input data flow. Data that is delivered on this input is collected, in order, inside the _lattice_fold_batch operator.
When anything is sent to signal, the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.
use lattices::set_union::SetUnionHashSet;
use lattices::set_union::SetUnionSingletonSet;
source_iter([1, 2, 3])
-> map(SetUnionSingletonSet::new_from)
-> [input]batcher;
source_iter([()])
-> [signal]batcher;
batcher = _lattice_fold_batch::<SetUnionHashSet<usize>>()
-> assert_eq([SetUnionHashSet::new_from([1, 2, 3])]);
_lattice_join_fused_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]_lattice_join_fused_join() -> | exactly 1 | Blocking |
Input port names:
0(blocking),1(blocking)
2 input streams of type (K, V1) and (K, V2), 1 output stream of type (K, (V1', V2')), where V1, V2, V1', V2' are lattice types
Performs a fold_keyed with lattice-merge aggregate function on each input and then forms the
equijoin of the resulting key/value pairs in the input streams by their first (key) attribute.
Unlike join, the result is not a stream of tuples; it is a stream of MapUnionSingletonMap
lattices. You can (non-monotonically) "reveal" these as tuples if desired via map; see the examples below.
You must specify the accumulating lattice types; they cannot be inferred. The first type argument corresponds to the [0] input of the join, and the second to the [1] input.
Type arguments are specified in dfir using the Rust turbofish syntax ::<>, for example _lattice_join_fused_join::<Min<_>, Max<_>>().
The accumulating lattice type is not necessarily the same type as the input; see the example below involving SetUnion for such a case.
Like join, _lattice_join_fused_join can also be provided with one or two generic lifetime persistence arguments, either
'tick or 'static, to specify how join data persists. With 'tick, pairs will only be
joined with corresponding pairs within the same tick. With 'static, pairs will be remembered
across ticks and will be joined with pairs arriving in later ticks. When not explicitly
specified, persistence defaults to 'tick.
Like join, when two persistence arguments are supplied the first maps to port 0 and the second maps to
port 1.
When a single persistence argument is supplied, it is applied to both input ports.
When no persistence arguments are applied it defaults to 'tick for both.
It is important to specify all persistence arguments before any type arguments; otherwise the persistence arguments will be ignored.
The syntax is as follows:
_lattice_join_fused_join::<MaxRepr<usize>, MaxRepr<usize>>(); // Or
_lattice_join_fused_join::<'static, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'tick, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'static, 'tick, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'tick, 'static, MaxRepr<usize>, MaxRepr<usize>>();
// etc.
Examples
use dfir_rs::lattices::Min;
use dfir_rs::lattices::Max;
source_iter([("key", Min::new(1)), ("key", Min::new(2))]) -> [0]my_join;
source_iter([("key", Max::new(1)), ("key", Max::new(2))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<'tick, Min<usize>, Max<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (Min::new(1), Max::new(2)))]);
use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;
source_iter([("key", SetUnionSingletonSet::new_from(0)), ("key", SetUnionSingletonSet::new_from(1))]) -> [0]my_join;
source_iter([("key", SetUnionHashSet::new_from([0])), ("key", SetUnionHashSet::new_from([1]))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<'tick, SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (SetUnionHashSet::new_from([0, 1]), SetUnionHashSet::new_from([0, 1])))]);