-
Notifications
You must be signed in to change notification settings - Fork 260
feat: Implement ZSTD compression for DA blobs #3104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d5b41c8
cc68320
58e9b18
c875c0b
c76c245
dcc03d0
c52dde5
7ab8ce6
daff28b
db89458
2a37443
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ import ( | |
| "github.com/rs/zerolog" | ||
|
|
||
| "github.com/evstack/ev-node/block/internal/common" | ||
| "github.com/evstack/ev-node/pkg/da" | ||
| blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" | ||
| datypes "github.com/evstack/ev-node/pkg/da/types" | ||
| ) | ||
|
|
@@ -87,17 +88,44 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace | |
| } | ||
| } | ||
|
|
||
| // Select compression level based on backlog pressure: | ||
| // large batch = high backlog = prioritize speed; | ||
| // small batch = low backlog = prioritize ratio. | ||
| compLevel := da.LevelBest | ||
| switch { | ||
| case len(data) > 10: | ||
| compLevel = da.LevelFastest | ||
| case len(data) > 3: | ||
| compLevel = da.LevelDefault | ||
| } | ||
|
|
||
| blobs := make([]*blobrpc.Blob, len(data)) | ||
| for i, raw := range data { | ||
| if uint64(len(raw)) > common.DefaultMaxBlobSize { | ||
| compressed, compErr := da.Compress(raw, compLevel) | ||
| if compErr != nil { | ||
| return datypes.ResultSubmit{ | ||
| BaseResult: datypes.BaseResult{ | ||
| Code: datypes.StatusError, | ||
| Message: fmt.Sprintf("compress blob %d: %v", i, compErr), | ||
| }, | ||
| } | ||
| } | ||
| c.logger.Debug(). | ||
| Int("original_size", len(raw)). | ||
| Int("compressed_size", len(compressed)). | ||
| Float64("ratio", float64(len(compressed))/float64(max(len(raw), 1))). | ||
| Int("level", int(compLevel)). | ||
| Msg("compressed blob for DA submission") | ||
|
|
||
| if uint64(len(compressed)) > common.DefaultMaxBlobSize { | ||
| return datypes.ResultSubmit{ | ||
| BaseResult: datypes.BaseResult{ | ||
| Code: datypes.StatusTooBig, | ||
| Message: datypes.ErrBlobSizeOverLimit.Error(), | ||
| }, | ||
| } | ||
| } | ||
| blobs[i], err = blobrpc.NewBlobV0(ns, raw) | ||
| blobs[i], err = blobrpc.NewBlobV0(ns, compressed) | ||
| if err != nil { | ||
| return datypes.ResultSubmit{ | ||
| BaseResult: datypes.BaseResult{ | ||
|
|
@@ -278,12 +306,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) | |
| } | ||
| } | ||
|
|
||
| // Extract IDs and data from the blobs. | ||
| ids := make([]datypes.ID, len(blobs)) | ||
| data := make([]datypes.Blob, len(blobs)) | ||
| // Extract IDs and data from the blobs, decompressing if needed. | ||
| // Malicious or corrupt blobs that fail decompression are logged and skipped. | ||
| ids := make([]datypes.ID, 0, len(blobs)) | ||
| data := make([]datypes.Blob, 0, len(blobs)) | ||
| for i, b := range blobs { | ||
| ids[i] = blobrpc.MakeID(height, b.Commitment) | ||
| data[i] = b.Data() | ||
| decompressed, decompErr := da.Decompress(ctx, b.Data()) | ||
| if decompErr != nil { | ||
| c.logger.Warn(). | ||
| Err(decompErr). | ||
| Uint64("height", height). | ||
| Int("blob_index", i). | ||
| Int("blob_size", len(b.Data())). | ||
| Msg("skipping malicious or corrupt DA blob") | ||
| continue | ||
| } | ||
|
Comment on lines
+314
to
+323
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do not return success when decompression dropped all requested data.
💡 Proposed fix ids := make([]datypes.ID, 0, len(blobs))
data := make([]datypes.Blob, 0, len(blobs))
+ failedDecompress := 0
for i, b := range blobs {
decompressed, decompErr := da.Decompress(ctx, b.Data())
if decompErr != nil {
+ failedDecompress++
c.logger.Warn().
Err(decompErr).
Uint64("height", height).
Int("blob_index", i).
Int("blob_size", len(b.Data())).
Msg("skipping malicious or corrupt DA blob")
continue
}
ids = append(ids, blobrpc.MakeID(height, b.Commitment))
data = append(data, decompressed)
}
+ if len(data) == 0 && len(blobs) > 0 {
+ return datypes.ResultRetrieve{
+ BaseResult: datypes.BaseResult{
+ Code: datypes.StatusError,
+ Message: fmt.Sprintf("all blobs failed decompression (%d/%d)", failedDecompress, len(blobs)),
+ Height: height,
+ Timestamp: blockTime,
+ },
+ }
+ }Apply the same principle in As per coding guidelines, "Return errors early". Also applies to: 330-338, 403-413 🤖 Prompt for AI Agents |
||
| ids = append(ids, blobrpc.MakeID(height, b.Commitment)) | ||
| data = append(data, decompressed) | ||
| } | ||
|
|
||
| c.logger.Debug().Int("num_blobs", len(blobs)).Msg("retrieved blobs") | ||
|
|
@@ -361,7 +400,16 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ | |
| if b == nil { | ||
| continue | ||
| } | ||
| res = append(res, b.Data()) | ||
| decompressed, decompErr := da.Decompress(ctx, b.Data()) | ||
| if decompErr != nil { | ||
|
alpe marked this conversation as resolved.
|
||
| c.logger.Warn(). | ||
| Err(decompErr). | ||
| Uint64("height", height). | ||
| Int("blob_size", len(b.Data())). | ||
| Msg("skipping malicious or corrupt DA blob") | ||
| continue | ||
| } | ||
| res = append(res, decompressed) | ||
| } | ||
|
|
||
| return res, nil | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.