Skip to content

Commit 473cdb3

Browse files
feat: add stdin/pipe log source for command composability (#10)
Implement StdinSource conforming to the Source interface, enabling piped input from kubectl, docker, cat, and other commands. Features: - Pipe vs terminal detection (IsPipe) - Buffered channel with configurable backpressure (block or drop-oldest) - Support for long log lines (up to 1 MB) - Graceful shutdown via context cancellation - Comprehensive tests with mock stdin readers Fixes #8
1 parent d19bd13 commit 473cdb3

2 files changed

Lines changed: 314 additions & 0 deletions

File tree

internal/source/stdin.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package source
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"io"
8+
"os"
9+
"sync"
10+
)
11+
12+
const (
13+
// DefaultBufferSize is the default capacity for the lines channel.
14+
DefaultBufferSize = 1000
15+
16+
// DropOldest discards the oldest unread line when the buffer is full.
17+
DropOldest BackpressureStrategy = iota
18+
// Block waits until a reader consumes a line before accepting more.
19+
Block
20+
)
21+
22+
// BackpressureStrategy controls behaviour when the lines channel is full.
23+
type BackpressureStrategy int
24+
25+
// StdinOption configures a StdinSource.
26+
type StdinOption func(*StdinSource)
27+
28+
// WithBufferSize sets the capacity of the lines channel.
29+
func WithBufferSize(n int) StdinOption {
30+
return func(s *StdinSource) { s.bufSize = n }
31+
}
32+
33+
// WithBackpressure sets the backpressure strategy.
34+
func WithBackpressure(bp BackpressureStrategy) StdinOption {
35+
return func(s *StdinSource) { s.backpressure = bp }
36+
}
37+
38+
// WithReader overrides the default stdin reader (useful for testing).
39+
func WithReader(r io.Reader) StdinOption {
40+
return func(s *StdinSource) { s.reader = r }
41+
}
42+
43+
// StdinSource reads log lines from standard input. It is designed to work
44+
// with piped input such as:
45+
//
46+
// kubectl logs -f pod | logpilot
47+
// cat app.log | logpilot
48+
// docker logs -f container | logpilot
49+
type StdinSource struct {
50+
reader io.Reader
51+
lines chan LogEntry
52+
errs chan error
53+
bufSize int
54+
backpressure BackpressureStrategy
55+
cancel context.CancelFunc
56+
once sync.Once
57+
done chan struct{}
58+
}
59+
60+
// NewStdinSource creates a new StdinSource with the given options.
61+
func NewStdinSource(opts ...StdinOption) *StdinSource {
62+
s := &StdinSource{
63+
reader: os.Stdin,
64+
bufSize: DefaultBufferSize,
65+
backpressure: Block,
66+
done: make(chan struct{}),
67+
}
68+
for _, o := range opts {
69+
o(s)
70+
}
71+
s.lines = make(chan LogEntry, s.bufSize)
72+
s.errs = make(chan error, 1)
73+
return s
74+
}
75+
76+
// IsPipe reports whether stdin appears to be a pipe (not a terminal).
77+
func IsPipe() bool {
78+
fi, err := os.Stdin.Stat()
79+
if err != nil {
80+
return false
81+
}
82+
return (fi.Mode() & os.ModeCharDevice) == 0
83+
}
84+
85+
// Lines returns the channel of log entries.
86+
func (s *StdinSource) Lines() <-chan LogEntry { return s.lines }
87+
88+
// Errors returns the channel of errors.
89+
func (s *StdinSource) Errors() <-chan error { return s.errs }
90+
91+
// Start reads lines from stdin until ctx is cancelled or EOF is reached.
92+
func (s *StdinSource) Start(ctx context.Context) error {
93+
ctx, s.cancel = context.WithCancel(ctx)
94+
defer close(s.lines)
95+
defer close(s.errs)
96+
defer close(s.done)
97+
98+
scanner := bufio.NewScanner(s.reader)
99+
// Support very long log lines (up to 1 MB).
100+
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
101+
102+
for scanner.Scan() {
103+
entry := LogEntry{
104+
Line: scanner.Text(),
105+
Source: "stdin",
106+
}
107+
if !s.emit(ctx, entry) {
108+
return ctx.Err()
109+
}
110+
}
111+
if err := scanner.Err(); err != nil {
112+
select {
113+
case s.errs <- fmt.Errorf("stdin read error: %w", err):
114+
default:
115+
}
116+
return err
117+
}
118+
return nil
119+
}
120+
121+
// emit sends an entry to the lines channel, respecting backpressure strategy.
122+
func (s *StdinSource) emit(ctx context.Context, entry LogEntry) bool {
123+
switch s.backpressure {
124+
case DropOldest:
125+
select {
126+
case s.lines <- entry:
127+
default:
128+
// Channel full — drop oldest.
129+
select {
130+
case <-s.lines:
131+
default:
132+
}
133+
select {
134+
case s.lines <- entry:
135+
case <-ctx.Done():
136+
return false
137+
}
138+
}
139+
default: // Block
140+
select {
141+
case s.lines <- entry:
142+
case <-ctx.Done():
143+
return false
144+
}
145+
}
146+
return true
147+
}
148+
149+
// Stop cancels reading and waits for the reader goroutine to finish.
150+
func (s *StdinSource) Stop() error {
151+
s.once.Do(func() {
152+
if s.cancel != nil {
153+
s.cancel()
154+
}
155+
})
156+
<-s.done
157+
return nil
158+
}

