Skip to content

Commit 6c99a84

Browse files
committed
[PECOBLR-1384] Fix telemetry gaps: CloudFetch S3 timing, chunk totals, flush ordering
- Thread per-S3-file download timing from cloudFetchDownloadTask through cloudIPCStreamIterator via onFileDownloaded callback; aggregate into initial/slowest/sum chunk timing tags matching JDBC per-chunk HTTP GET model
- Zero FetchResults RPC latency for CloudFetch pages (URLs-only response) so Thrift round-trip is not misreported as data download time
- Add statementID param to RecordOperation so CLOSE_STATEMENT entries carry sql_statement_id in telemetry tables
- Fix EXECUTE_STATEMENT/CLOSE_STATEMENT loss on shutdown: add inFlight WaitGroup to metricsAggregator so close() waits for in-flight worker exports before calling cancel(), preventing metrics picked up by workers from being dropped
- Derive total_chunks_present from final chunkCount in closeCallback for paginated CloudFetch and inline results where server does not report total
- Remove omitempty from OperationLatencyMs so CLOSE_STATEMENT reports 0ms (instant server-side close) rather than null

Co-authored-by: samikshya-chand_data
1 parent 6aee916 commit 6c99a84

13 files changed

Lines changed: 201 additions & 121 deletions

connection.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (c *conn) Close() error {
6060

6161
// Record DELETE_SESSION regardless of error (matches JDBC), then flush and release
6262
if c.telemetry != nil {
63-
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
63+
c.telemetry.RecordOperation(ctx, c.id, "", telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
6464
_ = c.telemetry.Close(ctx)
6565
telemetry.ReleaseForConnection(c.cfg.Host)
6666
}
@@ -164,7 +164,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
164164
OperationHandle: exStmtResp.OperationHandle,
165165
})
166166
if c.telemetry != nil {
167-
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
167+
c.telemetry.RecordOperation(ctx, c.id, statementID, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
168168
}
169169
if err1 != nil {
170170
log.Err(err1).Msg("databricks: failed to close operation after executing statement")
@@ -273,30 +273,59 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
273273
}
274274
}
275275

276+
// cloudFetchCallback is invoked per S3 file download for CloudFetch result sets.
277+
// It aggregates individual file download times into the same initial/slowest/sum vars
278+
// used for inline chunk timing, matching JDBC's per-chunk HTTP GET timing model.
279+
// For inline (non-CloudFetch) result sets this is never called.
280+
var cloudFetchCallback func(downloadMs int64)
281+
if c.telemetry != nil {
282+
cloudFetchCallback = func(downloadMs int64) {
283+
if downloadMs <= 0 {
284+
return
285+
}
286+
if !chunkTimingInitialSet {
287+
chunkTimingInitialMs = downloadMs
288+
chunkTimingInitialSet = true
289+
}
290+
if downloadMs > chunkTimingSlowestMs {
291+
chunkTimingSlowestMs = downloadMs
292+
}
293+
chunkTimingSumMs += downloadMs
294+
c.telemetry.AddTag(ctx, "chunk_initial_latency_ms", chunkTimingInitialMs)
295+
c.telemetry.AddTag(ctx, "chunk_slowest_latency_ms", chunkTimingSlowestMs)
296+
c.telemetry.AddTag(ctx, "chunk_sum_latency_ms", chunkTimingSumMs)
297+
}
298+
}
299+
276300
// closeCallback is invoked from rows.Close() after all rows have been consumed.
277301
// At that point chunk timing is fully accumulated in ctx tags, so we finalize
278302
// EXECUTE_STATEMENT here rather than at QueryContext return time.
279-
var closeCallback func(latencyMs int64, err error)
303+
var closeCallback func(latencyMs int64, chunkCount int, err error)
280304
if c.telemetry != nil && statementID != "" {
281305
interceptor := c.telemetry
282306
connID := c.id
283307
stmtID := statementID
284-
closeCallback = func(latencyMs int64, closeErr error) {
308+
closeCallback = func(latencyMs int64, chunkCount int, closeErr error) {
309+
// If the server never reported total chunks (paginated CloudFetch / inline),
310+
// derive it from the final chunk count now that all iteration is complete.
311+
if chunkTotalPresent == 0 && chunkCount > 0 {
312+
interceptor.AddTag(ctx, "chunk_total_present", chunkCount)
313+
}
285314
// Emit EXECUTE_STATEMENT with complete chunk data now that iteration is done.
286315
interceptor.AfterExecute(ctx, nil)
287316
interceptor.CompleteStatement(ctx, stmtID, false)
288317
// Emit CLOSE_STATEMENT as a separate operation event.
289-
interceptor.RecordOperation(ctx, connID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
318+
interceptor.RecordOperation(ctx, connID, stmtID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
290319
}
291320
} else if c.telemetry != nil {
292321
interceptor := c.telemetry
293322
connID := c.id
294-
closeCallback = func(latencyMs int64, closeErr error) {
295-
interceptor.RecordOperation(ctx, connID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
323+
closeCallback = func(latencyMs int64, _ int, closeErr error) {
324+
interceptor.RecordOperation(ctx, connID, "", telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
296325
}
297326
}
298327

299-
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, closeCallback)
328+
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, closeCallback, cloudFetchCallback)
300329
return rows, err
301330

302331
}
@@ -733,7 +762,7 @@ func (c *conn) execStagingOperation(
733762
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
734763
}
735764
}
736-
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, nil)
765+
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, nil, nil)
737766
if err != nil {
738767
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
739768
}

connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
9494
)
9595
if conn.telemetry != nil {
9696
log.Debug().Msg("telemetry initialized for connection")
97-
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs, nil)
97+
conn.telemetry.RecordOperation(ctx, conn.id, "", telemetry.OperationTypeCreateSession, sessionLatencyMs, nil)
9898
}
9999

100100
log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)

internal/rows/arrowbased/arrowIPCStreamIterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (ri *arrowIPCStreamIterator) fetchNextData() error {
138138
func (ri *arrowIPCStreamIterator) newIPCStreamIterator(fr *cli_service.TFetchResultsResp) (IPCStreamIterator, error) {
139139
rowSet := fr.Results
140140
if len(rowSet.ResultLinks) > 0 {
141-
return NewCloudIPCStreamIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
141+
return NewCloudIPCStreamIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg, nil)
142142
} else {
143143
return NewLocalIPCStreamIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
144144
}

internal/rows/arrowbased/arrowRecordIterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (ri *arrowRecordIterator) getBatchIterator() error {
169169
func (ri *arrowRecordIterator) newBatchIterator(fr *cli_service.TFetchResultsResp) (BatchIterator, error) {
170170
rowSet := fr.Results
171171
if len(rowSet.ResultLinks) > 0 {
172-
return NewCloudBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
172+
return NewCloudBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg, nil)
173173
} else {
174174
return NewLocalBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
175175
}

internal/rows/arrowbased/arrowRows.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ type arrowRowScanner struct {
8080
// Make sure arrowRowScanner fulfills the RowScanner interface
8181
var _ rowscanner.RowScanner = (*arrowRowScanner)(nil)
8282

83-
// NewArrowRowScanner returns an instance of RowScanner which handles arrow format results
84-
func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp, rowSet *cli_service.TRowSet, cfg *config.Config, logger *dbsqllog.DBSQLLogger, ctx context.Context) (rowscanner.RowScanner, dbsqlerr.DBError) {
83+
// NewArrowRowScanner returns an instance of RowScanner which handles arrow format results.
84+
// onCloudFetchDownload is an optional callback invoked for each CloudFetch S3 file download
85+
// with the download duration in milliseconds. Pass nil for non-telemetry paths.
86+
func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp, rowSet *cli_service.TRowSet, cfg *config.Config, logger *dbsqllog.DBSQLLogger, ctx context.Context, onCloudFetchDownload func(downloadMs int64)) (rowscanner.RowScanner, dbsqlerr.DBError) {
8587

8688
// we take a passed in logger, rather than just using the global from dbsqllog, so that the containing rows
8789
// instance can pass in a logger with context such as correlation ID and operation ID
@@ -119,7 +121,7 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
119121
for _, resultLink := range rowSet.ResultLinks {
120122
logger.Debug().Msgf("- start row offset: %d, row count: %d", resultLink.StartRowOffset, resultLink.RowCount)
121123
}
122-
bi, err2 = NewCloudBatchIterator(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
124+
bi, err2 = NewCloudBatchIterator(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg, onCloudFetchDownload)
123125
} else {
124126
bi, err2 = NewLocalBatchIterator(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
125127
}

0 commit comments

Comments (0)