Skip to content

Commit d8aaaa7

Browse files
authored
add test cases to build robustness (#48)
* add test cases to build robustness * Add direct submit sequence tests * ++
1 parent a788ced commit d8aaaa7

File tree

2 files changed

+794
-36
lines changed

2 files changed

+794
-36
lines changed

pkg/submit/direct.go

Lines changed: 185 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strconv"
78
"strings"
89
"sync"
910
"time"
@@ -19,7 +20,10 @@ const (
1920
maxSequenceRetryRounds = 2
2021
)
2122

22-
var errSequenceMismatch = errors.New("account sequence mismatch")
23+
var (
24+
errSequenceMismatch = errors.New("account sequence mismatch")
25+
errTooManyInFlight = errors.New("too many in-flight submissions")
26+
)
2327

2428
// DirectConfig contains the fixed submission settings Apex owns for direct
2529
// celestia-app writes.
@@ -42,6 +46,12 @@ type DirectSubmitter struct {
4246
pollInterval time.Duration
4347
feeDenom string
4448
mu sync.Mutex
49+
inFlight int
50+
accountNumber uint64
51+
nextSequence uint64
52+
sequenceReady bool
53+
pendingSequences map[string]uint64
54+
maxInFlight int
4555
}
4656

4757
// NewDirectSubmitter builds a concrete single-account submitter.
@@ -77,6 +87,7 @@ func NewDirectSubmitter(app AppClient, signer *Signer, cfg DirectConfig) (*Direc
7787
confirmationTimeout: cfg.ConfirmationTimeout,
7888
pollInterval: defaultPollInterval,
7989
feeDenom: defaultFeeDenom,
90+
pendingSequences: make(map[string]uint64),
8091
}, nil
8192
}
8293

@@ -87,29 +98,23 @@ func (s *DirectSubmitter) Close() error {
8798
return s.app.Close()
8899
}
89100

