fix(sequencers/single): deterministic queue #2938
```diff
@@ -2,9 +2,11 @@ package single
 
 import (
 	"context"
+	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"strconv"
 	"sync"
 
 	ds "github.com/ipfs/go-datastore"
```
```diff
@@ -20,26 +22,52 @@ import (
 // ErrQueueFull is returned when the batch queue has reached its maximum size
 var ErrQueueFull = errors.New("batch queue is full")
 
+// initialSeqNum is the starting sequence number for new queues.
+// It is set to the middle of the uint64 range to allow for both
+// appending (incrementing) and prepending (decrementing) transactions.
+const initialSeqNum = uint64(0x8000000000000000)
+
+// queuedItem holds a batch and its associated persistence key
+type queuedItem struct {
+	Batch coresequencer.Batch
+	Key   string
+}
+
 // BatchQueue implements a persistent queue for transaction batches
 type BatchQueue struct {
-	queue        []coresequencer.Batch
+	queue        []queuedItem
 	head         int // index of the first element in the queue
 	maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
-	mu           sync.Mutex
-	db           ds.Batching
+
+	// Sequence numbers for generating new keys
+	nextAddSeq     uint64
+	nextPrependSeq uint64
+
+	mu sync.Mutex
+	db ds.Batching
 }
 
 // NewBatchQueue creates a new BatchQueue with the specified maximum size.
 // If maxSize is 0, the queue will be unlimited.
 func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
 	return &BatchQueue{
-		queue:        make([]coresequencer.Batch, 0),
-		head:         0,
-		maxQueueSize: maxSize,
-		db:           store.NewPrefixKVStore(db, prefix),
+		queue:          make([]queuedItem, 0),
+		head:           0,
+		maxQueueSize:   maxSize,
+		db:             store.NewPrefixKVStore(db, prefix),
+		nextAddSeq:     initialSeqNum,
+		nextPrependSeq: initialSeqNum - 1,
 	}
 }
 
+// seqToKey converts a sequence number to a hex-encoded big-endian key.
+// We use big-endian so that lexicographical sort order matches numeric order.
+func seqToKey(seq uint64) string {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, seq)
+	return hex.EncodeToString(b)
+}
+
 // AddBatch adds a new transaction to the queue and writes it to the WAL.
 // Returns ErrQueueFull if the queue has reached its maximum size.
 func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
```
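The two sequence counters and the key encoding above are what make replay order deterministic. A small standalone sketch (reusing `initialSeqNum` and `seqToKey` exactly as defined in the diff) shows that sorting the hex keys lexicographically, as the datastore does for an ordered query, reproduces the numeric order of the sequence numbers, with prepended batches landing ahead of appended ones:

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"sort"
)

const initialSeqNum = uint64(0x8000000000000000)

// seqToKey is the same encoding as in the diff: big-endian bytes, hex encoded,
// so lexicographic key order equals numeric sequence order.
func seqToKey(seq uint64) string {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, seq)
	return hex.EncodeToString(b)
}

func main() {
	keys := []string{
		seqToKey(initialSeqNum),     // 1st AddBatch
		seqToKey(initialSeqNum + 1), // 2nd AddBatch
		seqToKey(initialSeqNum - 1), // 1st Prepend
		seqToKey(initialSeqNum - 2), // 2nd Prepend
	}

	sort.Strings(keys) // the order an OrderByKey query yields on Load

	for _, k := range keys {
		fmt.Println(k)
	}
	// Output (numeric order, prepends first):
	// 7ffffffffffffffe
	// 7fffffffffffffff
	// 8000000000000000
	// 8000000000000001
}
```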
```diff
@@ -53,12 +81,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 		return ErrQueueFull
 	}
 
-	if err := bq.persistBatch(ctx, batch); err != nil {
+	key := seqToKey(bq.nextAddSeq)
+	if err := bq.persistBatch(ctx, batch, key); err != nil {
 		return err
 	}
+	bq.nextAddSeq++
 
 	// Then add to in-memory queue
-	bq.queue = append(bq.queue, batch)
+	bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
 
 	return nil
 }
```
```diff
@@ -68,25 +98,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 // The batch is persisted to the DB to ensure durability in case of crashes.
 //
 // NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
-// transactions can always be re-queued. This means the effective queue size may temporarily
-// exceed the configured maximum when Prepend is used. This is by design to prevent loss
-// of transactions that have already been accepted but couldn't fit in the current batch.
+// transactions can always be re-queued.
 func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 
-	if err := bq.persistBatch(ctx, batch); err != nil {
+	key := seqToKey(bq.nextPrependSeq)
+	if err := bq.persistBatch(ctx, batch, key); err != nil {
 		return err
 	}
+	bq.nextPrependSeq--
+
+	item := queuedItem{Batch: batch, Key: key}
 
 	// Then add to in-memory queue
 	// If we have room before head, use it
 	if bq.head > 0 {
 		bq.head--
-		bq.queue[bq.head] = batch
+		bq.queue[bq.head] = item
 	} else {
 		// Need to expand the queue at the front
-		bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
+		bq.queue = append([]queuedItem{item}, bq.queue...)
 	}
 
 	return nil
```
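For context on the NOTE above, this is one way a caller might use `Prepend` to re-queue the tail of a partially consumed batch. It is illustrative only and not part of this PR: `splitAndRequeue` and `maxTxs` are hypothetical names, and the function would live in the same package as `BatchQueue` so that `context` and `coresequencer` come from its existing imports.

```go
// splitAndRequeue takes the next batch, keeps at most maxTxs transactions for
// the current block, and re-queues the remainder at the front of the queue.
// Hypothetical caller, shown only to illustrate the Prepend contract.
func splitAndRequeue(ctx context.Context, bq *BatchQueue, maxTxs int) (*coresequencer.Batch, error) {
	batch, err := bq.Next(ctx)
	if err != nil {
		return nil, err
	}
	if len(batch.Transactions) <= maxTxs {
		return batch, nil
	}

	// The tail was already accepted once, so it must not be dropped:
	// Prepend puts it back at the head of the queue, ahead of any batch
	// added later, even if the queue is at maxQueueSize.
	remainder := coresequencer.Batch{Transactions: batch.Transactions[maxTxs:]}
	if err := bq.Prepend(ctx, remainder); err != nil {
		return nil, err
	}

	return &coresequencer.Batch{Transactions: batch.Transactions[:maxTxs]}, nil
}
```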
```diff
@@ -102,8 +134,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
 		return &coresequencer.Batch{Transactions: nil}, nil
 	}
 
-	batch := bq.queue[bq.head]
-	bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
+	item := bq.queue[bq.head]
+	// Release memory for the dequeued element
+	bq.queue[bq.head] = queuedItem{}
 	bq.head++
 
 	// Compact when head gets too large to prevent memory leaks
```
```diff
@@ -112,59 +145,102 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
 	// frequent compactions on small queues
 	if bq.head > len(bq.queue)/2 && bq.head > 100 {
 		remaining := copy(bq.queue, bq.queue[bq.head:])
-		// Zero out the rest of the slice to release memory
+		// Zero out the rest of the slice
 		for i := remaining; i < len(bq.queue); i++ {
-			bq.queue[i] = coresequencer.Batch{}
+			bq.queue[i] = queuedItem{}
 		}
 		bq.queue = bq.queue[:remaining]
 		bq.head = 0
 	}
 
-	hash, err := batch.Hash()
-	if err != nil {
-		return &coresequencer.Batch{Transactions: nil}, err
-	}
-	key := hex.EncodeToString(hash)
-
 	// Delete the batch from the WAL since it's been processed
-	err = bq.db.Delete(ctx, ds.NewKey(key))
-	if err != nil {
+	// Use the stored key directly
+	if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
 		// Log the error but continue
 		fmt.Printf("Error deleting processed batch: %v\n", err)
 	}
 
-	return &batch, nil
+	return &item.Batch, nil
 }
 
 // Load reloads all batches from WAL file into the in-memory queue after a crash or restart
 func (bq *BatchQueue) Load(ctx context.Context) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 
-	// Clear the current queue
-	bq.queue = make([]coresequencer.Batch, 0)
+	// Clear the current queue and reset sequences
+	bq.queue = make([]queuedItem, 0)
 	bq.head = 0
+	bq.nextAddSeq = initialSeqNum
+	bq.nextPrependSeq = initialSeqNum - 1
 
-	q := query.Query{}
+	q := query.Query{
+		Orders: []query.Order{query.OrderByKey{}},
+	}
 	results, err := bq.db.Query(ctx, q)
 	if err != nil {
 		return fmt.Errorf("error querying datastore: %w", err)
 	}
 	defer results.Close()
 
 	// Load each batch
+	var legacyItems []queuedItem
 	for result := range results.Next() {
 		if result.Error != nil {
 			fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
 			continue
 		}
-		pbBatch := &pb.Batch{}
-		err := proto.Unmarshal(result.Value, pbBatch)
+		// We care about the last part of the key (the sequence number)
+		// ds.Key usually has a leading slash.
+		keyName := ds.NewKey(result.Key).Name()
+
+		var pbBatch pb.Batch
+		err := proto.Unmarshal(result.Value, &pbBatch)
 		if err != nil {
-			fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
+			fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err)
 			continue
 		}
-		bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})
+		batch := coresequencer.Batch{Transactions: pbBatch.Txs}
+
+		// Check if key is valid hex sequence number (16 hex chars)
+		// We use strict 16 check because seqToKey always produces 16 hex chars.
+		isValid := false
+		if len(keyName) == 16 {
+			if seq, err := strconv.ParseUint(keyName, 16, 64); err == nil {
+				isValid = true
+				if seq >= bq.nextAddSeq {
+					bq.nextAddSeq = seq + 1
+				}
+				if seq <= bq.nextPrependSeq {
+					bq.nextPrependSeq = seq - 1
+				}
+			}
+		}
+		if isValid {
+			bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: keyName})
+		} else {
+			legacyItems = append(legacyItems, queuedItem{Batch: batch, Key: result.Key})
+		}
 	}
+	if len(legacyItems) == 0 {
+		return nil
+	}
+	fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems))
+
+	for _, item := range legacyItems {
+		newKeyName := seqToKey(bq.nextAddSeq)
+
+		if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil {
+			fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err)
+			continue
+		}
+
+		if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
+			fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err)
+		}
+
+		bq.queue = append(bq.queue, queuedItem{Batch: item.Batch, Key: newKeyName})
+		bq.nextAddSeq++
+	}
 
 	return nil
```
Review comment (Contributor) on lines 187 to 244:

> Throughout the […] Consider injecting a logger into the […]
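A minimal sketch of what that suggestion could look like; it is not part of this PR, `queueLogger` and `demoQueue` are illustrative stand-ins for the real types, and `log/slog` is only one logger that could be injected:

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
)

// queueLogger is the minimal logging interface the queue could depend on
// instead of calling fmt.Printf directly; *slog.Logger satisfies it.
type queueLogger interface {
	Info(msg string, args ...any)
	Error(msg string, args ...any)
}

// demoQueue stands in for BatchQueue to show only the injection pattern.
type demoQueue struct {
	logger queueLogger
}

// newDemoQueue falls back to a default slog logger when none is supplied,
// so existing call sites could stay unchanged.
func newDemoQueue(logger queueLogger) *demoQueue {
	if logger == nil {
		logger = slog.New(slog.NewTextHandler(os.Stderr, nil))
	}
	return &demoQueue{logger: logger}
}

// load mimics one of the error paths in Load: instead of fmt.Printf,
// the error is reported through the injected logger.
func (q *demoQueue) load() {
	q.logger.Error("error decoding batch", "key", "00ff", "err", fmt.Errorf("example failure"))
}

func main() {
	q := newDemoQueue(nil)
	q.load()
}
```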
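As a worked example of the sequence-number recovery in `Load` above: assuming the datastore holds keys from two earlier `Prepend` calls and two `AddBatch` calls (the key values below are illustrative), replaying the sorted keys re-derives the counters so that new appends and prepends cannot collide with existing entries.

```go
package main

import (
	"fmt"
	"strconv"
)

const initialSeqNum = uint64(0x8000000000000000)

func main() {
	// Keys as they would come back from the datastore, already sorted by key.
	keys := []string{
		"7ffffffffffffffe", // second Prepend
		"7fffffffffffffff", // first Prepend
		"8000000000000000", // first AddBatch
		"8000000000000001", // second AddBatch
	}

	nextAddSeq := initialSeqNum
	nextPrependSeq := initialSeqNum - 1

	for _, k := range keys {
		if len(k) != 16 {
			continue // legacy hash-based key, would be migrated instead
		}
		seq, err := strconv.ParseUint(k, 16, 64)
		if err != nil {
			continue
		}
		// Same counter updates as in Load: move past every key seen so far.
		if seq >= nextAddSeq {
			nextAddSeq = seq + 1
		}
		if seq <= nextPrependSeq {
			nextPrependSeq = seq - 1
		}
	}

	fmt.Printf("nextAddSeq=%#x nextPrependSeq=%#x\n", nextAddSeq, nextPrependSeq)
	// Prints: nextAddSeq=0x8000000000000002 nextPrependSeq=0x7ffffffffffffffd
}
```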
```diff
@@ -178,14 +254,8 @@ func (bq *BatchQueue) Size() int {
 	return len(bq.queue) - bq.head
 }
 
-// persistBatch persists a batch to the datastore
-func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
-	hash, err := batch.Hash()
-	if err != nil {
-		return err
-	}
-	key := hex.EncodeToString(hash)
-
+// persistBatch persists a batch to the datastore with the given key
+func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
 	pbBatch := &pb.Batch{
 		Txs: batch.Transactions,
 	}
```
```diff
@@ -195,7 +265,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
 		return err
 	}
 
-	// First write to DB for durability
+	// Write to DB
 	if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
 		return err
 	}
```