Skip to content

Commit 33fcf74

Browse files
committed
Add per-chunk download timing to telemetry
Time each FetchResults call in fetchResultPage() and propagate the latency through the telemetryUpdate callback. Aggregate the initial, slowest, and summed chunk fetch latencies in the QueryContext closure, then map them to the ChunkDetails wire fields:

- initial_chunk_latency_millis
- slowest_chunk_latency_millis
- sum_chunks_download_time_millis

Also populate total_chunks_present from server-reported data:

- DirectResults with CloseOperation set → 1 (all data inline)
- CloudFetch ResultLinks → len(links) from the first response

The existing total_chunks_iterated (chunk_count tag) is unchanged.

Co-authored-by: Isaac
1 parent 3ad77bd commit 33fcf74

4 files changed

Lines changed: 92 additions & 10 deletions

File tree

connection.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,42 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
225225
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
226226
}
227227

228+
// Per-chunk timing state captured in the closure below.
229+
var (
230+
chunkTimingInitialMs int64
231+
chunkTimingSlowestMs int64
232+
chunkTimingSumMs int64
233+
chunkTimingInitialSet bool
234+
chunkTotalPresent int32
235+
)
236+
228237
// Telemetry callback for tracking row fetching metrics
229-
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
230-
if c.telemetry != nil {
231-
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
232-
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
238+
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32) {
239+
if c.telemetry == nil {
240+
return
241+
}
242+
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
243+
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
244+
245+
// Aggregate per-chunk fetch latencies. Direct results report a latency of 0 and are skipped; note that any fetch whose latency truncates to 0 ms is skipped as well.
246+
if chunkLatencyMs > 0 {
247+
if !chunkTimingInitialSet {
248+
chunkTimingInitialMs = chunkLatencyMs
249+
chunkTimingInitialSet = true
250+
}
251+
if chunkLatencyMs > chunkTimingSlowestMs {
252+
chunkTimingSlowestMs = chunkLatencyMs
253+
}
254+
chunkTimingSumMs += chunkLatencyMs
255+
c.telemetry.AddTag(ctx, "chunk_initial_latency_ms", chunkTimingInitialMs)
256+
c.telemetry.AddTag(ctx, "chunk_slowest_latency_ms", chunkTimingSlowestMs)
257+
c.telemetry.AddTag(ctx, "chunk_sum_latency_ms", chunkTimingSumMs)
258+
}
259+
260+
// Record total chunks present from first server report.
261+
if totalChunksPresent > 0 && chunkTotalPresent == 0 {
262+
chunkTotalPresent = totalChunksPresent
263+
c.telemetry.AddTag(ctx, "chunk_total_present", int(chunkTotalPresent))
233264
}
234265
}
235266

@@ -673,8 +704,8 @@ func (c *conn) execStagingOperation(
673704
}
674705

675706
if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
676-
// Telemetry callback for staging operation row fetching
677-
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
707+
// Telemetry callback for staging operation row fetching (chunk timing not tracked for staging ops).
708+
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32) {
678709
if c.telemetry != nil {
679710
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
680711
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)

internal/rows/rows.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ type rows struct {
5959
ctx context.Context
6060

6161
// Telemetry tracking
62-
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
62+
// telemetryUpdate is called after each chunk is fetched with:
63+
// chunkCount: total chunks fetched so far (including direct results)
64+
// bytesDownloaded: cumulative bytes
65+
// chunkIndex: 0-based index of the chunk just fetched
66+
// chunkLatencyMs: fetch latency for this chunk (0 for direct results)
67+
// totalChunksPresent: server-reported total, 0 if unknown
68+
telemetryUpdate func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32)
6369
closeCallback func(latencyMs int64, err error)
6470
chunkCount int
6571
bytesDownloaded int64
@@ -78,7 +84,7 @@ func NewRows(
7884
client cli_service.TCLIService,
7985
config *config.Config,
8086
directResults *cli_service.TSparkDirectResults,
81-
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
87+
telemetryUpdate func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32),
8288
closeCallback func(latencyMs int64, err error),
8389
) (driver.Rows, dbsqlerr.DBError) {
8490

@@ -148,7 +154,18 @@ func NewRows(
148154
}
149155

150156
if r.telemetryUpdate != nil {
151-
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
157+
// Determine totalChunksPresent for direct results.
158+
// If the server already closed the operation, all data is here, so the total is the chunk count so far (normally 1).
159+
// For CloudFetch direct results, use the number of result links.
160+
var totalPresent int32
161+
if directResults.CloseOperation != nil {
162+
totalPresent = int32(r.chunkCount)
163+
} else if directResults.ResultSet != nil && directResults.ResultSet.Results != nil &&
164+
directResults.ResultSet.Results.ResultLinks != nil {
165+
totalPresent = int32(len(directResults.ResultSet.Results.ResultLinks)) //nolint:gosec
166+
}
167+
// chunkIndex=0, chunkLatencyMs=0: direct results have no separate fetch latency.
168+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded, 0, 0, totalPresent)
152169
}
153170
}
154171

