Skip to content

Commit a69cdaa

Browse files
bakayoloampcode-com
andcommitted
fix: eliminate RetrieveFindings activity to avoid Temporal 4MB payload limit
The RetrieveFindings activity returned all findings as a single Temporal activity result, which exceeded the 4MB gRPC message limit for large inventories (12K+ Aurora clusters = ~10MB serialized). Instead of passing findings through Temporal payloads, CreateSnapshot now reads directly from the in-memory store. Findings stay within the worker process and never transit Temporal's gRPC layer. - Remove RetrieveFindings activity and its registration - CreateSnapshot accepts ResourceTypes and reads from store itself - Orchestrator passes successful resource types instead of findings - All detection workflows still run in parallel via child workflows Amp-Thread-ID: https://ampcode.com/threads/T-019d92b6-b80d-731a-8a83-64e6442ae52c Co-authored-by: Amp <amp@ampcode.com>
1 parent 550f2fb commit a69cdaa

File tree

3 files changed

+27
-63
lines changed

3 files changed

+27
-63
lines changed

cmd/server/main.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,9 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
329329
// Orchestrator workflow activities
330330
if snapshotStore != nil {
331331
orchestratorActivities := orchestrator.NewActivities(st, snapshotStore)
332-
w.RegisterActivityWithOptions(orchestratorActivities.RetrieveFindings, activity.RegisterOptions{Name: orchestrator.RetrieveFindingsActivityName})
333332
w.RegisterActivityWithOptions(orchestratorActivities.CreateSnapshot, activity.RegisterOptions{Name: orchestrator.CreateSnapshotActivityName})
334333
fmt.Println("✓ Orchestrator activities registered (with S3)")
335334
} else {
336-
// Without S3, we can still retrieve findings but can't create snapshots
337-
orchestratorActivities := orchestrator.NewActivities(st, nil)
338-
w.RegisterActivityWithOptions(orchestratorActivities.RetrieveFindings, activity.RegisterOptions{Name: orchestrator.RetrieveFindingsActivityName})
339335
fmt.Println("⚠️ Orchestrator snapshot activity not registered (no S3 store)")
340336
}
341337

