Skip to content

Commit 3ad77bd

Browse files
committed
Fix telemetry: correct EXECUTE_STATEMENT latency, add sql_statement_id, add CLOSE_STATEMENT
- Remove RecordOperation(EXECUTE_STATEMENT) from executeStatement(): it measured only the Thrift RPC submission time (~5s), not end-to-end query time. The correct end-to-end metric already flows through BeforeExecuteWithTime → AfterExecute → CompleteStatement. - Add AddTag(operation_type=EXECUTE_STATEMENT) after BeforeExecuteWithTime in QueryContext and ExecContext so the aggregated statement metric carries the right operation_type, sql_statement_id, and accurate latency in one event. - Add closeCallback func(latencyMs int64, err error) to rows.NewRows() and rows.Close() to emit CLOSE_STATEMENT telemetry when rows.Close() is called. Co-authored-by: Isaac
1 parent 561698e commit 3ad77bd

3 files changed

Lines changed: 30 additions & 18 deletions

File tree

connection.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
139139
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
140140
// Use BeforeExecuteWithTime to set the correct start time (before execution)
141141
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
142+
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)
142143
defer func() {
143144
finalErr := err
144145
if stagingErr != nil {
@@ -212,6 +213,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
212213
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
213214
// Use BeforeExecuteWithTime to set the correct start time (before execution)
214215
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
216+
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)
215217
defer func() {
216218
c.telemetry.AfterExecute(ctx, err)
217219
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
@@ -231,7 +233,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
231233
}
232234
}
233235

234-
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
236+
// Telemetry callback for CLOSE_STATEMENT — fired from rows.Close()
237+
var closeCallback func(latencyMs int64, err error)
238+
if c.telemetry != nil {
239+
interceptor := c.telemetry
240+
connID := c.id
241+
closeCallback = func(latencyMs int64, closeErr error) {
242+
interceptor.RecordOperation(ctx, connID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
243+
}
244+
}
245+
246+
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, closeCallback)
235247
return rows, err
236248

237249
}
@@ -396,14 +408,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
396408
}
397409
}
398410

399-
executeStart := time.Now()
400411
resp, err := c.client.ExecuteStatement(ctx, &req)
401-
// Record the Thrift call latency as a separate operation metric.
402-
// This is distinct from the statement-level metric (BeforeExecuteWithTime), which
403-
// measures end-to-end latency including polling and row fetching.
404-
if c.telemetry != nil {
405-
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeExecuteStatement, time.Since(executeStart).Milliseconds(), err)
406-
}
407412
var log *logger.DBSQLLogger
408413
log, ctx = client.LoggerAndContext(ctx, resp)
409414

@@ -675,7 +680,7 @@ func (c *conn) execStagingOperation(
675680
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
676681
}
677682
}
678-
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
683+
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate, nil)
679684
if err != nil {
680685
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
681686
}

