Go implementation of Durable Streams.
| Suite | Version | Tests | Passed | Failed | Skipped |
|---|---|---|---|---|---|
| Server (memorystorage) | v0.1.8 | 195 | 195 | 0 | 0 |
| Server (badgerstore) | v0.1.8 | 195 | 195 | 0 | 0 |
| Client | v0.2.0 | 212 | 184 | 13 | 15 |
Client features: batching, sse, longPoll, streaming, dynamicHeaders
Skipped: 6 auto mode, 6 retryOptions, 3 strictZeroValidation/batchItems
This implementation uses the same offset format as the reference Node.js implementation:
<readSeq>_<byteOffset>
Both components are 16-digit zero-padded integers, e.g., 0000000000000000_0000000000000042.
This ensures lexicographic sortability as required by PROTOCOL.md Section 6.
| Package | Coverage |
|---|---|
| durablestream | 90.9% |
| durablestream/transport | 93.1% |
| durablestream/storage | 100.0% |
| durablestream/storage/memorystorage | 95.5% |
| durablestream/internal/protocol | 98.1% |
| durablestream/storage/badgerstore | 85.9% |
Badgerstore also has 7 fuzz tests covering stream ID validation, stream operations, sequence ordering, and concurrent operations. Run with task fuzz or task fuzz:quick.
func ExampleHandler() {
storage := memorystorage.New()
handler := durablestream.NewHandler(storage, nil)
mux := http.NewServeMux()
mux.Handle("/v1/stream/", http.StripPrefix("/v1/stream/", handler))
log.Println("Listening on :4437")
log.Fatal(http.ListenAndServe(":4437", mux))
}func ExampleClient() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
_, err := client.Create(ctx, "events", &durablestream.CreateOptions{
ContentType: "application/json",
})
if err != nil {
log.Fatal(err)
}
// Write using Writer
writer, err := client.Writer(ctx, "events")
if err != nil {
log.Fatal(err)
}
event := map[string]any{"type": "user.created", "id": 123}
if err := writer.SendJSON(event, nil); err != nil {
log.Fatal(err)
}
fmt.Println("Appended at offset:", writer.Offset())
// Read using Reader
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
result, err := reader.Read(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Got data:", len(result.Data) > 0)
fmt.Println("Next offset:", result.NextOffset)
}func ExampleReader() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
// Create a reader starting from offset 0
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
for msg, err := range reader.Messages(ctx) {
if err != nil {
log.Fatal(err)
}
// Use msg.String() for text, msg.Bytes() for raw bytes,
// or msg.Decode(&v) for JSON
fmt.Println("Received:", msg.String())
}
}MIT - see LICENSE