@@ -480,7 +497,11 @@ func (r *rows) fetchResultPage() error {
480497
r.RowScanner = nil
481498
}
482499

500+
// Record 0-based chunk index before fetching (direct results occupied index 0 if present).
501+
chunkIndex := r.chunkCount
502+
fetchStart := time.Now()
483503
fetchResult, err1 := r.ResultPageIterator.Next()
504+
chunkLatencyMs := time.Since(fetchStart).Milliseconds()
484505
if err1 != nil {
485506
return err1
486507
}
@@ -494,8 +515,14 @@ func (r *rows) fetchResultPage() error {
494515
}
495516
}
496517

518+
// For CloudFetch, the result links in the response reveal the server-reported total.
519+
var totalPresent int32
520+
if fetchResult != nil && fetchResult.Results != nil && fetchResult.Results.ResultLinks != nil {
521+
totalPresent = int32(len(fetchResult.Results.ResultLinks)) //nolint:gosec
522+
}
523+
497524
if r.telemetryUpdate != nil {
498-
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
525+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded, chunkIndex, chunkLatencyMs, totalPresent)
499526
}
500527

501528
err1 = r.makeRowScanner(fetchResult)

telemetry/request.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ func createTelemetryRequest(metrics []*telemetryMetric, driverVersion string) (*
184184
sqlOp.ChunkDetails = &ChunkDetails{
185185
TotalChunksIterated: int32(chunkCount), //nolint:gosec // chunk count is always small
186186
}
187+
if v, ok := tags["chunk_initial_latency_ms"].(int64); ok && v > 0 {
188+
sqlOp.ChunkDetails.InitialChunkLatencyMs = v
189+
}
190+
if v, ok := tags["chunk_slowest_latency_ms"].(int64); ok && v > 0 {
191+
sqlOp.ChunkDetails.SlowestChunkLatencyMs = v
192+
}
193+
if v, ok := tags["chunk_sum_latency_ms"].(int64); ok && v > 0 {
194+
sqlOp.ChunkDetails.SumChunksDownloadTimeMs = v
195+
}
196+
if v, ok := tags["chunk_total_present"].(int); ok && v > 0 {
197+
sqlOp.ChunkDetails.TotalChunksPresent = int32(v) //nolint:gosec // chunk count is always small
198+
}
187199
}
188200

189201
if opType, ok := tags["operation_type"].(string); ok {

telemetry/tags.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ const (
2121
TagPollLatency = "poll.latency_ms"
2222
)
2323

24+
// Tag names for chunk timing metrics
25+
const (
26+
TagChunkInitialLatencyMs = "chunk_initial_latency_ms"
27+
TagChunkSlowestLatencyMs = "chunk_slowest_latency_ms"
28+
TagChunkSumLatencyMs = "chunk_sum_latency_ms"
29+
TagChunkTotalPresent = "chunk_total_present"
30+
)
31+
2432
// Tag names for error metrics
2533
const (
2634
TagErrorType = "error.type"
@@ -77,6 +85,10 @@ func statementTags() []tagDefinition {
7785
{TagCompressionEnabled, exportDatabricks, "Compression enabled", false},
7886
{TagPollCount, exportDatabricks, "Poll count", false},
7987
{TagPollLatency, exportDatabricks, "Poll latency", false},
88+
{TagChunkInitialLatencyMs, exportDatabricks, "Initial chunk fetch latency ms", false},
89+
{TagChunkSlowestLatencyMs, exportDatabricks, "Slowest chunk fetch latency ms", false},
90+
{TagChunkSumLatencyMs, exportDatabricks, "Sum of chunk fetch latencies ms", false},
91+
{TagChunkTotalPresent, exportDatabricks, "Total chunks reported by server", false},
8092
}
8193
}
8294

0 commit comments

Comments
 (0)