Skip to content

Commit 242fad2

Browse files
committed
Delay EXECUTE_STATEMENT metric emission to rows.Close()
Previously AfterExecute/CompleteStatement fired when QueryContext() returned — before the user ever called rows.Next(). This meant chunk_count was always 1 and all per-chunk timing fields were null, because fetchResultPage() calls happen during row iteration.

Fix:
- Add FinalizeLatency() to Interceptor: captures elapsed time at QueryContext return to preserve execute-only latency in mc.
- AfterExecute() uses the pre-captured latency if available, so the metric reports server-exec+poll time regardless of when it fires.
- Move AfterExecute/CompleteStatement from a defer in QueryContext to closeCallback, which rows.Close() invokes after all rows are read. At that point chunk_count and all timing tags are fully accumulated.
- Error path: still emits EXECUTE_STATEMENT immediately on runQuery failure (no rows means no chunks to wait for).

Co-authored-by: Isaac
1 parent 33fcf74 commit 242fad2

2 files changed

Lines changed: 64 additions & 12 deletions

File tree

connection.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -207,25 +207,34 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
207207
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
208208
defer log.Duration(msg, start)
209209

210-
// Telemetry: track statement execution
210+
// Telemetry: set up metric context for the statement.
211+
// BeforeExecuteWithTime anchors startTime to before runQuery() ran.
211212
var statementID string
212213
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
213214
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
214-
// Use BeforeExecuteWithTime to set the correct start time (before execution)
215215
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
216216
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)
217-
defer func() {
218-
c.telemetry.AfterExecute(ctx, err)
219-
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
220-
}()
221217
}
222218

223219
if err != nil {
220+
// Error path: finalize and emit the EXECUTE_STATEMENT metric immediately —
221+
// there are no rows to iterate so the metric is complete right now.
222+
if c.telemetry != nil && statementID != "" {
223+
c.telemetry.AfterExecute(ctx, err)
224+
c.telemetry.CompleteStatement(ctx, statementID, true)
225+
}
224226
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
225227
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
226228
}
227229

228-
// Per-chunk timing state captured in the closure below.
230+
// Success path: freeze execute latency NOW (before row iteration inflates time.Since).
231+
// AfterExecute/CompleteStatement are called from closeCallback after all chunks
232+
// are fetched, so the final metric carries complete chunk timing data.
233+
if c.telemetry != nil && statementID != "" {
234+
c.telemetry.FinalizeLatency(ctx)
235+
}
236+
237+
// Per-chunk timing state accumulated across all fetchResultPage calls.
229238
var (
230239
chunkTimingInitialMs int64
231240
chunkTimingSlowestMs int64
@@ -234,7 +243,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
234243
chunkTotalPresent int32
235244
)
236245

237-
// Telemetry callback for tracking row fetching metrics
246+
// Telemetry callback invoked after each result page is fetched.
238247
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32) {
239248
if c.telemetry == nil {
240249
return
@@ -257,16 +266,29 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
257266
c.telemetry.AddTag(ctx, "chunk_sum_latency_ms", chunkTimingSumMs)
258267
}
259268

260-
// Record total chunks present from first server report.
269+
// Record server-reported total chunks from first non-zero report.
261270
if totalChunksPresent > 0 && chunkTotalPresent == 0 {
262271
chunkTotalPresent = totalChunksPresent
263272
c.telemetry.AddTag(ctx, "chunk_total_present", int(chunkTotalPresent))
264273
}
265274
}
266275

267-
// Telemetry callback for CLOSE_STATEMENT — fired from rows.Close()
276+
// closeCallback is invoked from rows.Close() after all rows have been consumed.
277+
// At that point chunk timing is fully accumulated in ctx tags, so we finalize
278+
// EXECUTE_STATEMENT here rather than at QueryContext return time.
268279
var closeCallback func(latencyMs int64, err error)
269-
if c.telemetry != nil {
280+
if c.telemetry != nil && statementID != "" {
281+
interceptor := c.telemetry
282+
connID := c.id
283+
stmtID := statementID
284+
closeCallback = func(latencyMs int64, closeErr error) {
285+
// Emit EXECUTE_STATEMENT with complete chunk data now that iteration is done.
286+
interceptor.AfterExecute(ctx, nil)
287+
interceptor.CompleteStatement(ctx, stmtID, false)
288+
// Emit CLOSE_STATEMENT as a separate operation event.
289+
interceptor.RecordOperation(ctx, connID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
290+
}
291+
} else if c.telemetry != nil {
270292
interceptor := c.telemetry
271293
connID := c.id
272294
closeCallback = func(latencyMs int64, closeErr error) {

telemetry/interceptor.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ type metricContext struct {
2020
statementID string
2121
startTime time.Time
2222
tags map[string]interface{}
23+
24+
// capturedLatencyMs is set by FinalizeLatency() to freeze the execute-phase
25+
// latency before row iteration begins. AfterExecute uses this value instead
26+
// of re-measuring from startTime (which would include row-scan time).
27+
capturedLatencyMs int64
28+
latencyCaptured bool
2329
}
2430

2531
type contextKey int
@@ -83,6 +89,22 @@ func (i *Interceptor) BeforeExecuteWithTime(ctx context.Context, sessionID strin
8389
return withMetricContext(ctx, mc)
8490
}
8591

92+
// FinalizeLatency freezes the elapsed time as the statement's execution latency.
93+
// Call this when the execute phase is complete (i.e. when QueryContext returns) so
94+
// that AfterExecute, even if called later from rows.Close(), still reports
95+
// execute-only latency rather than total latency that would include row iteration.
96+
// Exported for use by the driver package.
97+
func (i *Interceptor) FinalizeLatency(ctx context.Context) {
98+
if !i.enabled {
99+
return
100+
}
101+
mc := getMetricContext(ctx)
102+
if mc != nil && !mc.latencyCaptured {
103+
mc.capturedLatencyMs = time.Since(mc.startTime).Milliseconds()
104+
mc.latencyCaptured = true
105+
}
106+
}
107+
86108
// AfterExecute is called after statement execution.
87109
// Records the metric with timing and error information.
88110
// Exported for use by the driver package.
@@ -103,12 +125,20 @@ func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
103125
}
104126
}()
105127

128+
// Use pre-captured latency if available (set by FinalizeLatency), otherwise
129+
// fall back to measuring from startTime (covers the error-path where
130+
// FinalizeLatency was never called).
131+
latencyMs := time.Since(mc.startTime).Milliseconds()
132+
if mc.latencyCaptured {
133+
latencyMs = mc.capturedLatencyMs
134+
}
135+
106136
metric := &telemetryMetric{
107137
metricType: "statement",
108138
timestamp: mc.startTime,
109139
sessionID: mc.sessionID,
110140
statementID: mc.statementID,
111-
latencyMs: time.Since(mc.startTime).Milliseconds(),
141+
latencyMs: latencyMs,
112142
tags: mc.tags,
113143
}
114144

0 commit comments

Comments (0)