Skip to content

Commit b084c59

Browse files
committed
Add correctness tests for telemetry gap fixes
Four targeted test suites covering the bug fixes from the previous commit: aggregator_test.go (new): - WaitsForInFlightWorkerExports: close() blocks until all HTTP exports complete - DrainsPendingQueueJobsBeforeCancel: drain step exports pending queue jobs - InFlightAddBeforeSend: inFlight.Add happens before queue send (no missed jobs) - SafeToCallMultipleTimes: concurrent close() calls don't deadlock (sync.Once) - DropWhenQueueFull: drop path calls inFlight.Done() so Wait() is never stuck integration_test.go: - OperationLatencyMs_ZeroNotOmitted: 0ms latency serialises as 0 not null (omitempty fix) - ChunkTotalPresent_DerivedFromChunkCount: chunk_total_present tag propagates to payload batchloader_test.go: - OnFileDownloaded callback is called once per file with positive downloadMs - Nil callback does not panic (non-telemetry paths) rows_test.go: - CloseCallback receives correct chunkCount after multi-page iteration - Nil closeCallback does not panic Co-authored-by: Isaac
1 parent 6c99a84 commit b084c59

4 files changed

Lines changed: 656 additions & 0 deletions

File tree

internal/rows/arrowbased/batchloader_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/http"
88
"net/http/httptest"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -339,6 +340,115 @@ func TestCloudFetchIterator(t *testing.T) {
339340
})
340341
}
341342

