Skip to content

Commit 3aaedd1

Browse files
committed
Fix chunk_total_present for paginated CloudFetch
The previous fix (checking chunkTotalPresent == 0 in closeCallback) only worked for inline ArrowBatch results. For paginated CloudFetch (1 result link per FetchResults call), telemetryUpdate set chunkTotalPresent = 1 on the first page, causing the closeCallback gate to never fire. The final chunk_total_present would be 1 instead of the actual page count.

Fix: track actual S3 file downloads via cloudFetchFileCount (incremented in cloudFetchCallback per file). closeCallback now sets chunk_total_present from cloudFetchFileCount when CloudFetch was used, or from chunkCount for inline ArrowBatch results.

This correctly handles all three cases:
- Paginated CloudFetch (1 link/page): fileCount == pageCount, which is the correct total
- Bulk CloudFetch (all links in DirectResults): fileCount == number of S3 downloads
- Inline ArrowBatch: fileCount == 0, so chunk_total_present falls back to chunkCount

Also removes the now-incorrect early chunk_total_present setting from telemetryUpdate (len(ResultLinks) per response is not the grand total for paginated CloudFetch) and fixes the misleading comment in rows.go.

Co-authored-by: Isaac
1 parent b084c59 commit 3aaedd1

2 files changed

Lines changed: 18 additions & 13 deletions

File tree

connection.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,16 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
240240
chunkTimingSlowestMs int64
241241
chunkTimingSumMs int64
242242
chunkTimingInitialSet bool
243-
chunkTotalPresent int32
243+
// cloudFetchFileCount counts individual S3 files downloaded via CloudFetch.
244+
// Used to set chunk_total_present correctly for both bulk and paginated CloudFetch:
245+
// - paginated CF (1 link/FetchResults): file count == page count == correct total
246+
// - bulk CF (all links in DirectResults): file count == actual S3 downloads
247+
// For inline ArrowBatch results this stays 0 and chunk_total_present falls back to chunkCount.
248+
cloudFetchFileCount int
244249
)
245250

246251
// Telemetry callback invoked after each result page is fetched.
247-
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32) {
252+
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, _ int32) {
248253
if c.telemetry == nil {
249254
return
250255
}
@@ -265,12 +270,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
265270
c.telemetry.AddTag(ctx, "chunk_slowest_latency_ms", chunkTimingSlowestMs)
266271
c.telemetry.AddTag(ctx, "chunk_sum_latency_ms", chunkTimingSumMs)
267272
}
268-
269-
// Record server-reported total chunks from first non-zero report.
270-
if totalChunksPresent > 0 && chunkTotalPresent == 0 {
271-
chunkTotalPresent = totalChunksPresent
272-
c.telemetry.AddTag(ctx, "chunk_total_present", int(chunkTotalPresent))
273-
}
273+
// chunk_total_present is set definitively in closeCallback once all pages are known.
274274
}
275275

276276
// cloudFetchCallback is invoked per S3 file download for CloudFetch result sets.
@@ -283,6 +283,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
283283
if downloadMs <= 0 {
284284
return
285285
}
286+
cloudFetchFileCount++ // track actual S3 downloads for chunk_total_present
286287
if !chunkTimingInitialSet {
287288
chunkTimingInitialMs = downloadMs
288289
chunkTimingInitialSet = true
@@ -306,9 +307,14 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
306307
connID := c.id
307308
stmtID := statementID
308309
closeCallback = func(latencyMs int64, chunkCount int, closeErr error) {
309-
// If the server never reported total chunks (paginated CloudFetch / inline),
310-
// derive it from the final chunk count now that all iteration is complete.
311-
if chunkTotalPresent == 0 && chunkCount > 0 {
310+
// Set chunk_total_present to the definitive total now that all iteration is done.
311+
// For CloudFetch, use cloudFetchFileCount (actual S3 downloads) — this handles
312+
// both paginated CF (1 link/page, so file count == page count) and bulk CF
313+
// (all links in DirectResults, so file count == total S3 files).
314+
// For inline ArrowBatch, cloudFetchFileCount is 0; fall back to chunkCount.
315+
if cloudFetchFileCount > 0 {
316+
interceptor.AddTag(ctx, "chunk_total_present", cloudFetchFileCount)
317+
} else if chunkCount > 0 {
312318
interceptor.AddTag(ctx, "chunk_total_present", chunkCount)
313319
}
314320
// Emit EXECUTE_STATEMENT with complete chunk data now that iteration is done.

internal/rows/rows.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,7 @@ func (r *rows) fetchResultPage() error {
521521
}
522522
}
523523

524-
// For CloudFetch, the result links in the response reveal the server-reported total.
525-
// The FetchResults RPC only returns presigned URLs for CloudFetch — the actual data
524+
// For CloudFetch, the FetchResults RPC only returns presigned S3 URLs — the actual data
526525
// transfer happens later via S3 HTTP GETs timed by cloudFetchCallback. Report 0 latency
527526
// here so the Thrift round-trip is not misreported as chunk download time.
528527
var totalPresent int32

0 commit comments

Comments
 (0)