internal/source/stdin_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package source
2+
3+
import (
4+
"context"
5+
"io"
6+
"strings"
7+
"testing"
8+
"time"
9+
)
10+
11+
func stdinCollectLines(t *testing.T, s *StdinSource, timeout time.Duration) []LogEntry {
12+
t.Helper()
13+
var entries []LogEntry
14+
deadline := time.After(timeout)
15+
for {
16+
select {
17+
case entry, ok := <-s.Lines():
18+
if !ok {
19+
return entries
20+
}
21+
entries = append(entries, entry)
22+
case <-deadline:
23+
t.Fatal("timed out waiting for lines")
24+
return nil
25+
}
26+
}
27+
}
28+
29+
func TestStdinSource_BasicRead(t *testing.T) {
30+
input := "line one\nline two\nline three\n"
31+
src := NewStdinSource(WithReader(strings.NewReader(input)))
32+
33+
go src.Start(context.Background())
34+
entries := stdinCollectLines(t, src, 2*time.Second)
35+
36+
want := []string{"line one", "line two", "line three"}
37+
if len(entries) != len(want) {
38+
t.Fatalf("got %d lines, want %d", len(entries), len(want))
39+
}
40+
for i, e := range entries {
41+
if e.Line != want[i] {
42+
t.Errorf("line %d: got %q, want %q", i, e.Line, want[i])
43+
}
44+
if e.Source != "stdin" {
45+
t.Errorf("line %d: source = %q, want \"stdin\"", i, e.Source)
46+
}
47+
}
48+
}
49+
50+
func TestStdinSource_EmptyInput(t *testing.T) {
51+
src := NewStdinSource(WithReader(strings.NewReader("")))
52+
53+
go src.Start(context.Background())
54+
entries := stdinCollectLines(t, src, 2*time.Second)
55+
56+
if len(entries) != 0 {
57+
t.Fatalf("expected 0 lines, got %d", len(entries))
58+
}
59+
}
60+
61+
func TestStdinSource_ContextCancellation(t *testing.T) {
62+
pr, pw := io.Pipe()
63+
64+
src := NewStdinSource(WithReader(pr))
65+
ctx, cancel := context.WithCancel(context.Background())
66+
67+
done := make(chan struct{})
68+
go func() {
69+
src.Start(ctx)
70+
close(done)
71+
}()
72+
73+
pw.Write([]byte("hello\n"))
74+
<-src.Lines()
75+
cancel()
76+
pw.Close() // unblock scanner
77+
78+
select {
79+
case <-done:
80+
case <-time.After(2 * time.Second):
81+
t.Fatal("Start did not return after context cancellation")
82+
}
83+
}
84+
85+
func TestStdinSource_Stop(t *testing.T) {
86+
pr, pw := io.Pipe()
87+
88+
src := NewStdinSource(WithReader(pr))
89+
90+
go src.Start(context.Background())
91+
92+
pw.Write([]byte("line\n"))
93+
<-src.Lines()
94+
95+
go pw.Close()
96+
src.Stop()
97+
98+
_, ok := <-src.Lines()
99+
if ok {
100+
t.Fatal("expected lines channel to be closed")
101+
}
102+
}
103+
104+
func TestStdinSource_DropOldest(t *testing.T) {
105+
input := "a\nb\nc\nd\n"
106+
src := NewStdinSource(
107+
WithReader(strings.NewReader(input)),
108+
WithBufferSize(2),
109+
WithBackpressure(DropOldest),
110+
)
111+
112+
go src.Start(context.Background())
113+
entries := stdinCollectLines(t, src, 2*time.Second)
114+
115+
if len(entries) < 2 {
116+
t.Fatalf("expected at least 2 lines, got %d", len(entries))
117+
}
118+
if entries[len(entries)-1].Line != "d" {
119+
t.Errorf("last line should be 'd', got %q", entries[len(entries)-1].Line)
120+
}
121+
}
122+
123+
func TestStdinSource_LongLines(t *testing.T) {
124+
long := strings.Repeat("x", 500_000)
125+
src := NewStdinSource(WithReader(strings.NewReader(long + "\n")))
126+
127+
go src.Start(context.Background())
128+
entries := stdinCollectLines(t, src, 2*time.Second)
129+
130+
if len(entries) != 1 || len(entries[0].Line) != 500_000 {
131+
t.Fatalf("expected 1 line of 500000 chars, got %d lines", len(entries))
132+
}
133+
}
134+
135+
func TestStdinSource_Errors(t *testing.T) {
136+
src := NewStdinSource(WithReader(strings.NewReader("")))
137+
138+
go src.Start(context.Background())
139+
stdinCollectLines(t, src, 2*time.Second)
140+
141+
select {
142+
case err := <-src.Errors():
143+
if err != nil {
144+
t.Fatalf("unexpected error: %v", err)
145+
}
146+
default:
147+
}
148+
}
149+
150+
func TestStdinSource_ImplementsSource(t *testing.T) {
151+
var _ Source = (*StdinSource)(nil)
152+
}
153+
154+
func TestIsPipe(t *testing.T) {
155+
_ = IsPipe()
156+
}

0 commit comments

Comments
 (0)