343+
// TestCloudFetchIterator_OnFileDownloaded_CallbackInvokedWithPositiveDuration verifies
344+
// that the onFileDownloaded telemetry callback is called once per downloaded S3 file with
345+
// a positive downloadMs value.
346+
//
347+
// This covers the CloudFetch timing fix where per-S3-file download durations are measured
348+
// and reported as initial_chunk_latency_ms / slowest_chunk_latency_ms / sum_chunks_download_time_ms
349+
// in the telemetry payload.
350+
func TestCloudFetchIterator_OnFileDownloaded_CallbackInvokedWithPositiveDuration(t *testing.T) {
351+
// Serve real arrow bytes so the iterator can parse them successfully.
352+
arrowBytes := generateMockArrowBytes(generateArrowRecord())
353+
354+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
355+
// Add a tiny sleep so the measured download time is reliably > 0ms.
356+
time.Sleep(2 * time.Millisecond)
357+
w.WriteHeader(http.StatusOK)
358+
_, _ = w.Write(arrowBytes)
359+
}))
360+
defer server.Close()
361+
362+
startRowOffset := int64(0)
363+
links := []*cli_service.TSparkArrowResultLink{
364+
{
365+
FileLink: server.URL,
366+
ExpiryTime: time.Now().Add(10 * time.Minute).Unix(),
367+
StartRowOffset: startRowOffset,
368+
RowCount: 1,
369+
},
370+
{
371+
FileLink: server.URL,
372+
ExpiryTime: time.Now().Add(10 * time.Minute).Unix(),
373+
StartRowOffset: startRowOffset + 1,
374+
RowCount: 1,
375+
},
376+
}
377+
378+
cfg := config.WithDefaults()
379+
cfg.UseLz4Compression = false
380+
cfg.MaxDownloadThreads = 1
381+
382+
var callbackMu sync.Mutex
383+
var downloadDurations []int64
384+
385+
onFileDownloaded := func(downloadMs int64) {
386+
callbackMu.Lock()
387+
downloadDurations = append(downloadDurations, downloadMs)
388+
callbackMu.Unlock()
389+
}
390+
391+
bi, err := NewCloudBatchIterator(context.Background(), links, startRowOffset, cfg, onFileDownloaded)
392+
assert.Nil(t, err)
393+
394+
// Consume all batches to trigger the downloads.
395+
for bi.HasNext() {
396+
_, nextErr := bi.Next()
397+
assert.Nil(t, nextErr)
398+
}
399+
400+
callbackMu.Lock()
401+
durations := make([]int64, len(downloadDurations))
402+
copy(durations, downloadDurations)
403+
callbackMu.Unlock()
404+
405+
// Callback must be invoked once per link.
406+
assert.Equal(t, len(links), len(durations),
407+
"onFileDownloaded must be called once per downloaded file")
408+
409+
// Each reported duration must be positive (the server adds a 2ms delay).
410+
for i, d := range durations {
411+
assert.Greater(t, d, int64(0),
412+
"onFileDownloaded must report positive downloadMs for file %d, got %d", i, d)
413+
}
414+
}
415+
416+
// TestCloudFetchIterator_OnFileDownloaded_NilCallbackDoesNotPanic verifies that passing
417+
// nil for onFileDownloaded (non-telemetry paths) does not cause a panic during iteration.
418+
func TestCloudFetchIterator_OnFileDownloaded_NilCallbackDoesNotPanic(t *testing.T) {
419+
arrowBytes := generateMockArrowBytes(generateArrowRecord())
420+
421+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
422+
w.WriteHeader(http.StatusOK)
423+
_, _ = w.Write(arrowBytes)
424+
}))
425+
defer server.Close()
426+
427+
startRowOffset := int64(0)
428+
links := []*cli_service.TSparkArrowResultLink{
429+
{
430+
FileLink: server.URL,
431+
ExpiryTime: time.Now().Add(10 * time.Minute).Unix(),
432+
StartRowOffset: startRowOffset,
433+
RowCount: 1,
434+
},
435+
}
436+
437+
cfg := config.WithDefaults()
438+
cfg.UseLz4Compression = false
439+
cfg.MaxDownloadThreads = 1
440+
441+
// nil callback — must not panic
442+
bi, err := NewCloudBatchIterator(context.Background(), links, startRowOffset, cfg, nil)
443+
assert.Nil(t, err)
444+
445+
assert.NotPanics(t, func() {
446+
for bi.HasNext() {
447+
_, _ = bi.Next()
448+
}
449+
}, "nil onFileDownloaded must not cause a panic")
450+
}
451+
342452
func generateArrowRecord() arrow.Record {
343453
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
344454

internal/rows/rows_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,3 +1563,121 @@ func TestFetchResultPage_PropagatesGetNextPageError(t *testing.T) {
15631563

15641564
assert.ErrorContains(t, actualErr, errorMsg)
15651565
}
1566+
1567+
// TestRows_CloseCallback_ReceivesChunkCount verifies that when rows.Close() is called,
1568+
// the closeCallback receives the correct chunkCount reflecting the number of result pages
1569+
// that were fetched during iteration.
1570+
//
1571+
// This covers the fix where total_chunks_present in the telemetry payload was always null
1572+
// for paginated CloudFetch queries: the driver now derives it from r.chunkCount and passes
1573+
// it through closeCallback so connection.go can set the "chunk_total_present" tag.
1574+
func TestRows_CloseCallback_ReceivesChunkCount(t *testing.T) {
1575+
t.Parallel()
1576+
1577+
noMoreRows := false
1578+
moreRows := true
1579+
1580+
// Two pages: page 0 (5 rows, has more), page 1 (3 rows, no more).
1581+
colVals := []*cli_service.TColumn{
1582+
{BoolVal: &cli_service.TBoolColumn{Values: []bool{true, false, true, false, true}}},
1583+
}
1584+
colVals2 := []*cli_service.TColumn{
1585+
{BoolVal: &cli_service.TBoolColumn{Values: []bool{true, false, true}}},
1586+
}
1587+
1588+
pages := []cli_service.TFetchResultsResp{
1589+
{
1590+
Status: &cli_service.TStatus{StatusCode: cli_service.TStatusCode_SUCCESS_STATUS},
1591+
HasMoreRows: &moreRows,
1592+
Results: &cli_service.TRowSet{StartRowOffset: 0, Columns: colVals},
1593+
},
1594+
{
1595+
Status: &cli_service.TStatus{StatusCode: cli_service.TStatusCode_SUCCESS_STATUS},
1596+
HasMoreRows: &noMoreRows,
1597+
Results: &cli_service.TRowSet{StartRowOffset: 5, Columns: colVals2},
1598+
},
1599+
}
1600+
1601+
pageIndex := -1
1602+
fetchFn := func(ctx context.Context, req *cli_service.TFetchResultsReq) (*cli_service.TFetchResultsResp, error) {
1603+
pageIndex++
1604+
p := pages[pageIndex]
1605+
return &p, nil
1606+
}
1607+
metaFn := func(ctx context.Context, req *cli_service.TGetResultSetMetadataReq) (*cli_service.TGetResultSetMetadataResp, error) {
1608+
return &cli_service.TGetResultSetMetadataResp{
1609+
Status: &cli_service.TStatus{StatusCode: cli_service.TStatusCode_SUCCESS_STATUS},
1610+
Schema: &cli_service.TTableSchema{
1611+
Columns: []*cli_service.TColumnDesc{
1612+
{ColumnName: "flag", Position: 0, TypeDesc: &cli_service.TTypeDesc{
1613+
Types: []*cli_service.TTypeEntry{{
1614+
PrimitiveEntry: &cli_service.TPrimitiveTypeEntry{Type: cli_service.TTypeId_BOOLEAN_TYPE},
1615+
}},
1616+
}},
1617+
},
1618+
},
1619+
}, nil
1620+
}
1621+
1622+
testClient := &client.TestClient{
1623+
FnFetchResults: fetchFn,
1624+
FnGetResultSetMetadata: metaFn,
1625+
}
1626+
1627+
var callbackChunkCount int
1628+
closeCallback := func(latencyMs int64, chunkCount int, err error) {
1629+
callbackChunkCount = chunkCount
1630+
}
1631+
1632+
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
1633+
cfg := config.WithDefaults()
1634+
cfg.MaxRows = 5 // force paging
1635+
1636+
dr, dbErr := NewRows(ctx, nil, testClient, cfg, nil, nil, closeCallback, nil)
1637+
assert.Nil(t, dbErr)
1638+
1639+
// Drain all rows to force two FetchResults calls.
1640+
dest := make([]driver.Value, 1)
1641+
for dr.Next(dest) == nil {
1642+
}
1643+
1644+
// Close should invoke the callback with the total chunk count (2 pages fetched).
1645+
assert.Nil(t, dr.Close())
1646+
1647+
// direct results count as chunk 0; two FetchResults calls give chunkCount=2.
1648+
// (No directResults here so chunkCount starts at 0, then +1 per FetchResults call.)
1649+
assert.Equal(t, 2, callbackChunkCount,
1650+
"closeCallback must receive the total number of result pages fetched")
1651+
}
1652+
1653+
// TestRows_CloseCallback_NilDoesNotPanic verifies that passing nil for closeCallback
1654+
// does not cause a panic when rows.Close() is called.
1655+
func TestRows_CloseCallback_NilDoesNotPanic(t *testing.T) {
1656+
t.Parallel()
1657+
1658+
noMoreRows := false
1659+
pages := []cli_service.TFetchResultsResp{
1660+
{
1661+
Status: &cli_service.TStatus{StatusCode: cli_service.TStatusCode_SUCCESS_STATUS},
1662+
HasMoreRows: &noMoreRows,
1663+
Results: &cli_service.TRowSet{StartRowOffset: 0, Columns: []*cli_service.TColumn{}},
1664+
},
1665+
}
1666+
pageIndex := -1
1667+
fetchFn := func(ctx context.Context, req *cli_service.TFetchResultsReq) (*cli_service.TFetchResultsResp, error) {
1668+
pageIndex++
1669+
p := pages[pageIndex]
1670+
return &p, nil
1671+
}
1672+
testClient := &client.TestClient{FnFetchResults: fetchFn}
1673+
1674+
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
1675+
cfg := config.WithDefaults()
1676+
1677+
dr, dbErr := NewRows(ctx, nil, testClient, cfg, nil, nil, nil /* closeCallback=nil */, nil)
1678+
assert.Nil(t, dbErr)
1679+
1680+
assert.NotPanics(t, func() {
1681+
_ = dr.Close()
1682+
}, "nil closeCallback must not cause a panic on rows.Close()")
1683+
}

0 commit comments

Comments
 (0)