Commit 6aee916

Fix metric loss: drain exportQueue before cancelling workers in close()
flushSync() only flushes agg.batch. flushUnlocked(), which is triggered by terminal ops such as CLOSE_STATEMENT, moves batch items into agg.exportQueue for asynchronous processing by the export workers. When agg.close() called cancel() immediately after flushSync(), the workers exited before picking up those queued items, so the items were silently dropped and EXECUTE_STATEMENT and CLOSE_STATEMENT metrics never reached the telemetry endpoint.

Fix: drain the exportQueue synchronously between flushSync() and cancel(), processing any pending export jobs directly before the workers stop.

Co-authored-by: Isaac
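The failure mode described in the commit message can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not the driver's code: `toyAggregator`, `job`, `closeAgg`, `run`, and the "late metric" entry are hypothetical names introduced here, and the export workers are modeled as never being scheduled before shutdown (the worst case), so anything left in the queue at cancel time counts as lost.

```go
package main

import "fmt"

// job is a hypothetical stand-in for an export job.
type job struct{ name string }

// toyAggregator models only the two buffers involved in the bug.
// It is illustrative and not the real metricsAggregator.
type toyAggregator struct {
	batch    []job    // metrics not yet handed to the workers
	queue    chan job // metrics already queued for async export
	exported []string // what reached the (modeled) telemetry endpoint
}

func (a *toyAggregator) export(j job) { a.exported = append(a.exported, j.name) }

// flushUnlocked moves the batch into the queue, as a terminal op would.
func (a *toyAggregator) flushUnlocked() {
	for _, j := range a.batch {
		a.queue <- j
	}
	a.batch = nil
}

// flushSync exports the batch directly and never looks at the queue.
func (a *toyAggregator) flushSync() {
	for _, j := range a.batch {
		a.export(j)
	}
	a.batch = nil
}

// closeAgg runs the shutdown sequence; drain toggles the new step.
func (a *toyAggregator) closeAgg(drain bool) {
	a.flushSync()
	if drain {
	drainLoop:
		for {
			select {
			case j := <-a.queue:
				a.export(j)
			default:
				break drainLoop // queue is empty right now
			}
		}
	}
	// Workers would be cancelled here; anything still queued is lost.
}

// run replays the scenario and returns the exported metric names.
func run(drain bool) []string {
	a := &toyAggregator{queue: make(chan job, 8)}
	a.batch = append(a.batch, job{"EXECUTE_STATEMENT"}, job{"CLOSE_STATEMENT"})
	a.flushUnlocked()                             // terminal op queues the batch
	a.batch = append(a.batch, job{"late metric"}) // hypothetical later metric
	a.closeAgg(drain)
	return a.exported
}

func main() {
	fmt.Println(run(false)) // [late metric]
	fmt.Println(run(true))  // [late metric EXECUTE_STATEMENT CLOSE_STATEMENT]
}
```

Without the drain step only the item still in the batch is exported; with it, the two queued statement metrics are exported as well.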
1 parent 242fad2 commit 6aee916

1 file changed

Lines changed: 24 additions & 7 deletions

telemetry/aggregator.go

@@ -277,15 +277,32 @@ func (agg *metricsAggregator) flushUnlocked(ctx context.Context) {
 // Safe to call multiple times — subsequent calls are no-ops (closeOnce).
 //
 // Shutdown order matters:
-//  1. Stop periodic flush (close stopCh) so no new async exports are queued.
-//  2. Synchronously flush the current batch directly (flushSync bypasses the
-//     worker queue, so it works even after workers are stopped).
-//  3. Cancel the aggregator context to stop the 10 export worker goroutines.
+//  1. Stop periodic flush (close stopCh) so no new async flushes are queued.
+//  2. Flush agg.batch synchronously (direct export, bypasses worker queue).
+//  3. Drain agg.exportQueue — process any jobs already submitted by prior
+//     flushUnlocked calls (e.g. CLOSE_STATEMENT from rows.Close()) before
+//     workers are cancelled. Without this step those jobs are silently
+//     dropped when cancel() fires.
+//  4. Cancel the aggregator context to stop the 10 export worker goroutines.
 func (agg *metricsAggregator) close(ctx context.Context) error {
 	agg.closeOnce.Do(func() {
-		close(agg.stopCh)  // Stop periodic flush loop
-		agg.flushSync(ctx) // Final flush — direct export, no workers needed
-		agg.cancel()       // Stop export workers after final flush
+		close(agg.stopCh)  // 1. Stop periodic flush loop
+		agg.flushSync(ctx) // 2. Flush agg.batch directly
+
+		// 3. Drain any jobs already sitting in the exportQueue.
+		// flushUnlocked cleared agg.batch into the queue; flushSync above sees
+		// an empty batch, so queue items would otherwise be lost when workers
+		// are cancelled in step 4.
+		for {
+			select {
+			case job := <-agg.exportQueue:
+				agg.exporter.export(job.ctx, job.metrics)
+			default:
+				goto drained
+			}
+		}
+	drained:
+		agg.cancel() // 4. Stop export workers (queue is now empty)
 	})
 	return nil
 }
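Because the drain in close() runs while the export workers may still be alive, the drainer and the workers can both receive from the same channel. The sketch below shows that in Go each queued value is delivered to exactly one receiver, so a concurrent drain does not double-process jobs. It is a toy under assumptions: `drainAndStop` and `runOnce` are hypothetical helpers, jobs are plain integers, and the worker loop is a guess at the usual select-on-queue-or-context shape rather than the driver's actual worker.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// drainAndStop mirrors steps 3 and 4: receive until the queue is empty
// at this instant, then cancel the workers. Hypothetical helper.
func drainAndStop(queue chan int, process func(int), cancel context.CancelFunc) {
	for {
		select {
		case j := <-queue:
			process(j)
		default:
			cancel()
			return
		}
	}
}

// runOnce starts workers, queues jobs, drains, and reports how many
// jobs were processed in total (by workers and the drainer combined).
func runOnce(workers, jobs int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan int, jobs)

	var processed int64
	process := func(int) { atomic.AddInt64(&processed, 1) }

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case j := <-queue:
					process(j) // a received job is always finished here
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	for j := 0; j < jobs; j++ {
		queue <- j // never blocks: capacity equals the job count
	}

	drainAndStop(queue, process, cancel)
	wg.Wait() // workers exit once the context is cancelled
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(runOnce(10, 100)) // 100
}
```

In this toy a worker always finishes a job it has already received. Whether an in-flight export in the real aggregator survives cancel() depends on how the exporter treats its context, which the diff does not show.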