90-
// Submit serializes submissions for the configured signer so sequence handling
91-
// stays bounded and explicit in v1.
101+
// Submit serializes sequence reservation and broadcast for the configured
102+
// signer, then waits for confirmation without blocking the next nonce.
92103
func (s *DirectSubmitter) Submit(ctx context.Context, req *Request) (*Result, error) {
93104
if err := validateSubmitRequest(req); err != nil {
94105
return nil, err
95106
}
107+
if err := s.startSubmission(); err != nil {
108+
return nil, err
109+
}
110+
defer s.finishSubmission()
96111

97-
s.mu.Lock()
98-
defer s.mu.Unlock()
99-
100-
var lastErr error
101-
for range maxSequenceRetryRounds {
102-
result, err := s.submitOnce(ctx, req)
103-
if err == nil {
104-
return result, nil
105-
}
106-
lastErr = err
107-
if !errors.Is(err, errSequenceMismatch) {
108-
return nil, err
109-
}
112+
broadcast, err := s.broadcastTx(ctx, req)
113+
if err != nil {
114+
return nil, err
110115
}
111116

112-
return nil, lastErr
117+
return s.waitForConfirmation(ctx, broadcast.Hash)
113118
}
114119

115120
func validateSubmitRequest(req *Request) error {
@@ -127,32 +132,154 @@ func validateSubmitRequest(req *Request) error {
127132
return nil
128133
}
129134

130-
func (s *DirectSubmitter) submitOnce(ctx context.Context, req *Request) (*Result, error) {
131-
account, err := s.app.AccountInfo(ctx, s.signer.Address())
132-
if err != nil {
133-
return nil, fmt.Errorf("query submission account: %w", err)
135+
func (s *DirectSubmitter) broadcastTx(ctx context.Context, req *Request) (*TxStatus, error) {
136+
s.mu.Lock()
137+
defer s.mu.Unlock()
138+
139+
var lastErr error
140+
for range maxSequenceRetryRounds {
141+
account, err := s.nextAccountLocked(ctx)
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
txBytes, err := s.buildBlobTx(req, account)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
broadcast, err := s.app.BroadcastTx(ctx, txBytes)
152+
if err != nil {
153+
if isSequenceMismatchText(err.Error()) {
154+
s.recoverSequenceLocked(account, err.Error())
155+
lastErr = fmt.Errorf("%w: %w", errSequenceMismatch, err)
156+
continue
157+
}
158+
return nil, fmt.Errorf("broadcast blob tx: %w", err)
159+
}
160+
if err := checkTxStatus("broadcast", broadcast); err != nil {
161+
if errors.Is(err, errSequenceMismatch) {
162+
s.recoverSequenceLocked(account, err.Error())
163+
lastErr = err
164+
continue
165+
}
166+
return nil, err
167+
}
168+
169+
if broadcast.Hash != "" {
170+
s.rememberPendingLocked(broadcast.Hash, account.Sequence)
171+
}
172+
s.nextSequence = account.Sequence + 1
173+
s.sequenceReady = true
174+
return broadcast, nil
134175
}
135-
if account == nil {
136-
return nil, errors.New("query submission account: empty response")
176+
177+
return nil, lastErr
178+
}
179+
180+
func (s *DirectSubmitter) nextAccountLocked(ctx context.Context) (*AccountInfo, error) {
181+
if !s.sequenceReady {
182+
account, err := s.app.AccountInfo(ctx, s.signer.Address())
183+
if err != nil {
184+
return nil, fmt.Errorf("query submission account: %w", err)
185+
}
186+
if account == nil {
187+
return nil, errors.New("query submission account: empty response")
188+
}
189+
190+
s.accountNumber = account.AccountNumber
191+
s.nextSequence = account.Sequence
192+
s.sequenceReady = true
193+
if err := s.reconcilePendingLocked(ctx); err != nil {
194+
return nil, err
195+
}
137196
}
138197

139-
txBytes, err := s.buildBlobTx(req, account)
140-
if err != nil {
141-
return nil, err
198+
return &AccountInfo{
199+
Address: s.signer.Address(),
200+
AccountNumber: s.accountNumber,
201+
Sequence: s.nextSequence,
202+
}, nil
203+
}
204+
205+
func (s *DirectSubmitter) invalidateSequenceLocked() {
206+
s.accountNumber = 0
207+
s.nextSequence = 0
208+
s.sequenceReady = false
209+
}
210+
211+
func (s *DirectSubmitter) startSubmission() error {
212+
s.mu.Lock()
213+
defer s.mu.Unlock()
214+
215+
if s.maxInFlight > 0 && s.inFlight >= s.maxInFlight {
216+
return errTooManyInFlight
142217
}
218+
s.inFlight++
219+
return nil
220+
}
143221

144-
broadcast, err := s.app.BroadcastTx(ctx, txBytes)
145-
if err != nil {
146-
if isSequenceMismatchText(err.Error()) {
147-
return nil, fmt.Errorf("%w: %w", errSequenceMismatch, err)
222+
func (s *DirectSubmitter) finishSubmission() {
223+
s.mu.Lock()
224+
defer s.mu.Unlock()
225+
226+
if s.inFlight > 0 {
227+
s.inFlight--
228+
}
229+
}
230+
231+
func (s *DirectSubmitter) recoverSequenceLocked(account *AccountInfo, errText string) {
232+
expected, ok := expectedSequenceFromMismatchText(errText)
233+
if !ok {
234+
s.invalidateSequenceLocked()
235+
return
236+
}
237+
238+
s.accountNumber = account.AccountNumber
239+
s.nextSequence = expected
240+
s.sequenceReady = true
241+
}
242+
243+
func (s *DirectSubmitter) reconcilePendingLocked(ctx context.Context) error {
244+
if len(s.pendingSequences) == 0 {
245+
return nil
246+
}
247+
248+
nextSequence := s.nextSequence
249+
for hash, sequence := range s.pendingSequences {
250+
_, err := s.app.GetTx(ctx, hash)
251+
if err == nil {
252+
delete(s.pendingSequences, hash)
253+
continue
148254
}
149-
return nil, fmt.Errorf("broadcast blob tx: %w", err)
255+
if isTxNotFound(err) {
256+
if sequence >= nextSequence {
257+
nextSequence = sequence + 1
258+
}
259+
continue
260+
}
261+
return fmt.Errorf("reconcile pending blob tx %s: %w", hash, err)
150262
}
151-
if err := checkTxStatus("broadcast", broadcast); err != nil {
152-
return nil, err
263+
264+
s.nextSequence = nextSequence
265+
return nil
266+
}
267+
268+
func (s *DirectSubmitter) rememberPendingLocked(hash string, sequence uint64) {
269+
if hash == "" {
270+
return
153271
}
272+
s.pendingSequences[hash] = sequence
273+
}
154274

155-
return s.waitForConfirmation(ctx, broadcast.Hash)
275+
func (s *DirectSubmitter) clearPending(hash string) {
276+
if hash == "" {
277+
return
278+
}
279+
280+
s.mu.Lock()
281+
defer s.mu.Unlock()
282+
delete(s.pendingSequences, hash)
156283
}
157284

158285
func (s *DirectSubmitter) buildBlobTx(req *Request, account *AccountInfo) ([]byte, error) {
@@ -297,6 +424,7 @@ func (s *DirectSubmitter) waitForConfirmation(parent context.Context, hash strin
297424
for {
298425
tx, err := s.app.GetTx(ctx, hash)
299426
if err == nil {
427+
s.clearPending(hash)
300428
if err := checkTxStatus("confirm", tx); err != nil {
301429
return nil, err
302430
}
@@ -339,6 +467,29 @@ func isSequenceMismatchText(text string) bool {
339467
return strings.Contains(text, "account sequence mismatch") || strings.Contains(text, "incorrect account sequence")
340468
}
341469

470+
func expectedSequenceFromMismatchText(text string) (uint64, bool) {
471+
lower := strings.ToLower(text)
472+
idx := strings.Index(lower, "expected ")
473+
if idx < 0 {
474+
return 0, false
475+
}
476+
477+
start := idx + len("expected ")
478+
end := start
479+
for end < len(lower) && lower[end] >= '0' && lower[end] <= '9' {
480+
end++
481+
}
482+
if end == start {
483+
return 0, false
484+
}
485+
486+
sequence, err := strconv.ParseUint(lower[start:end], 10, 64)
487+
if err != nil {
488+
return 0, false
489+
}
490+
return sequence, true
491+
}
492+
342493
func isTxNotFound(err error) bool {
343494
return status.Code(err) == codes.NotFound
344495
}

0 commit comments

Comments
 (0)