pkg/workflow/orchestrator/activities.go

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,16 @@ import (
1313

1414
// Activity names
1515
const (
16-
RetrieveFindingsActivityName = "version-guard.RetrieveFindings"
17-
CreateSnapshotActivityName = "version-guard.CreateSnapshot"
16+
CreateSnapshotActivityName = "version-guard.CreateSnapshot"
1817
)
1918

2019
// Activity input/output types
2120

22-
type RetrieveFindingsInput struct {
23-
ResourceType types.ResourceType
24-
}
25-
2621
type CreateSnapshotInput struct {
27-
ScanID string
28-
FindingsByType map[types.ResourceType][]*types.Finding
29-
ScanStartTime time.Time
30-
ScanEndTime time.Time
22+
ScanID string
23+
ResourceTypes []types.ResourceType
24+
ScanStartTime time.Time
25+
ScanEndTime time.Time
3126
}
3227

3328
type SnapshotResult struct {
@@ -57,34 +52,27 @@ func NewActivities(
5752
}
5853
}
5954

60-
// RetrieveFindings retrieves all findings for a given resource type from the store
61-
func (a *Activities) RetrieveFindings(ctx context.Context, input RetrieveFindingsInput) ([]*types.Finding, error) {
62-
logger := activity.GetLogger(ctx)
63-
logger.Info("Retrieving findings from store", "resourceType", input.ResourceType)
64-
65-
filters := store.FindingFilters{
66-
ResourceType: &input.ResourceType,
67-
}
68-
69-
findings, err := a.Store.ListFindings(ctx, filters)
70-
if err != nil {
71-
return nil, err
72-
}
73-
74-
logger.Info("Findings retrieved", "count", len(findings))
75-
return findings, nil
76-
}
77-
78-
// CreateSnapshot creates a snapshot from findings and persists it to S3
55+
// CreateSnapshot reads findings directly from the store and persists a snapshot to S3.
56+
// This avoids passing large finding payloads through Temporal activity results,
57+
// which would exceed the 4MB gRPC message limit for large inventories (12K+ resources).
7958
func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInput) (*SnapshotResult, error) {
8059
logger := activity.GetLogger(ctx)
81-
logger.Info("Creating snapshot", "scanID", input.ScanID, "resourceTypeCount", len(input.FindingsByType))
60+
logger.Info("Creating snapshot", "scanID", input.ScanID, "resourceTypeCount", len(input.ResourceTypes))
8261

83-
// Build snapshot
62+
// Build snapshot by reading findings directly from the store per resource type
8463
builder := snapshot.NewBuilder()
8564
builder.WithScanTiming(input.ScanStartTime, input.ScanEndTime)
8665

87-
for resourceType, findings := range input.FindingsByType {
66+
for _, resourceType := range input.ResourceTypes {
67+
rt := resourceType
68+
findings, err := a.Store.ListFindings(ctx, store.FindingFilters{
69+
ResourceType: &rt,
70+
})
71+
if err != nil {
72+
logger.Warn("Failed to retrieve findings for snapshot", "resourceType", resourceType, "error", err)
73+
continue
74+
}
75+
logger.Info("Retrieved findings for snapshot", "resourceType", resourceType, "count", len(findings))
8876
builder.AddFindings(resourceType, findings)
8977
}
9078

@@ -113,9 +101,6 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp
113101
func RegisterActivities(worker interface {
114102
RegisterActivityWithOptions(interface{}, activity.RegisterOptions)
115103
}, activities *Activities) {
116-
worker.RegisterActivityWithOptions(activities.RetrieveFindings, activity.RegisterOptions{
117-
Name: RetrieveFindingsActivityName,
118-
})
119104
worker.RegisterActivityWithOptions(activities.CreateSnapshot, activity.RegisterOptions{
120105
Name: CreateSnapshotActivityName,
121106
})

pkg/workflow/orchestrator/workflow.go

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
108108

109109
// Wait for all child workflows to complete and collect results
110110
resourceTypeResults := make(map[types.ResourceType]*ResourceTypeResult)
111-
allFindings := make(map[types.ResourceType][]*types.Finding)
111+
var successfulTypes []types.ResourceType
112112

113113
for resourceType, future := range futures {
114114
var output detectionWorkflow.WorkflowOutput
@@ -136,27 +136,10 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
136136
}
137137

138138
resourceTypeResults[resourceType] = result
139-
140-
// Retrieve findings from store for snapshot creation
141-
var retrievedFindings []*types.Finding
142-
err = workflow.ExecuteActivity(
143-
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
144-
StartToCloseTimeout: 2 * time.Minute,
145-
RetryPolicy: retryPolicy,
146-
}),
147-
RetrieveFindingsActivityName,
148-
RetrieveFindingsInput{ResourceType: resourceType},
149-
).Get(ctx, &retrievedFindings)
150-
151-
if err != nil {
152-
logger.Warn("Failed to retrieve findings for snapshot", "resourceType", resourceType, "error", err)
153-
continue
154-
}
155-
156-
allFindings[resourceType] = retrievedFindings
139+
successfulTypes = append(successfulTypes, resourceType)
157140
}
158141

159-
logger.Info("Stage 1: Detect - All detection workflows completed", "successCount", len(allFindings))
142+
logger.Info("Stage 1: Detect - All detection workflows completed", "successCount", len(successfulTypes))
160143

161144
// Stage 2: STORE - Create and persist snapshot to S3
162145
logger.Info("Stage 2: Store - Creating snapshot")
@@ -169,10 +152,10 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
169152
}),
170153
CreateSnapshotActivityName,
171154
CreateSnapshotInput{
172-
ScanID: input.ScanID,
173-
FindingsByType: allFindings,
174-
ScanStartTime: startTime,
175-
ScanEndTime: workflow.Now(ctx),
155+
ScanID: input.ScanID,
156+
ResourceTypes: successfulTypes,
157+
ScanStartTime: startTime,
158+
ScanEndTime: workflow.Now(ctx),
176159
},
177160
).Get(ctx, &snapshotResult)
178161

0 commit comments

Comments
 (0)