Skip to content

Commit 138a0ad

Browse files
committed
Support multipart uploads > 5gb
1 parent ce6a7a9 commit 138a0ad

File tree

9 files changed

+587
-36
lines changed

9 files changed

+587
-36
lines changed

.surface

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ FLAG vector db import-session create --drop-tables type=bool
238238
FLAG vector db import-session create --filename type=string
239239
FLAG vector db import-session create --search-replace-from type=string
240240
FLAG vector db import-session create --search-replace-to type=string
241+
FLAG vector db import-session run --parts type=string
241242
FLAG vector deploy list --page type=int
242243
FLAG vector deploy list --per-page type=int
243244
FLAG vector deploy rollback --poll-interval type=duration

internal/api/client.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"io"
89
"net/http"
910
"net/url"
1011
"os"
@@ -87,6 +88,34 @@ func (c *Client) PutFile(ctx context.Context, url string, file *os.File) (*http.
8788
return resp, nil
8889
}
8990

91+
// PutFilePart uploads a file part via PUT to the given URL (typically a presigned
92+
// S3 URL for multipart uploads). It returns the ETag header from the response.
93+
// Unlike other methods, this does not add Authorization or Accept headers.
94+
func (c *Client) PutFilePart(ctx context.Context, url string, body io.Reader, contentLength int64) (string, error) {
95+
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
96+
if err != nil {
97+
return "", fmt.Errorf("creating file part upload request: %w", err)
98+
}
99+
req.ContentLength = contentLength
100+
req.Header.Set("User-Agent", c.UserAgent)
101+
102+
resp, err := c.httpClient.Do(req)
103+
if err != nil {
104+
return "", fmt.Errorf("executing file part upload: %w", err)
105+
}
106+
defer func() { _ = resp.Body.Close() }()
107+
108+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
109+
return "", ParseErrorResponse(resp)
110+
}
111+
112+
etag := resp.Header.Get("Etag")
113+
if etag == "" {
114+
return "", fmt.Errorf("S3 response missing ETag header")
115+
}
116+
return etag, nil
117+
}
118+
90119
// jsonRequest is a helper that JSON-encodes a body and sends a request.
91120
// When body is nil, the request is sent with no body and no Content-Type header.
92121
func (c *Client) jsonRequest(ctx context.Context, method, path string, body any) (*http.Response, error) {

internal/api/client_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package api
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"io"
@@ -199,6 +200,68 @@ func TestClient_PutFile(t *testing.T) {
199200
assert.Equal(t, "file-content", string(gotBody))
200201
}
201202

203+
func TestClient_PutFilePart(t *testing.T) {
204+
var gotMethod, gotUserAgent string
205+
var gotBody []byte
206+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
207+
gotMethod = r.Method
208+
gotUserAgent = r.Header.Get("User-Agent")
209+
gotBody, _ = io.ReadAll(r.Body)
210+
// PutFilePart should not add Authorization or Accept headers (presigned S3 URL).
211+
assert.Empty(t, r.Header.Get("Authorization"))
212+
assert.Empty(t, r.Header.Get("Accept"))
213+
w.Header().Set("Etag", `"abc123"`)
214+
w.WriteHeader(http.StatusOK)
215+
}))
216+
defer srv.Close()
217+
218+
content := []byte("chunk-data-here")
219+
reader := bytes.NewReader(content)
220+
221+
c := NewClient("https://api.example.com", "tok", "vector-cli/test")
222+
etag, err := c.PutFilePart(context.Background(), srv.URL+"/upload/part1", reader, int64(len(content)))
223+
require.NoError(t, err)
224+
225+
assert.Equal(t, http.MethodPut, gotMethod)
226+
assert.Equal(t, "vector-cli/test", gotUserAgent)
227+
assert.Equal(t, "chunk-data-here", string(gotBody))
228+
assert.Equal(t, `"abc123"`, etag)
229+
}
230+
231+
func TestClient_PutFilePartErrorResponse(t *testing.T) {
232+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
233+
w.WriteHeader(http.StatusForbidden)
234+
}))
235+
defer srv.Close()
236+
237+
content := []byte("data")
238+
reader := bytes.NewReader(content)
239+
240+
c := NewClient("https://api.example.com", "tok", "")
241+
_, err := c.PutFilePart(context.Background(), srv.URL+"/upload", reader, int64(len(content)))
242+
require.Error(t, err)
243+
244+
apiErr, ok := err.(*APIError)
245+
require.True(t, ok, "error should be *APIError")
246+
assert.Equal(t, 403, apiErr.HTTPStatus)
247+
}
248+
249+
func TestClient_PutFilePartMissingEtag(t *testing.T) {
250+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
251+
// Return 200 but no ETag header
252+
w.WriteHeader(http.StatusOK)
253+
}))
254+
defer srv.Close()
255+
256+
content := []byte("data")
257+
reader := bytes.NewReader(content)
258+
259+
c := NewClient("https://api.example.com", "tok", "")
260+
_, err := c.PutFilePart(context.Background(), srv.URL+"/upload", reader, int64(len(content)))
261+
require.Error(t, err)
262+
assert.Contains(t, err.Error(), "missing ETag")
263+
}
264+
202265
func TestClient_ErrorResponse(t *testing.T) {
203266
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
204267
w.Header().Set("Content-Type", "application/json")

internal/commands/archive.go

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,57 @@ import (
99

1010
"github.com/spf13/cobra"
1111

12+
"github.com/built-fast/vector-cli/internal/appctx"
1213
"github.com/built-fast/vector-cli/internal/output"
1314
)
1415

16+
func uploadMultipart(cmd *cobra.Command, app *appctx.App, file *os.File, fileSize int64, filename string, importID string, uploadParts []any) ([]map[string]any, error) {
17+
w := cmd.ErrOrStderr()
18+
partCount := int64(len(uploadParts))
19+
baseSize := fileSize / partCount
20+
lastSize := fileSize - baseSize*(partCount-1)
21+
22+
sizeMB := float64(fileSize) / (1024 * 1024)
23+
_, _ = fmt.Fprintf(w, "Uploading %s (%.1f MB) in %d parts...\n", filename, sizeMB, partCount)
24+
25+
completedParts := make([]map[string]any, 0, partCount)
26+
27+
for i, part := range uploadParts {
28+
partMap, ok := part.(map[string]any)
29+
if !ok {
30+
return nil, fmt.Errorf("invalid upload part at index %d", i)
31+
}
32+
33+
partNumber := int(getFloat(partMap, "part_number"))
34+
partURL := getString(partMap, "url")
35+
if partURL == "" {
36+
return nil, fmt.Errorf("upload part %d missing URL", partNumber)
37+
}
38+
39+
chunkSize := baseSize
40+
if i == len(uploadParts)-1 {
41+
chunkSize = lastSize
42+
}
43+
offset := baseSize * int64(i)
44+
45+
_, _ = fmt.Fprintf(w, " Uploading part %d/%d...\n", partNumber, partCount)
46+
47+
section := io.NewSectionReader(file, offset, chunkSize)
48+
etag, err := app.Client.PutFilePart(cmd.Context(), partURL, section, chunkSize)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to upload part %d: %w", partNumber, err)
51+
}
52+
53+
completedParts = append(completedParts, map[string]any{
54+
"part_number": partNumber,
55+
"etag": etag,
56+
})
57+
}
58+
59+
_, _ = fmt.Fprintln(w, "Upload complete.")
60+
return completedParts, nil
61+
}
62+
1563
// NewArchiveCmd creates the archive command group.
1664
func NewArchiveCmd() *cobra.Command {
1765
cmd := &cobra.Command{
@@ -116,40 +164,59 @@ func newArchiveImportCmd() *cobra.Command {
116164
}
117165

118166
importID := getString(item, "id")
119-
uploadURL := getString(item, "upload_url")
120-
121-
if importID == "" || uploadURL == "" {
122-
return fmt.Errorf("import session response missing upload URL or import ID")
167+
if importID == "" {
168+
return fmt.Errorf("import session response missing import ID")
123169
}
124170

125-
// Step 2: Upload file to presigned URL
126-
sizeMB := float64(fileSize) / (1024 * 1024)
127-
_, _ = fmt.Fprintf(w, "Uploading %s (%.1f MB)...\n", filename, sizeMB)
171+
// Step 2: Upload file
172+
var runBody any
128173

129-
uploadResp, err := app.Client.PutFile(cmd.Context(), uploadURL, file)
130-
if err != nil {
131-
return fmt.Errorf("failed to upload file: %w", err)
132-
}
133-
defer func() { _ = uploadResp.Body.Close() }()
174+
if getBool(item, "is_multipart") {
175+
uploadParts := getSlice(item, "upload_parts")
176+
if len(uploadParts) == 0 {
177+
return fmt.Errorf("multipart import session response missing upload parts")
178+
}
134179

135-
_, _ = fmt.Fprintln(w, "Upload complete.")
180+
completedParts, uploadErr := uploadMultipart(cmd, app, file, fileSize, filename, importID, uploadParts)
181+
if uploadErr != nil {
182+
return uploadErr
183+
}
184+
185+
runBody = map[string]any{"parts": completedParts}
186+
} else {
187+
uploadURL := getString(item, "upload_url")
188+
if uploadURL == "" {
189+
return fmt.Errorf("import session response missing upload URL")
190+
}
191+
192+
sizeMB := float64(fileSize) / (1024 * 1024)
193+
_, _ = fmt.Fprintf(w, "Uploading %s (%.1f MB)...\n", filename, sizeMB)
194+
195+
uploadResp, uploadErr := app.Client.PutFile(cmd.Context(), uploadURL, file)
196+
if uploadErr != nil {
197+
return fmt.Errorf("failed to upload file: %w", uploadErr)
198+
}
199+
defer func() { _ = uploadResp.Body.Close() }()
200+
201+
_, _ = fmt.Fprintln(w, "Upload complete.")
202+
}
136203

137204
// Step 3: Trigger import
138205
_, _ = fmt.Fprintln(w, "Starting import...")
139206

140207
runEndpoint := fmt.Sprintf("%s/%s/run", importsPath(siteID), importID)
141-
runResp, err := app.Client.Post(cmd.Context(), runEndpoint, nil)
208+
runResp, err := app.Client.Post(cmd.Context(), runEndpoint, runBody)
142209
if err != nil {
143210
return fmt.Errorf("failed to start import: %w", err)
144211
}
145212
defer func() { _ = runResp.Body.Close() }()
146213

147-
runBody, err := io.ReadAll(runResp.Body)
214+
runRespBody, err := io.ReadAll(runResp.Body)
148215
if err != nil {
149216
return fmt.Errorf("failed to start import: %w", err)
150217
}
151218

152-
runData, err := parseResponseData(runBody)
219+
runData, err := parseResponseData(runRespBody)
153220
if err != nil {
154221
return fmt.Errorf("failed to start import: %w", err)
155222
}

0 commit comments

Comments
 (0)