internal/rows/rows.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type rows struct {
6060

6161
// Telemetry tracking
6262
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
63+
closeCallback func(latencyMs int64, err error)
6364
chunkCount int
6465
bytesDownloaded int64
6566
}
@@ -78,6 +79,7 @@ func NewRows(
7879
config *config.Config,
7980
directResults *cli_service.TSparkDirectResults,
8081
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
82+
closeCallback func(latencyMs int64, err error),
8183
) (driver.Rows, dbsqlerr.DBError) {
8284

8385
connId := driverctx.ConnIdFromContext(ctx)
@@ -118,6 +120,7 @@ func NewRows(
118120
logger_: logger,
119121
ctx: ctx,
120122
telemetryUpdate: telemetryUpdate,
123+
closeCallback: closeCallback,
121124
chunkCount: 0,
122125
bytesDownloaded: 0,
123126
}
@@ -210,7 +213,11 @@ func (r *rows) Close() error {
210213

211214
if r.ResultPageIterator != nil {
212215
r.logger().Debug().Msgf("databricks: closing Rows operation")
216+
closeStart := time.Now()
213217
err := r.ResultPageIterator.Close()
218+
if r.closeCallback != nil {
219+
r.closeCallback(time.Since(closeStart).Milliseconds(), err)
220+
}
214221
if err != nil {
215222
r.logger().Err(err).Msg(errRowsCloseFailed)
216223
return dbsqlerr_int.NewRequestError(r.ctx, errRowsCloseFailed, err)

internal/rows/rows_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ func TestColumnsWithDirectResults(t *testing.T) {
421421
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
422422
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
423423

424-
d, err := NewRows(ctx, nil, client, nil, nil, nil)
424+
d, err := NewRows(ctx, nil, client, nil, nil, nil, nil)
425425
assert.Nil(t, err)
426426

427427
rowSet := d.(*rows)
@@ -720,7 +720,7 @@ func TestRowsCloseOptimization(t *testing.T) {
720720
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
721721
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
722722
opHandle := &cli_service.TOperationHandle{OperationId: &cli_service.THandleIdentifier{GUID: []byte{'f', 'o'}}}
723-
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil, nil)
723+
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil, nil, nil)
724724

725725
// rowSet has no direct results calling Close should result in call to client to close operation
726726
err := rowSet.Close()
@@ -733,7 +733,7 @@ func TestRowsCloseOptimization(t *testing.T) {
733733
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
734734
}
735735
closeCount = 0
736-
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil)
736+
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
737737
err = rowSet.Close()
738738
assert.Nil(t, err, "rows.Close should not throw an error")
739739
assert.Equal(t, 1, closeCount)
@@ -746,7 +746,7 @@ func TestRowsCloseOptimization(t *testing.T) {
746746
ResultSetMetadata: &cli_service.TGetResultSetMetadataResp{Schema: &cli_service.TTableSchema{}},
747747
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
748748
}
749-
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil)
749+
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
750750
err = rowSet.Close()
751751
assert.Nil(t, err, "rows.Close should not throw an error")
752752
assert.Equal(t, 0, closeCount)
@@ -816,7 +816,7 @@ func TestGetArrowBatches(t *testing.T) {
816816

817817
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2})
818818
cfg := config.WithDefaults()
819-
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil)
819+
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
820820
assert.Nil(t, err)
821821

822822
rows2, ok := rows.(dbsqlrows.Rows)
@@ -889,7 +889,7 @@ func TestGetArrowBatches(t *testing.T) {
889889

890890
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2, fetchResp3})
891891
cfg := config.WithDefaults()
892-
rows, err := NewRows(ctx, nil, client, cfg, nil, nil)
892+
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
893893
assert.Nil(t, err)
894894

895895
rows2, ok := rows.(dbsqlrows.Rows)
@@ -950,7 +950,7 @@ func TestGetArrowBatches(t *testing.T) {
950950

951951
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
952952
cfg := config.WithDefaults()
953-
rows, err := NewRows(ctx, nil, client, cfg, nil, nil)
953+
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
954954
assert.Nil(t, err)
955955

956956
rows2, ok := rows.(dbsqlrows.Rows)
@@ -977,7 +977,7 @@ func TestGetArrowBatches(t *testing.T) {
977977

978978
client := getSimpleClient([]cli_service.TFetchResultsResp{})
979979
cfg := config.WithDefaults()
980-
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil)
980+
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
981981
assert.Nil(t, err)
982982

983983
rows2, ok := rows.(dbsqlrows.Rows)
@@ -1557,7 +1557,7 @@ func TestFetchResultPage_PropagatesGetNextPageError(t *testing.T) {
15571557

15581558
executeStatementResp := cli_service.TExecuteStatementResp{}
15591559
cfg := config.WithDefaults()
1560-
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil)
1560+
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
15611561
// Call Next and ensure it propagates the error from getNextPage
15621562
actualErr := rows.Next(nil)
15631563

0 commit comments

Comments
 (0)