Go library for working with slices, streams, and data pipelines. Type-safe generics, lazy evaluation, parallel processing, configurable error handling, context-aware cancellation.
Use the whole pipeline or pick the parts you need — standalone slice functions, streaming I/O, hooks, parallel workers. Each piece works independently.
GoSplice started as a small utility library — Map, Filter, Reduce for Go slices with generics. The kind of thing you write once in every project and then copy-paste between repos.
Then it grew. Not because there was a grand plan, but because real projects kept needing the next piece: "I need to chain these operations lazily" → pipelines. "I need to process a channel with timeout" → context-aware sources. "Some records fail and I want to skip them" → error hooks. "API calls need rate limiting" → RateLimit stage. Each addition solved a concrete problem that came up in production code.
The philosophy stayed the same throughout: don't fight Go, extend it. No new DSL, no reflection where generics work, no framework that takes over your main(). Every function should feel like it could have been in the standard library — if the standard library shipped with generics from day one.
Performance matters. GoSplice takes the same approach as hand-written for loops: direct iteration over slices without intermediate allocations when possible, fast paths for common patterns (sliceSource → Collect skips the iterator protocol entirely), amortized context checks to avoid per-element channel operations. The benchmarks are in the repo — run go test -bench=. -benchmem and compare with your hand-rolled loops. The gap is small enough that the readability win is worth it.
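As a reference point for that comparison, a hand-written generic map is exactly this shape: one upfront allocation and direct index iteration. This is a sketch for benchmarking against, not GoSplice's internal code:

```go
package main

import "fmt"

// mapSlice is the hand-rolled baseline a library Map competes with:
// a single allocation sized upfront, direct index iteration, and no
// iterator protocol in between.
func mapSlice[T, U any](in []T, fn func(T) U) []U {
	out := make([]U, len(in)) // one allocation, exact size
	for i, v := range in {
		out[i] = fn(v)
	}
	return out
}

func main() {
	fmt.Println(mapSlice([]int{1, 2, 3}, func(n int) int { return n * n }))
	// [1 4 9]
}
```

Benchmarking your own hot loop against a shape like this is the fairest way to decide whether the abstraction cost matters for your workload.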
Where GoSplice helps:
- Backend services — process API responses, transform data between layers, validate and enrich records with error tracking
- ETL and data pipelines — read CSV, filter, transform, batch-insert into a database, all with timeout and rate limiting
- CLI tools — quick scripts that process files, aggregate data, produce reports without pulling in heavyweight frameworks
- Anywhere you'd write a `for` loop with an accumulator — GoSplice gives you the same loop with composability, hooks, and parallel execution bolted on
It is not a replacement for Apache Beam or Spark. It is a replacement for the 200-line processRecords() function that everyone writes differently in every project.
Requires Go 1.23+.
```shell
go get github.com/lacolle87/gosplice
```

```go
import gs "github.com/lacolle87/gosplice"
```

GoSplice is not an all-or-nothing framework. You can use it at three levels:
Level 1 — Slice functions. Drop-in utilities for everyday slice work. No pipeline, no setup.
```go
doubled := gs.Map(nums, func(n int) int { return n * 2 })
actives := gs.Filter(users, func(u User) bool { return u.Active })
total := gs.Reduce(orders, 0.0, func(sum float64, o Order) float64 {
    return sum + o.Price
})
```

Level 2 — Pipelines. Chain operations lazily. Nothing executes until a terminal (Collect, Reduce, ForEach) is called.
```go
names := gs.PipeMap(
    gs.FromSlice(users).
        Filter(func(u User) bool { return u.Active }).
        Filter(func(u User) bool { return u.Score >= 70 }),
    func(u User) string { return u.Name },
).Collect()
```

Level 3 — Pipelines with hooks, error handling, parallelism, context, CSV, rate limiting, and I/O. For ETL workflows, data processing jobs, and streaming.
```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

gs.FromChannelCtx(ctx, recordsCh).
    RateLimit(gs.RateLimitConfig{Rate: 100, Burst: 20}).
    WithErrorHandler(gs.RetryHandler[Record](3, 100*time.Millisecond)).
    WithCompletionHook(func() { log.Println("done") })
// ...process and write to output
```

- CSV — streaming read/write with three access levels: `Row` (pandas-style `row.Get("name")`, `row.GetFloat("price")`), functional mapper (`FromCSVFunc`), and struct tags (`FromCSV[T]`). Two matching sinks: `ToCSV` and `ToCSVStruct`. Custom delimiters, header handling, variable-length rows.
- Rate limiting — `RateLimit(cfg)` as a lazy pipeline stage. Token bucket, context-aware, composable with Filter/PipeMap/parallel. Use for API rate limits, DB throttling, backpressure.
- Statistics — `MeanBy`, `VarianceBy`, `StdDevBy`, `MedianBy`, `PercentileBy`, `DescribeBy`, `CorrelationBy` for pipelines. Standalone slice versions: `Mean`, `Variance`, `StdDev`, `Median`, `Percentile`, `Describe`, `Correlation`, `Histogram`. Single-pass Welford's algorithm where possible.
- Numeric constraint — `Numeric` interface for stats functions (int/uint/float types, no strings).
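A constraint like `Numeric` is typically spelled as a union of the built-in integer and float types. The exact member list in GoSplice may differ; this is a sketch of the pattern:

```go
package main

import "fmt"

// numeric sketches a constraint in the spirit of gs.Numeric: integer
// and float types (the ~ forms also admit named types), no strings.
// The library's actual member list may differ.
type numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// mean accepts any numeric element type and accumulates in float64.
func mean[N numeric](xs []N) float64 {
	var sum float64
	for _, x := range xs {
		sum += float64(x)
	}
	return sum / float64(len(xs))
}

func main() {
	fmt.Println(mean([]int{1, 2, 3, 4}))      // int satisfies the constraint
	fmt.Println(mean([]float64{1.5, 2.5}))    // so does float64
	// mean([]string{"a"}) would not compile — strings are excluded
}
```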
Standalone functions that work on regular Go slices. No pipeline required.
```go
squared := gs.Map([]int{1, 2, 3}, func(n int) int { return n * n })
// [1, 4, 9]

// Zero allocations — modifies the slice in place
gs.MapInPlace(prices, func(p float64) float64 { return p * 1.1 })
```

Two type parameters — the accumulator type can differ from the element type.
```go
total := gs.Reduce(nums, 0, func(acc, n int) int { return acc + n })

index := gs.Reduce(users, make(map[int]User), func(m map[int]User, u User) map[int]User {
    m[u.ID] = u
    return m
})
```

```go
adults := gs.Filter(people, func(p Person) bool { return p.Age >= 18 })

// Zero allocations — compacts the slice in place (modifies the original)
active := gs.FilterInPlace(users, func(u User) bool { return u.Active })
```

```go
gs.Some(nums, func(n int) bool { return n > 100 })
gs.Every(nums, func(n int) bool { return n > 0 })
gs.Includes(tags, "urgent")
v, ok := gs.Find(users, func(u User) bool { return u.Email == email })
v, ok := gs.FindLast(logs, func(l Log) bool { return l.Level == "ERROR" })
idx := gs.FindIndex(items, func(i Item) bool { return i.ID == targetID })
idx := gs.IndexOf(names, "Alice")
idx := gs.LastIndexOf(tags, "retry")
```

```go
gs.Reverse([]int{1, 2, 3})        // [3, 2, 1] — new slice
gs.ReverseInPlace(slice)          // modifies in place, zero allocations
gs.Unique([]int{1, 2, 2, 3, 1})   // [1, 2, 3] — preserves order
gs.Flat([][]int{{1, 2}, {3, 4}})  // [1, 2, 3, 4]
gs.FlatMap(nums, func(n int) []int { return []int{n, -n} })
gs.Chunk([]int{1, 2, 3, 4, 5}, 2) // [[1, 2], [3, 4], [5]]
gs.Remove(slice, []int{2, 4})     // removes the listed values
gs.Count(nums, func(n int) bool { return n > 0 })
gs.Zip(names, scores)             // pairs of (name, score)
```

Lazy, composable chains of operations. Nothing runs until you call a terminal.
```go
// Nothing happens here — just building a description
p := gs.FromSlice(data).
    Filter(func(n int) bool { return n > 0 }).
    Take(10)

// NOW it runs
result := p.Collect()
```

| Function | Description | Use when |
|---|---|---|
| `FromSlice(data)` | In-memory slice | You have all data upfront |
| `FromChannel(ch)` | Go channel | Streaming from a goroutine producer |
| `FromChannelCtx(ctx, ch)` | Context-aware channel | Streaming with cancellation support |
| `FromReader(r)` | `io.Reader`, line by line | Reading files, HTTP bodies |
| `FromFunc(fn)` | Custom generator `func() (T, bool)` | Computed/infinite sequences |
| `FromRange(start, end)` | Integer range `[start, end)` | Numeric sequences |
| `FromCSVRows(r, cfg)` | CSV → `Row` (pandas-style) | CSV exploration, untyped access |
| `FromCSVFunc(r, cfg, fn)` | CSV → `T` via mapper function | CSV with explicit parsing |
| `FromCSV[T](r, cfg)` | CSV → `T` via struct tags | CSV with automatic field mapping |
| Operation | Description |
|---|---|
| `Filter(fn)` | Keep elements where fn returns true |
| `Take(n)` | Stop after the first n elements |
| `Skip(n)` | Discard the first n elements |
| `Peek(fn)` | Call fn on each element, pass through unchanged |
| `RateLimit(cfg)` | Token bucket rate limiter — throttle element throughput |
| Function | Description |
|---|---|
| `PipeMap(p, fn)` | Transform T → U |
| `PipeMapErr(p, fn)` | Transform T → (U, error) with error handling |
| `PipeFlatMap(p, fn)` | Transform T → []U, flatten results |
| `PipeDistinct(p)` | Remove duplicates (comparable types) |
| `PipeChunk(p, size)` | Group into fixed-size []T batches |
| `PipeWindow(p, size, step)` | Sliding window (step=1 → full overlap, step=size → same as chunk) |
| `PipeReduce(p, init, fn)` | Cross-type reduce T → U |
| Terminal | Returns |
|---|---|
| `Collect()` | `[]T` |
| `CollectTo(buf)` | `[]T` (reuses the allocation) |
| `ForEach(fn)` | — |
| `Count()` | `int` |
| `First()` | `(T, bool)` |
| `Reduce(init, fn)` | `T` |
| `Any(pred)` | `bool` (short-circuits) |
| `All(pred)` | `bool` (short-circuits) |
| Function | Returns |
|---|---|
| `GroupBy(p, keyFn)` | `map[K][]T` |
| `CountBy(p, keyFn)` | `map[K]int` |
| `SumBy(p, fn)` | `N` |
| `MaxBy(p, fn)` | `(T, bool)` |
| `MinBy(p, fn)` | `(T, bool)` |
| `Partition(p, pred)` | `(matched, unmatched []T)` |
| Function | Description |
|---|---|
| `MeanBy(p, fn)` | Arithmetic mean (single-pass) |
| `VarianceBy(p, fn)` | Population variance (Welford's algorithm) |
| `StdDevBy(p, fn)` | Population standard deviation |
| `MedianBy(p, fn)` | Median (collects and sorts) |
| `PercentileBy(p, pct, fn)` | Arbitrary percentile with linear interpolation |
| `DescribeBy(p, fn)` | Full summary: count, mean, stddev, min, Q1, median, Q3, max |
| `CorrelationBy(p, fnX, fnY)` | Pearson correlation (single-pass) |
Standalone slice versions: Mean, Variance, StdDev, Median, Percentile, Describe, Correlation, Histogram.
| Function | Description |
|---|---|
| `ToChannel(ch)` | Write elements to a channel |
| `ToWriter(p, w, encode)` | Write to an `io.Writer` as bytes |
| `ToWriterString(p, w, format)` | Write to an `io.Writer` as strings |
| `ToCSV(p, w, cfg, header, fn)` | Write CSV with an explicit formatter |
| `ToCSVStruct[T](p, w, cfg)` | Write CSV via struct tags |
```go
p := gs.FromChannel(tickStream).
    WithTimeout(5 * time.Second).
    WithTimeoutHook(func(d time.Duration) {
        log.Printf("pipeline timed out after %v", d)
    })

results := p.Collect()
if errors.Is(p.Err(), context.DeadlineExceeded) {
    // partial results are still available
}
```

If you need both an external context and a timeout, call WithContext first, then WithTimeout. The context propagates automatically through all pipeline stages.
When ctx is nil (default), the pipeline takes the exact same code paths as before context support — zero overhead.
```go
type ErrorAction int

const (
    Skip  ErrorAction = iota // drop the element, continue
    Retry                    // re-run the operation
    Abort                    // stop the pipeline
)
```

```go
pipeline.WithErrorHandler(func(err error, elem Record, attempt int) gs.ErrorAction {
    if attempt >= 3 {
        return gs.Skip
    }
    time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
    return gs.Retry
})
```

```go
pipeline.WithErrorHandler(gs.RetryHandler[Record](3, 100*time.Millisecond))
pipeline.WithErrorHandler(gs.RetryThenAbort[Record](3, 50*time.Millisecond))
pipeline.WithErrorHandler(gs.SkipOnError[Record]())
pipeline.WithErrorHandler(gs.AbortOnError[Record]())
```

```go
var elemCount atomic.Int64
var errCount atomic.Int64

pipeline.
    WithElementHook(gs.CountElements[Record](&elemCount)).
    WithErrorHook(gs.CountErrors[Record](&errCount)).
    WithErrorHook(gs.CollectErrors[Record](&errs)).
    WithBatchHook(gs.CountBatches[Record](&batchCount)).
    WithCompletionHook(func() { log.Println("done") })
```

When both an error handler and error hooks are set, the handler takes precedence.
Order is preserved in all parallel operations.
| Function | Memory model |
|---|---|
| `PipeMapParallel(p, workers, fn)` | Drains the source into a slice, splits it across workers |
| `PipeFilterParallel(p, workers, fn)` | Same — drain first, parallel predicate |
| `PipeMapParallelErr(p, workers, fn)` | Same — drain first, parallel with errors |
| `PipeMapParallelStream(p, workers, buf, fn)` | Streaming — bounded memory, reads on the fly |
For unbounded or very large sources, use PipeMapParallelStream.
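One way to get bounded memory, concurrency, and preserved order at the same time is to queue a per-element result channel in a bounded buffer. A sketch of that pattern under the names above; it is not GoSplice's implementation:

```go
package main

import "fmt"

// streamMap transforms elements with `workers` goroutines while at
// most `buf` results are in flight; output order matches input order
// because the collector drains result slots in dispatch order.
func streamMap[T, U any](in <-chan T, workers, buf int, fn func(T) U) <-chan U {
	type job struct {
		v   T
		out chan U
	}
	jobs := make(chan job)
	pending := make(chan chan U, buf) // bounds in-flight work, keeps order

	go func() { // dispatcher: one job and one ordered result slot per element
		for v := range in {
			out := make(chan U, 1)
			pending <- out // blocks when buf results are already in flight
			jobs <- job{v, out}
		}
		close(jobs)
		close(pending)
	}()

	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				j.out <- fn(j.v)
			}
		}()
	}

	results := make(chan U)
	go func() { // collector: forward results in dispatch order
		for out := range pending {
			results <- <-out
		}
		close(results)
	}()
	return results
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	for r := range streamMap(in, 3, 2, func(n int) int { return n * n }) {
		fmt.Print(r, " ") // 1 4 9 16 25 — order preserved despite concurrency
	}
	fmt.Println()
}
```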
```go
batches := gs.PipeBatch(pipeline, gs.BatchConfig{Size: 100}).Collect()

// With timeout — emit a partial batch if no new element arrives in time
batches = gs.PipeBatch(channelPipeline, gs.BatchConfig{
    Size:    100,
    MaxWait: 500 * time.Millisecond,
})
```

Three levels of access, same "use what you need" philosophy.
No struct definitions needed. Great for exploration, scripts, and files with unknown schemas.
```go
rows := gs.FromCSVRows(file, gs.CSVConfig{Header: true}).
    Filter(func(r gs.Row) bool { return r.Get("status") == "active" }).
    Filter(func(r gs.Row) bool {
        salary, _ := r.GetFloat("salary")
        return salary > 50000
    }).
    Collect()

name := rows[0].Get("name")       // by column name
first := rows[0].Index(0)         // by position
age, err := rows[0].GetInt("age") // typed parsing
m := rows[0].AsMap()              // map[string]string
```

Row methods: Get, Index, GetInt, GetFloat, GetBool, Has, Len, Fields, AsMap, Columns. The header column index is built once and shared across all rows.
Full control over parsing. No reflection, no struct tags.
```go
orders := gs.FromCSVFunc(file, gs.CSVConfig{Header: true}, func(row []string) (Order, error) {
    amount, err := strconv.ParseFloat(row[1], 64)
    if err != nil {
        return Order{}, err
    }
    return Order{ID: row[0], Amount: amount}, nil
}).Collect()
```

Automatic mapping via `csv:"name"` struct tags. Reflection runs once at startup to build a field table — per-row decoding is a fast-path switch with no further reflection.
```go
type Order struct {
    ID     string  `csv:"order_id"`
    Amount float64 `csv:"amount"`
    Status string  `csv:"status"`
}

orders := gs.FromCSV[Order](file, gs.CSVConfig{Header: true}).
    Filter(func(o Order) bool { return o.Status == "completed" }).
    Collect()
```

```go
// Functional — explicit formatter
gs.ToCSV(pipeline, writer, gs.CSVConfig{Header: true},
    []string{"id", "amount", "status"},
    func(o Order) []string {
        return []string{o.ID, fmt.Sprintf("%.2f", o.Amount), o.Status}
    },
)

// Struct tags
gs.ToCSVStruct[Order](pipeline, writer, gs.CSVConfig{Header: true})
```

Comma — field delimiter (default `,`). Comment — skip lines starting with this rune. Header — treat the first row as column names. LazyQuotes, TrimLeadingSpace — passed through to encoding/csv. All three source levels and both sinks share the same config.
Token bucket rate limiter as a lazy pipeline stage. Sits in the chain like Filter or Take — nothing happens until a terminal is called.
```go
// 100 requests/second, burst of 20
results := gs.PipeMapErr(
    gs.FromSlice(urls).
        RateLimit(gs.RateLimitConfig{Rate: 100, Burst: 20}),
    fetchURL,
).Collect()
```

Rate — elements per Interval. Interval — the time window (default 1s). Burst — bucket capacity: how many elements pass immediately before the sustained rate kicks in (defaults to Rate).
The rate limiter respects the pipeline context. When used with WithTimeout, the wait is cancelled on deadline:
```go
results := gs.FromSlice(urls).
    WithTimeout(5 * time.Minute).
    RateLimit(gs.RateLimitConfig{Rate: 50}).
    Collect() // stops on timeout, partial results available
```

For explicit context control outside the pipeline (e.g. inside parallel operations):

```go
gs.RateLimitCtx(pipeline, gs.RateLimitConfig{Rate: 200}, ctx)
```

- External API rate limits — avoid 429 responses
- Database/queue write throttling — don't saturate connection pools
- Parallel workers with a shared global limit — `RateLimit` before `PipeMapParallelStream`
- Backpressure on fast producers (Kafka, WebSocket) — cap the processing rate, buffer in a channel
```go
orders := []Order{
    {1, "Alice", 150.0, "completed"},
    {2, "Bob", 0, "failed"},
    {3, "Charlie", 320.0, "completed"},
    {4, "Diana", 89.0, "completed"},
    {5, "Eve", 450.0, "pending"},
}

var processed atomic.Int64

results := gs.PipeMap(
    gs.FromSlice(orders).
        WithElementHook(gs.CountElements[Order](&processed)).
        Filter(func(o Order) bool { return o.Status == "completed" }).
        Filter(func(o Order) bool { return o.Amount > 100 }),
    func(o Order) string {
        return fmt.Sprintf("%s: $%.2f", o.Customer, o.Amount)
    },
).Collect()
// ["Alice: $150.00", "Charlie: $320.00"]

byStatus := gs.CountBy(gs.FromSlice(orders), func(o Order) string { return o.Status })
// map[completed:3 failed:1 pending:1]
```

Five working examples live in examples/.
Streaming ETL: Star Wars API → examples/swapi/
Streams characters from a paginated HTTP API with context timeout, collects through a pipeline with progress hooks, then runs analytics.
Uses: FromChannelCtx, WithContext, Err, CountElements, hooks, Collect, Reduce, CountBy, MaxBy, GroupBy, Map, Filter, PipeMap, ToWriterString.
Market data: tick stream → candles → indicators → examples/quotes/
Generates price ticks with context timeout, batches into 1-minute candles, computes technical indicators (SMA, EMA, RSI, Bollinger Bands, VWAP).
Uses: FromChannelCtx, PipeBatch, PipeMap, PipeWindow, PipeReduce, CountElements, SumBy, MaxBy, MinBy, Filter, ToWriterString.
Data analytics: real estate reports → examples/realestate/
Generates 500 property listings with bad data, runs validation + labeling pipeline with error tracking, produces market report.
Uses: FromSlice, PipeMapErr, CollectErrors, CountElements, Filter, Map, FlatMap, Some, Unique, Reduce, SumBy, MaxBy, MinBy, CountBy, GroupBy, Partition, ToWriterString.
Hooks showcase → examples/hooks/
10 scenarios: observability hooks, error handlers, retry with backoff, batch hooks, hook composition, handler vs hook precedence, context timeout with hook.
Uses: CountElements, CountErrors, CollectErrors, LogErrorsTo, CountBatches, WithElementHook, WithErrorHook, WithErrorHandler, WithCompletionHook, WithMaxRetries, WithTimeout, WithTimeoutHook, Err, SkipOnError, AbortOnError, RetryHandler, RetryThenAbort, PipeMapErr, PipeBatch.
CSV pipeline → examples/csv/
Same employee data processed two ways: functional (FromCSVFunc → ToCSV) and struct tags (FromCSV → ToCSVStruct). Both approaches produce identical output. Includes department analytics with GroupBy, SumBy, MaxBy, CountBy.
Uses: FromCSVFunc, FromCSV, ToCSV, ToCSVStruct, CSVConfig, Filter, PipeMap, WithElementHook, CountElements, GroupBy, SumBy, MaxBy, CountBy, ToWriterString.
```shell
go test -v ./...            # all tests
go test -bench=. -benchmem  # benchmarks with allocations
go test -race ./...         # race condition detection
```

```
gosplice/
├── source.go     Source interface, adapters (FromSlice, FromChannel, FromChannelCtx, FromReader, FromFunc, FromRange)
├── stage.go      Pipeline stage types (filter, map, flatMap, chunk, window, distinct...)
├── pipeline.go   Pipeline[T], WithContext, WithTimeout, Err, chainable operations, terminals
├── iter.go       Core iteration primitives (drain, fold, foldWhile) with ctx-aware branches
├── transform.go  Type-changing functions (PipeMap, PipeFlatMap, PipeReduce...)
├── aggregate.go  Aggregations (GroupBy, CountBy, SumBy, MaxBy, MinBy, Partition)
├── stats.go      Statistics (MeanBy, VarianceBy, MedianBy, PercentileBy, DescribeBy, CorrelationBy, Histogram)
├── parallel.go   Parallel operations (PipeMapParallel, PipeFilterParallel, PipeMapParallelStream...)
├── batch.go      Batching with size and timeout, context-aware cancellation
├── ratelimit.go  Token bucket rate limiter (RateLimit, RateLimitCtx)
├── csv.go        CSV sources and sinks (FromCSV, FromCSVFunc, ToCSV, ToCSVStruct, CSVConfig)
├── row.go        Row type with pandas-style access (FromCSVRows, Row.Get, Row.GetFloat...)
├── sink.go       Output adapters (ToChannel, ToWriter, ToWriterString)
├── hooks.go      Hook types, ErrorAction, error handling dispatch
├── hookfn.go     Ready-made hooks (RetryHandler, CountElements, LogErrorsTo...)
├── slice.go      Standalone slice functions (Map, Filter, Reduce, Unique...)
└── examples/
    ├── swapi/       Streaming ETL from Star Wars API
    ├── quotes/      Market tick stream → candles → technical indicators
    ├── realestate/  Property labeling and market analytics
    ├── hooks/       Hooks system showcase (all hook types + timeout)
    └── csv/         CSV pipeline — functional vs struct tags
```
MIT License — see LICENSE.