Pipelines in Lea
Pipelines are first-class values in Lea, enabling composition, inspection, and reuse of data transformation chains.
Creating Pipelines
Define a pipeline with /> at the start:
let processNumbers = /> double /> addOne
Apply a pipeline by piping into it:
5 /> processNumbers -- 11
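As a minimal, self-contained sketch, the snippet below defines the two stages and shows that applying the pipeline value gives the same result as piping through each stage inline. The definition of addOne is an assumption inferred from its name and from the result 11; double matches the definition given under Reversible Functions.

```lea
-- Stage definitions (addOne is assumed to add 1)
let double = (x) -> x * 2
let addOne = (x) -> x + 1

-- A pipeline stored as a value
let processNumbers = /> double /> addOne

5 /> double /> addOne   -- 11 (stages applied inline)
5 /> processNumbers     -- 11 (same stages, applied through the pipeline value)
```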
Starting with Spread Pipe
Pipelines can also start with />>> when the first operation should map over list elements:
let doubleAll = />>> double /> sum
[1, 2, 3] /> doubleAll -- 12 (2+4+6)
Pipeline Properties
let p = /> double /> addOne /> toString
p.length -- 3 (number of stages)
p.stages -- ["double", "addOne", "toString"]
p.first -- double (first stage as function)
p.last -- toString (last stage as function)
p.isEmpty() -- false
Visualization
p.visualize()
Outputs an ASCII diagram:
─── double ─── addOne ─── toString ───
Pipeline Composition
A pipeline can itself be used as a stage in another pipeline, so larger pipelines can be built from smaller ones:
let pipeA = /> filter((x) -> x > 0)
let pipeB = /> map((x) -> x * 2)
let combined = /> pipeA /> pipeB
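A short sketch of applying the composed pipeline to a list. It assumes that filter and map receive the piped list as their first argument, which is how they are used in the Examples section.

```lea
let pipeA = /> filter((x) -> x > 0)
let pipeB = /> map((x) -> x * 2)
let combined = /> pipeA /> pipeB

-- Keep the positive numbers, then double each one
[-1, 2, 3] /> combined   -- [4, 6]
```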
Pipeline Algebra
Identity and Empty
5 /> Pipeline.identity -- 5 (passes through unchanged)
5 /> Pipeline.empty -- 5 (no stages)
Creating from Functions
let p = Pipeline.from([fn1, fn2, fn3])
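For illustration, a sketch with concrete functions in place of fn1, fn2, fn3. The name fromList is hypothetical, addOne is an assumed definition, and the result assumes the stages run in list order, like /> double /> addOne.

```lea
let double = (x) -> x * 2
let addOne = (x) -> x + 1   -- assumed definition

-- Build a pipeline from a list of functions
let fromList = Pipeline.from([double, addOne])

5 /> fromList   -- 11, assuming stages run in list order
```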
Stage Access
p.at(0) -- Get stage at index as callable function
p.first -- First stage
p.last -- Last stage
Manipulation (returns new pipeline)
p.prepend(fn) -- Add stage at start
p.append(fn) -- Add stage at end
p.reverse() -- Reverse stage order
p.slice(0, 2) -- Extract sub-pipeline
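A small sketch of how append and prepend change the stage order, read off the descriptions above. The names basePipe, extended, and shifted are hypothetical, and addOne is an assumed definition.

```lea
let double = (x) -> x * 2
let addOne = (x) -> x + 1   -- assumed definition

let basePipe = /> double
let extended = basePipe.append(addOne)   -- double, then addOne
let shifted = basePipe.prepend(addOne)   -- addOne, then double

5 /> basePipe   -- 10 (basePipe itself is unchanged)
5 /> extended   -- 11 (5 * 2 + 1)
5 /> shifted    -- 12 ((5 + 1) * 2)
```

Because each call returns a new pipeline, basePipe can be reused as the starting point for several variants.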
Set Operations (returns new pipeline)
pipeA.without(pipeB) -- Remove stages appearing in pipeB
pipeA.intersection(pipeB) -- Keep only stages common to both
pipeA.union(pipeB) -- Combine all stages (deduplicated)
pipeA.difference(pipeB) -- Stages in pipeA but not pipeB
pipeA.concat(pipeB) -- Concatenate (preserves duplicates)
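The sketch below works through these operations on two small pipelines. The expected results are read off the descriptions above and assume that stages are matched by name; they have not been checked against an interpreter, and the ordering of union results is deliberately left unstated.

```lea
let double = (x) -> x * 2
let addOne = (x) -> x + 1   -- assumed definition

let pipeA = /> double /> addOne /> toString
let pipeB = /> addOne

pipeA.without(pipeB).stages        -- expected: ["double", "toString"]
pipeA.intersection(pipeB).stages   -- expected: ["addOne"]
pipeA.union(pipeB).length          -- expected: 3 (addOne counted once)
pipeA.concat(pipeB).length         -- expected: 4 (addOne appears twice)
```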
Comparison
pipeA.equals(pipeB) -- Structural equality
Reversible Functions
Define both forward and reverse transformations:
let double = (x) -> x * 2
and double = (x) <- x / 2
Apply in either direction:
5 /> double -- 10 (forward: 5 * 2)
10 </ double -- 5 (reverse: 10 / 2)
-- Roundtrip preserves value
5 /> double </ double -- 5
Bidirectional Pipelines
Build a pipeline with </> between stages to make it applicable in both directions:
let transform = </> double </> addTen
5 /> transform -- 20 (forward: 5 -> 10 -> 20)
20 </ transform -- 5 (reverse: 20 -> 10 -> 5)
Reverse application (</) works correctly only when every stage is a reversible function.
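To make the example self-contained, here is a sketch with both stages defined as reversible functions. The definition of addTen is an assumption chosen to match the values shown (10 -> 20 forward, 20 -> 10 in reverse).

```lea
let double = (x) -> x * 2
and double = (x) <- x / 2

-- Assumed definition: adds ten forward, subtracts ten in reverse
let addTen = (x) -> x + 10
and addTen = (x) <- x - 10

let transform = </> double </> addTen

5 /> transform    -- 20 (forward: double, then addTen)
20 </ transform   -- 5  (reverse: undo addTen, then undo double)
```

In reverse, the stages run in the opposite order, each using its reverse definition.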
Pipeline Decorators
let debugPipeline = /> double /> addOne #debug
let profiledPipeline = /> double /> addOne #profile
let loggedPipeline = /> double /> addOne #log
| Decorator | Description |
|---|---|
| #log | Log pipeline input/output |
| #log_verbose | Detailed stage-by-stage logging |
| #memo | Cache results by input |
| #time | Log total execution time |
| #debug | Step-by-step execution logging |
| #profile | Timing breakdown per stage |
| #trace | Nested call tracing |
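As a hedged sketch, #memo can be attached in the same position as the decorators shown above. The name cachedPipeline is hypothetical and addOne is an assumed definition; no log or cache output is shown because its format is not specified here.

```lea
let double = (x) -> x * 2
let addOne = (x) -> x + 1   -- assumed definition

let cachedPipeline = /> double /> addOne #memo

5 /> cachedPipeline   -- 11 (computed)
5 /> cachedPipeline   -- 11 (same input, so the cached result can be reused)
```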
Reactive Pipelines
Reactive pipelines track a mutable source and recompute their result on the next .value access after the source changes:
maybe source = [1, 2, 3]
let reactive = source @> map(double) /> sum
reactive.value -- 12 (computed on first access)
source = [1, 2, 3, 4] -- Mutation marks reactive as dirty
reactive.value -- 20 (recomputed)
reactive.value -- 20 (cached, not recomputed)
Key behaviors:
- Lazy: Computes on .value access, not on creation
- Dirty tracking: Source mutation marks reactive as dirty
- Caching: Subsequent .value accesses return cached result if clean
- Auto-unwrap: Reactives unwrap in expressions: (reactive + 10)
- Static optimization: Primitive sources compute immediately (no wrapper)
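The behaviors listed above can be sketched together as follows. The values follow from the earlier example (the doubled elements summed), and double is assumed to be the plain doubling function used throughout this document.

```lea
let double = (x) -> x * 2

maybe source = [1, 2, 3]
let reactive = source @> map(double) /> sum

reactive + 10    -- 22 (auto-unwrapped: 12 + 10)
source = [10]    -- reassigning the source marks reactive as dirty
reactive.value   -- 20 (recomputed on this access)
reactive.value   -- 20 (clean, so the cached result is returned)
```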
Examples
Data Processing Pipeline
let cleanData = />
  filter((x) -> x != null) />
  map((x) -> trim(x)) />
  filter((x) -> length(x) > 0)

let processCSV = />
  lines />
  map((line) -> split(line, ",")) />
  cleanData

rawCSV /> processCSV /> print
Encoding/Decoding with Bidirectional Pipelines
let encode = (s) -> s ++ "!"
and encode = (s) <- slice(s, 0, length(s) - 1)
let wrap = (s) -> "[" ++ s ++ "]"
and wrap = (s) <- slice(s, 1, length(s) - 1)
let codec = </> encode </> wrap
"hello" /> codec -- "[hello!]"
"[hello!]" </ codec -- "hello"
Reusable Transformations
let normalizeNumbers = />
  filter((x) -> x > 0) />
  map((x) -> round(x)) />
  listSet

let statistics = />
  (nums) -> {
    count: length(nums),
    sum: reduce(nums, 0, (a, x) -> a + x),
    avg: reduce(nums, 0, (a, x) -> a + x) / length(nums)
  }

[3.2, -1, 5.8, 3.2, 7.1]
  /> normalizeNumbers
  /> statistics
  /> print
-- { count: 3, sum: 16, avg: 5.33... }