Commit b19ed9f

revied and claude authored
feat(schedule): add Temporal schedule for periodic scanning (#4)
## Summary

- Adds Temporal Schedule API support so the OrchestratorWorkflow runs automatically on a configurable cron (default: daily at 06:00 UTC per the design doc)
- Schedule manager uses a create-or-update pattern to handle server restarts gracefully (if the schedule already exists, it is updated when the cron has changed)
- Disabled by default via `SCHEDULE_ENABLED=false` for backwards compatibility
- Wires `SCHEDULE_*` env vars through `docker-compose.yaml` so the feature can be toggled for local end-to-end testing

## Configuration

| Env Var | Default | Purpose |
|---------|---------|---------|
| `SCHEDULE_ENABLED` | `false` | Opt-in to scheduled scanning |
| `SCHEDULE_CRON` | `0 6 * * *` | Cron expression (daily at 06:00 UTC) |
| `SCHEDULE_ID` | `version-guard-scan` | Stable schedule ID for idempotent restarts |
| `SCHEDULE_JITTER` | `5m` | Random jitter to prevent thundering herd |

## Changes

**New:**

- `pkg/schedule/schedule.go`: Schedule manager with create-or-update logic
- `pkg/schedule/schedule_test.go`: 8 unit tests covering all paths

**Modified:**

- `cmd/server/main.go`: Config fields, schedule wiring with graceful failure
- `pkg/workflow/orchestrator/workflow.go`: ScanID fallback from workflow execution ID for scheduled runs
- `docker-compose.yaml`: `SCHEDULE_*` env var passthrough

## Bug fixed during review

Initial implementation mutated only `CronExpressions` on the update path. Temporal parses `CronExpressions` into structured `Calendars` server-side on create, so subsequent describes return the cron inside `Calendars` with `CronExpressions` empty. Mutating only `CronExpressions` left the stale calendar in place, causing the schedule to fire on both the old and new crons after every restart with a changed cron.

Fixed by replacing the entire `Spec` on update. Regression test `TestEnsureSchedule_Update_ReplacesStaleCalendars` simulates the real Temporal describe response and asserts `Calendars` is cleared.

## Test plan

- [x] `go build ./...` compiles
- [x] `go test ./pkg/schedule/...`: 8/8 tests pass
- [x] `go test ./...`: full suite passes
- [x] `golangci-lint run ./pkg/schedule/...` clean
- [x] Manual end-to-end against `temporal server start-dev`:
  - Start 1: `SCHEDULE_ENABLED=true SCHEDULE_CRON="0 6 * * *"` → `temporal schedule describe` shows one calendar entry for 06:00
  - Start 2: restart with `SCHEDULE_CRON="*/30 * * * *"` → server logs `Schedule updated`, `temporal schedule describe` shows a single calendar entry matching `*/30` (stale entry correctly cleared)
  - `temporal workflow list --query 'WorkflowType = "OrchestratorWorkflow"'` confirms scheduled runs fire on cron boundaries

### Reproducing locally

```bash
# Terminal 1: start Temporal dev server
temporal server start-dev --namespace version-guard-dev

# Terminal 2: run the server with scheduling enabled
SCHEDULE_ENABLED=true \
SCHEDULE_CRON="*/5 * * * *" \
SCHEDULE_ID="version-guard-local" \
SCHEDULE_JITTER=1m \
TEMPORAL_ENDPOINT=localhost:7233 \
TEMPORAL_NAMESPACE=version-guard-dev \
WIZ_CLIENT_ID_SECRET=<your-id> \
WIZ_CLIENT_SECRET_SECRET=<your-secret> \
WIZ_REPORT_IDS='{"aurora-postgresql":"<report-id>"}' \
CONFIG_PATH=config/resources.yaml \
go run ./cmd/server

# Terminal 3: verify via CLI
temporal schedule list --namespace version-guard-dev
temporal schedule describe --schedule-id version-guard-local --namespace version-guard-dev
temporal workflow list --namespace version-guard-dev \
  --query 'WorkflowType = "OrchestratorWorkflow"'
```

Note: `docker-compose up` works too once `SCHEDULE_ENABLED` is exported, but the bundled Temporal image currently needs a fix on `main` before compose boots a healthy server; that is out of scope for this PR.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
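
The stale-calendar bug described in the commit message can be illustrated with a minimal sketch. It does not use the Temporal SDK: `Spec`, `describe`, `mutateCronOnly`, `replaceSpec`, and `firingRules` are hypothetical stand-ins, and the server-side cron-to-calendar conversion is modeled only as far as the commit message describes it.

```go
package main

import "fmt"

// Spec is a hypothetical stand-in for a schedule spec, not the SDK type.
// Calendars models the server-parsed form of a cron expression.
type Spec struct {
	CronExpressions []string
	Calendars       []string
}

// describe simulates the behavior reported in the commit message: crons
// come back inside Calendars and CronExpressions comes back empty.
func describe(created Spec) Spec {
	out := Spec{Calendars: append([]string(nil), created.Calendars...)}
	out.Calendars = append(out.Calendars, created.CronExpressions...)
	return out
}

// mutateCronOnly reproduces the original bug: only CronExpressions is
// changed, so the previously parsed calendar stays in the spec.
func mutateCronOnly(existing Spec, cron string) Spec {
	existing.CronExpressions = []string{cron}
	return existing
}

// replaceSpec models the fix: the whole spec is rebuilt from configuration.
func replaceSpec(cron string) Spec {
	return Spec{CronExpressions: []string{cron}}
}

// firingRules counts the rules the schedule would fire on.
func firingRules(s Spec) int {
	return len(s.CronExpressions) + len(s.Calendars)
}

func main() {
	existing := describe(Spec{CronExpressions: []string{"0 6 * * *"}})
	fmt.Println(firingRules(mutateCronOnly(existing, "*/30 * * * *"))) // 2: old and new
	fmt.Println(firingRules(replaceSpec("*/30 * * * *")))              // 1: new only
}
```

Under these assumptions the mutate-only path leaves two firing rules after a cron change, while replacing the spec leaves one, which matches the single calendar entry observed in the manual test.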
1 parent 3ee601a commit b19ed9f

5 files changed

Lines changed: 470 additions & 1 deletion

cmd/server/main.go

Lines changed: 44 additions & 1 deletion
@@ -27,6 +27,7 @@ import (
 	"github.com/block/Version-Guard/pkg/inventory/wiz"
 	"github.com/block/Version-Guard/pkg/policy"
 	"github.com/block/Version-Guard/pkg/registry"
+	"github.com/block/Version-Guard/pkg/schedule"
 	"github.com/block/Version-Guard/pkg/snapshot"
 	"github.com/block/Version-Guard/pkg/store/memory"
 	"github.com/block/Version-Guard/pkg/types"
@@ -72,6 +73,12 @@ type ServerCLI struct {
 	TagEnvKeys string `help:"Comma-separated tag keys for environment" default:"environment,env" env:"TAG_ENV_KEYS"`
 	TagBrandKeys string `help:"Comma-separated tag keys for brand/business unit" default:"brand" env:"TAG_BRAND_KEYS"`
 
+	// Schedule configuration
+	ScheduleEnabled bool `help:"Enable scheduled scanning" default:"false" env:"SCHEDULE_ENABLED"`
+	ScheduleCron string `help:"Cron expression for scan schedule" default:"0 6 * * *" env:"SCHEDULE_CRON"`
+	ScheduleID string `help:"Temporal schedule ID" default:"version-guard-scan" env:"SCHEDULE_ID"`
+	ScheduleJitter string `help:"Schedule jitter duration" default:"5m" env:"SCHEDULE_JITTER"`
+
 	// Resource configuration
 	ConfigPath string `help:"Path to resources config file" default:"config/resources.yaml" env:"CONFIG_PATH"`
 
@@ -132,6 +139,12 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 		fmt.Printf(" Tag Keys - App: %s\n", s.TagAppKeys)
 		fmt.Printf(" Tag Keys - Env: %s\n", s.TagEnvKeys)
 		fmt.Printf(" Tag Keys - Brand: %s\n", s.TagBrandKeys)
+		if s.ScheduleEnabled {
+			fmt.Printf(" Schedule: enabled (cron: %s, id: %s, jitter: %s)\n",
+				s.ScheduleCron, s.ScheduleID, s.ScheduleJitter)
+		} else {
+			fmt.Printf(" Schedule: disabled\n")
+		}
 	}
 
 	if s.DryRun {
@@ -346,10 +359,40 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 		fmt.Println("⚠️ Orchestrator snapshot activity not registered (no S3 store)")
 	}
 
+	// Create schedule (if enabled)
+	if s.ScheduleEnabled {
+		jitter, parseErr := time.ParseDuration(s.ScheduleJitter)
+		if parseErr != nil {
+			fmt.Printf("⚠️ Invalid schedule jitter %q, using default 5m: %v\n", s.ScheduleJitter, parseErr)
+			jitter = 5 * time.Minute
+		}
+
+		scheduleMgr := schedule.NewManager(temporalClient)
+		schedCtx, schedCancel := context.WithTimeout(ctx, 10*time.Second)
+		defer schedCancel()
+		schedErr := scheduleMgr.EnsureSchedule(schedCtx, schedule.Config{
+			Enabled: true,
+			ScheduleID: s.ScheduleID,
+			CronExpression: s.ScheduleCron,
+			Jitter: jitter,
+			TaskQueue: s.TemporalTaskQueue,
+		})
+		if schedErr != nil {
+			fmt.Printf("⚠️ Failed to create/update schedule: %v\n", schedErr)
+			fmt.Println(" Worker will continue — trigger scans manually")
+		} else {
+			fmt.Printf("✓ Schedule configured: %s (cron: %s, jitter: %s)\n",
+				s.ScheduleID, s.ScheduleCron, s.ScheduleJitter)
+		}
+	}
+
 	// Start worker
 	fmt.Printf("\n✓ Temporal worker starting on queue: %s\n", s.TemporalTaskQueue)
 	fmt.Println("\nVersion Guard is ready!")
-	fmt.Println("\n📖 To trigger a scan, use the Temporal UI or CLI:")
+	if s.ScheduleEnabled {
+		fmt.Printf(" Scans will run automatically (schedule: %s)\n", s.ScheduleCron)
+	}
+	fmt.Println("\n📖 To trigger a scan manually, use the Temporal UI or CLI:")
 	fmt.Printf(" temporal workflow start --task-queue %s --type %s --input '{}'\n", s.TemporalTaskQueue, orchestrator.OrchestratorWorkflowType)
 	fmt.Println("\n📖 To query findings via gRPC:")
 	fmt.Printf(" grpcurl -plaintext localhost:%d list\n", s.GRPCPort)
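
The jitter handling in `cmd/server/main.go` falls back to 5m when `SCHEDULE_JITTER` does not parse. A minimal sketch of that fallback as a standalone helper follows. `parseJitter` and `defaultJitter` are hypothetical names that do not appear in the commit, and the rejection of negative durations is an added assumption that the commit's code does not make.

```go
package main

import (
	"fmt"
	"time"
)

// defaultJitter mirrors the 5m default used in the diff.
const defaultJitter = 5 * time.Minute

// parseJitter is a hypothetical helper: it returns the parsed duration and
// true, or defaultJitter and false when the input is not usable.
// Rejecting negative values is an assumption beyond the commit's behavior.
func parseJitter(raw string) (time.Duration, bool) {
	d, err := time.ParseDuration(raw)
	if err != nil || d < 0 {
		return defaultJitter, false
	}
	return d, true
}

func main() {
	fmt.Println(parseJitter("1m"))   // 1m0s true
	fmt.Println(parseJitter("soon")) // 5m0s false
}
```

Returning a boolean alongside the duration lets the caller decide whether to log a warning, as the server does, without the helper printing anything itself.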

docker-compose.yaml

Lines changed: 4 additions & 0 deletions
@@ -63,6 +63,10 @@ services:
       WIZ_CLIENT_SECRET_SECRET: ${WIZ_CLIENT_SECRET_SECRET:-}
       WIZ_REPORT_IDS: ${WIZ_REPORT_IDS:-}
       EOL_BASE_URL: http://endoflife:8080/api
+      SCHEDULE_ENABLED: ${SCHEDULE_ENABLED:-false}
+      SCHEDULE_CRON: ${SCHEDULE_CRON:-0 6 * * *}
+      SCHEDULE_ID: ${SCHEDULE_ID:-version-guard-scan}
+      SCHEDULE_JITTER: ${SCHEDULE_JITTER:-5m}
     ports:
       - "8080:8080"
       - "8081:8081"

pkg/schedule/schedule.go

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+package schedule
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/temporal"
+
+	"github.com/block/Version-Guard/pkg/workflow/orchestrator"
+)
+
+// Config holds configuration for the Temporal schedule.
+type Config struct {
+	ScheduleID string
+	CronExpression string
+	TaskQueue string
+	Jitter time.Duration
+	Enabled bool
+	Paused bool
+}
+
+// Creator abstracts the Temporal schedule client for testability.
+type Creator interface {
+	Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error)
+	GetHandle(ctx context.Context, scheduleID string) client.ScheduleHandle
+}
+
+// Manager handles Temporal schedule lifecycle.
+type Manager struct {
+	scheduleClient Creator
+}
+
+// NewManager creates a Manager from a Temporal client.
+func NewManager(c client.Client) *Manager {
+	return &Manager{scheduleClient: c.ScheduleClient()}
+}
+
+// NewManagerWithClient creates a Manager with an explicit Creator (for testing).
+func NewManagerWithClient(sc Creator) *Manager {
+	return &Manager{scheduleClient: sc}
+}
+
+// EnsureSchedule creates the schedule if it doesn't exist, or updates it
+// if the cron expression has changed.
+func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
+	if !cfg.Enabled {
+		return nil
+	}
+
+	opts := client.ScheduleOptions{
+		ID: cfg.ScheduleID,
+		Spec: client.ScheduleSpec{
+			CronExpressions: []string{cfg.CronExpression},
+			Jitter: cfg.Jitter,
+		},
+		Action: &client.ScheduleWorkflowAction{
+			Workflow: orchestrator.OrchestratorWorkflow,
+			Args: []interface{}{orchestrator.WorkflowInput{}},
+			TaskQueue: cfg.TaskQueue,
+			WorkflowExecutionTimeout: 2 * time.Hour,
+		},
+		Paused: cfg.Paused,
+	}
+
+	_, err := m.scheduleClient.Create(ctx, opts)
+	if err == nil {
+		return nil
+	}
+
+	// If the schedule already exists, check if we need to update it
+	if !isScheduleAlreadyRunning(err) {
+		return fmt.Errorf("failed to create schedule %q: %w", cfg.ScheduleID, err)
+	}
+
+	handle := m.scheduleClient.GetHandle(ctx, cfg.ScheduleID)
+	desc, err := handle.Describe(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to describe existing schedule %q: %w", cfg.ScheduleID, err)
+	}
+
+	// Check if the cron expression or jitter has changed
+	existingSpec := desc.Schedule.Spec
+	if existingSpec == nil {
+		existingSpec = &client.ScheduleSpec{}
+	}
+	existingCrons := existingSpec.CronExpressions
+	if len(existingCrons) == 1 && existingCrons[0] == cfg.CronExpression && existingSpec.Jitter == cfg.Jitter {
+		fmt.Printf(" Schedule %q already configured (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
+		return nil
+	}
+
+	// Update the schedule with the new spec.
+	// We replace the entire Spec rather than mutating fields because Temporal
+	// parses CronExpressions into Calendars/StructuredCalendar server-side on
+	// create. On subsequent describes, the cron lives in Calendars and
+	// CronExpressions comes back empty — mutating CronExpressions alone would
+	// leave stale calendars in place, causing the schedule to fire on both
+	// the old and new cadences after every restart with a changed cron.
+	err = handle.Update(ctx, client.ScheduleUpdateOptions{
+		DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
+			input.Description.Schedule.Spec = &client.ScheduleSpec{
+				CronExpressions: []string{cfg.CronExpression},
+				Jitter: cfg.Jitter,
+			}
+			if action, ok := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction); ok {
+				action.TaskQueue = cfg.TaskQueue
+			}
+			return &client.ScheduleUpdate{
+				Schedule: &input.Description.Schedule,
+			}, nil
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to update schedule %q: %w", cfg.ScheduleID, err)
+	}
+
+	fmt.Printf(" Schedule %q updated (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
+	return nil
+}
+
+func isScheduleAlreadyRunning(err error) bool {
+	return errors.Is(err, temporal.ErrScheduleAlreadyRunning)
+}
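
The create-or-update flow in `EnsureSchedule` can be sketched without a Temporal server, which may help when reasoning about restart behavior. Everything here is a hypothetical stand-in: `fakeStore` is an in-memory map rather than the schedule client, and `errAlreadyExists` plays the role that `temporal.ErrScheduleAlreadyRunning` plays in the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyExists is a hypothetical sentinel standing in for
// temporal.ErrScheduleAlreadyRunning.
var errAlreadyExists = errors.New("schedule already exists")

// fakeStore is a hypothetical in-memory schedule store keyed by schedule ID.
type fakeStore struct {
	crons   map[string]string
	updates int
}

// create fails with a wrapped sentinel when the ID is already taken.
func (s *fakeStore) create(id, cron string) error {
	if _, ok := s.crons[id]; ok {
		return fmt.Errorf("create %q: %w", id, errAlreadyExists)
	}
	s.crons[id] = cron
	return nil
}

// update overwrites the stored cron and counts the call.
func (s *fakeStore) update(id, cron string) {
	s.crons[id] = cron
	s.updates++
}

// ensure tries to create first, treats "already exists" as a signal to
// compare, and updates only when the cron differs.
func ensure(s *fakeStore, id, cron string) error {
	err := s.create(id, cron)
	if err == nil {
		return nil
	}
	if !errors.Is(err, errAlreadyExists) {
		return fmt.Errorf("failed to create schedule %q: %w", id, err)
	}
	if s.crons[id] == cron {
		return nil // unchanged, nothing to do
	}
	s.update(id, cron)
	return nil
}

func main() {
	s := &fakeStore{crons: map[string]string{}}
	_ = ensure(s, "version-guard-scan", "0 6 * * *")    // first start: create
	_ = ensure(s, "version-guard-scan", "0 6 * * *")    // restart, same cron: no-op
	_ = ensure(s, "version-guard-scan", "*/30 * * * *") // restart, new cron: update
	fmt.Println(s.crons["version-guard-scan"], s.updates) // */30 * * * * 1
}
```

Matching the sentinel with `errors.Is` rather than `==` keeps the check working when the error is wrapped, which is also how `isScheduleAlreadyRunning` tests it in the diff.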
