diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go new file mode 100644 index 0000000000..39988c67e5 --- /dev/null +++ b/block/internal/da/fiber_client.go @@ -0,0 +1,444 @@ +package da + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da/fibremock" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +const ( + fiberIndexRetainHeights = 4096 + fiberSubscribePollFreq = 1 * time.Second + fiberSubscribeChanSize = 16 +) + +type ( + FiberClient = fibremock.DA + BlobID = fibremock.BlobID + UploadResult = fibremock.UploadResult + BlobEvent = fibremock.BlobEvent +) + +type FiberConfig struct { + Client FiberClient + Logger zerolog.Logger + DefaultTimeout time.Duration + Namespace string + DataNamespace string + ForcedInclusionNamespace string +} + +type fiberDAClient struct { + fiber FiberClient + logger zerolog.Logger + defaultTimeout time.Duration + namespaceBz []byte + dataNamespaceBz []byte + forcedNamespaceBz []byte + hasForcedNamespace bool + + mu sync.RWMutex + index map[uint64][]fiberIndexedBlob + indexTail uint64 + indexWindow uint64 + latestHeight uint64 +} + +type fiberIndexedBlob struct { + id datypes.ID + namespace []byte + data []byte + blobID []byte +} + +var _ FullClient = (*fiberDAClient)(nil) + +func NewFiberClient(cfg FiberConfig) FullClient { + if cfg.Client == nil { + return nil + } + if cfg.DefaultTimeout == 0 { + cfg.DefaultTimeout = 60 * time.Second + } + + hasForced := cfg.ForcedInclusionNamespace != "" + var forcedBz []byte + if hasForced { + forcedBz = datypes.NamespaceFromString(cfg.ForcedInclusionNamespace).Bytes() + } + + return &fiberDAClient{ + fiber: cfg.Client, + logger: cfg.Logger.With().Str("component", "fiber_da_client").Logger(), + defaultTimeout: cfg.DefaultTimeout, + namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(), + dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), + forcedNamespaceBz: forcedBz, + hasForcedNamespace: hasForced, + index: make(map[uint64][]fiberIndexedBlob), + indexWindow: fiberIndexRetainHeights, + } +} + +func makeFiberID(height uint64, blobID []byte) datypes.ID { + id := make([]byte, 8+len(blobID)) + binary.LittleEndian.PutUint64(id, height) + copy(id[8:], blobID) + return id +} + +func splitFiberID(id datypes.ID) (uint64, []byte) { + if len(id) <= 8 { + return 0, nil + } + return binary.LittleEndian.Uint64(id[:8]), id[8:] +} + +func (c *fiberDAClient) pruneIndexLocked() { + if c.indexWindow == 0 || c.indexTail == 0 || c.indexTail < c.indexWindow { + return + } + cutoff := c.indexTail - c.indexWindow + 1 + for h := range c.index { + if h < cutoff { + delete(c.index, h) + } + } +} + +func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, namespace []byte, _ []byte) datypes.ResultSubmit { + var blobSize uint64 + for _, b := range data { + blobSize += uint64(len(b)) + } + + uploaded := make([]fiberUploadResult, 0, len(data)) + + for i, raw := range data { + if uint64(len(raw)) > common.DefaultMaxBlobSize { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusTooBig, + Message: fmt.Sprintf("blob %d exceeds max size (%d > %d)", i, len(raw), common.DefaultMaxBlobSize), + }, + } + } + + uploadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + result, err := c.fiber.Upload(uploadCtx, namespace, raw) + cancel() + if err != nil { + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + + c.logger.Error().Err(err).Int("blob_index", i).Msg("fiber upload failed") + + if len(uploaded) > 0 { + c.indexUploaded(uploaded, namespace) + } + + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob %d: %v", i, err), + SubmittedCount: uint64(len(uploaded)), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + c.mu.Lock() + c.latestHeight++ + h := c.latestHeight + c.mu.Unlock() + + uploaded = append(uploaded, fiberUploadResult{ + blobID: result.BlobID, + height: h, + data: raw, + }) + } + + if len(uploaded) == 0 { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + ids := c.indexUploaded(uploaded, namespace) + + c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", uploaded[len(uploaded)-1].height).Msg("fiber DA submission successful") + + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: uploaded[len(uploaded)-1].height, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } +} + +type fiberUploadResult struct { + blobID []byte + height uint64 + data []byte +} + +func (c *fiberDAClient) indexUploaded(uploaded []fiberUploadResult, namespace []byte) []datypes.ID { + submitHeight := uploaded[len(uploaded)-1].height + ids := make([]datypes.ID, len(uploaded)) + + c.mu.Lock() + for i, u := range uploaded { + id := makeFiberID(submitHeight, u.blobID) + ids[i] = id + c.index[submitHeight] = append(c.index[submitHeight], fiberIndexedBlob{ + id: id, + namespace: namespace, + data: u.data, + blobID: u.blobID, + }) + } + if submitHeight > c.indexTail { + c.indexTail = submitHeight + } + c.pruneIndexLocked() + c.mu.Unlock() + + return ids +} + +func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, true) +} + +func (c *fiberDAClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, false) +} + +func (c *fiberDAClient) retrieve(_ context.Context, height uint64, namespace []byte, _ bool) datypes.ResultRetrieve { + c.mu.RLock() + allBlobs, ok := c.index[height] + c.mu.RUnlock() + + if !ok || len(allBlobs) == 0 { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusNotFound, + Message: "no blobs found at height in fiber index", + Height: height, + Timestamp: time.Now(), + }, + } + } + + var matching []fiberIndexedBlob + for _, b := range allBlobs { + if nsEqual(b.namespace, namespace) { + matching = append(matching, b) + } + } + + if len(matching) == 0 { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusNotFound, + Message: "no blobs found at height for given namespace in fiber index", + Height: height, + Timestamp: time.Now(), + }, + } + } + + ids := make([]datypes.ID, len(matching)) + data := make([]datypes.Blob, len(matching)) + for i, b := range matching { + ids[i] = b.id + data[i] = b.data + } + + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Height: height, + IDs: ids, + Timestamp: time.Now(), + }, + Data: data, + } +} + +func nsEqual(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + if len(ids) == 0 { + return nil, nil + } + + res := make([]datypes.Blob, 0, len(ids)) + for _, id := range ids { + _, blobID := splitFiberID(id) + if blobID == nil { + return nil, fmt.Errorf("invalid fiber blob id: %x", id) + } + + downloadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + data, err := c.fiber.Download(downloadCtx, blobID) + cancel() + if err != nil { + return nil, fmt.Errorf("fiber download failed for blob %x: %w", blobID, err) + } + res = append(res, data) + } + + return res, nil +} + +func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan datypes.SubscriptionEvent, error) { + out := make(chan datypes.SubscriptionEvent, fiberSubscribeChanSize) + + go func() { + defer close(out) + + ticker := time.NewTicker(fiberSubscribePollFreq) + defer ticker.Stop() + + var lastHeight uint64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.mu.RLock() + height := c.latestHeight + c.mu.RUnlock() + + if height <= lastHeight { + continue + } + + for h := lastHeight + 1; h <= height; h++ { + c.mu.RLock() + blobs, ok := c.index[h] + c.mu.RUnlock() + if !ok || len(blobs) == 0 { + continue + } + + blobData := make([][]byte, len(blobs)) + for i, b := range blobs { + blobData[i] = b.data + } + + select { + case out <- datypes.SubscriptionEvent{ + Height: h, + Timestamp: time.Now(), + Blobs: blobData, + }: + case <-ctx.Done(): + return + } + } + lastHeight = height + } + } + }() + + return out, nil +} + +func (c *fiberDAClient) GetLatestDAHeight(context.Context) (uint64, error) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.latestHeight, nil +} + +func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) ([]datypes.Proof, error) { + if len(ids) == 0 { + return []datypes.Proof{}, nil + } + + proofs := make([]datypes.Proof, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + proofs[i] = b.blobID + break + } + } + } + + return proofs, nil +} + +func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []datypes.Proof, _ []byte) ([]bool, error) { + if len(ids) != len(proofs) { + return nil, errors.New("number of IDs and proofs must match") + } + if len(ids) == 0 { + return []bool{}, nil + } + + results := make([]bool, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.blobID) + break + } + } + } + + return results, nil +} + +func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } +func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } +func (c *fiberDAClient) GetForcedInclusionNamespace() []byte { return c.forcedNamespaceBz } +func (c *fiberDAClient) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } diff --git a/block/internal/da/fiber_client_test.go b/block/internal/da/fiber_client_test.go new file mode 100644 index 0000000000..a4c9843cf3 --- /dev/null +++ b/block/internal/da/fiber_client_test.go @@ -0,0 +1,604 @@ +package da + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block/internal/da/fibremock" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +func makeTestFiberClient(t *testing.T) (*fibremock.MockDA, FullClient) { + t.Helper() + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + require.NotNil(t, cl) + return mock, cl +} + +func TestFiberClient_NewClient_Nil(t *testing.T) { + cl := NewFiberClient(FiberConfig{Client: nil}) + require.Nil(t, cl) +} + +func TestFiberClient_Submit_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("hello"), []byte("world")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 2) + require.Equal(t, uint64(2), res.SubmittedCount) + require.Greater(t, res.Height, uint64(0)) + require.Equal(t, uint64(10), res.BlobSize) +} + +func TestFiberClient_Submit_SingleBlob(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("single")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 1) + require.Equal(t, uint64(6), res.BlobSize) +} + +func TestFiberClient_Submit_EmptyData(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Empty(t, res.IDs) + require.Equal(t, uint64(0), res.SubmittedCount) +} + +func TestFiberClient_Submit_UploadError(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: &faultInjector{FiberClient: mock, err: errors.New("upload failed")}, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusError, res.Code) + require.Contains(t, res.Message, "fiber upload failed") +} + +func TestFiberClient_Submit_CanceledContext(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: &faultInjector{FiberClient: mock, err: context.Canceled}, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextCanceled, res.Code) +} + +func TestFiberClient_Submit_DeadlineExceeded(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: &faultInjector{FiberClient: mock, err: context.DeadlineExceeded}, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextDeadline, res.Code) +} + +func TestFiberClient_Submit_BlobTooLarge(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + largeBlob := make([]byte, 6*1024*1024) + res := cl.Submit(context.Background(), [][]byte{largeBlob}, 0, ns, nil) + + require.Equal(t, datypes.StatusTooBig, res.Code) +} + +func TestFiberClient_Submit_PartialFailureIndexesUploaded(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + fault := &faultInjector{FiberClient: mock} + cl := NewFiberClient(FiberConfig{ + Client: fault, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + res1 := cl.Submit(context.Background(), [][]byte{[]byte("first")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res1.Code) + + fault.SetError(errors.New("transient failure")) + res2 := cl.Submit(context.Background(), [][]byte{[]byte("second")}, 0, ns, nil) + require.Equal(t, datypes.StatusError, res2.Code) + + fault.SetError(nil) + res3 := cl.Submit(context.Background(), [][]byte{[]byte("third")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res3.Code) + + retrieveRes := cl.Retrieve(context.Background(), res1.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 1) + require.Equal(t, []byte("first"), retrieveRes.Data[0]) + + retrieveRes3 := cl.Retrieve(context.Background(), res3.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes3.Code) + require.Equal(t, []byte("third"), retrieveRes3.Data[0]) +} + +func TestFiberClient_Submit_PartialFailureOnSecondBlob(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + failing := &failOnNthUpload{FiberClient: mock, failAt: 2, err: errors.New("second blob fails")} + cl := NewFiberClient(FiberConfig{ + Client: failing, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + res := cl.Submit(context.Background(), [][]byte{[]byte("first"), []byte("second"), []byte("third")}, 0, ns, nil) + require.Equal(t, datypes.StatusError, res.Code) + require.Contains(t, res.Message, "blob 1") + require.Equal(t, uint64(1), res.SubmittedCount) + + fc := cl.(*fiberDAClient) + fc.mu.RLock() + totalBlobs := 0 + for _, blobs := range fc.index { + totalBlobs += len(blobs) + } + fc.mu.RUnlock() + require.Equal(t, 1, totalBlobs) +} + +func TestFiberClient_Retrieve_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("hello")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.Retrieve(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 1) + require.Equal(t, []byte("hello"), retrieveRes.Data[0]) + require.Equal(t, submitRes.IDs, retrieveRes.IDs) +} + +func TestFiberClient_RetrieveBlobs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("blob1"), []byte("blob2")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.RetrieveBlobs(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 2) + require.Equal(t, []byte("blob1"), retrieveRes.Data[0]) + require.Equal(t, []byte("blob2"), retrieveRes.Data[1]) +} + +func TestFiberClient_Retrieve_NotFound(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + retrieveRes := cl.Retrieve(context.Background(), 9999, ns) + require.Equal(t, datypes.StatusNotFound, retrieveRes.Code) +} + +func TestFiberClient_Retrieve_NamespaceFiltering(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns1 := datypes.NamespaceFromString("ns-a").Bytes() + ns2 := datypes.NamespaceFromString("ns-b").Bytes() + + res1 := cl.Submit(context.Background(), [][]byte{[]byte("alpha")}, 0, ns1, nil) + require.Equal(t, datypes.StatusSuccess, res1.Code) + + res2 := cl.Submit(context.Background(), [][]byte{[]byte("beta")}, 0, ns2, nil) + require.Equal(t, datypes.StatusSuccess, res2.Code) + + rr1 := cl.Retrieve(context.Background(), res1.Height, ns1) + require.Equal(t, datypes.StatusSuccess, rr1.Code) + require.Equal(t, []byte("alpha"), rr1.Data[0]) + + rr2 := cl.Retrieve(context.Background(), res1.Height, ns2) + require.Equal(t, datypes.StatusNotFound, rr2.Code) + + rr3 := cl.Retrieve(context.Background(), res2.Height, ns2) + require.Equal(t, datypes.StatusSuccess, rr3.Code) + require.Equal(t, []byte("beta"), rr3.Data[0]) +} + +func TestFiberClient_Get_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("getme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, []byte("getme"), blobs[0]) +} + +func TestFiberClient_Get_EmptyIDs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + blobs, err := cl.Get(context.Background(), nil, nil) + require.NoError(t, err) + require.Nil(t, blobs) +} + +func TestFiberClient_Get_InvalidID(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Get(context.Background(), []datypes.ID{[]byte{0x01}}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid fiber blob id") +} + +func TestFiberClient_Get_DownloadError(t *testing.T) { + _, cl := makeTestFiberClient(t) + + fakeBlobID := make([]byte, 33) + id := makeFiberID(1, fakeBlobID) + + _, err := cl.Get(context.Background(), []datypes.ID{id}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "fiber download failed") +} + +func TestFiberClient_GetLatestDAHeight(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res.Code) + + height, err := cl.GetLatestDAHeight(context.Background()) + require.NoError(t, err) + require.Equal(t, res.Height, height) +} + +func TestFiberClient_GetProofs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("prooftest")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, proofs, 1) + require.NotEmpty(t, proofs[0]) +} + +func TestFiberClient_GetProofs_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + proofs, err := cl.GetProofs(context.Background(), nil, nil) + require.NoError(t, err) + require.Empty(t, proofs) +} + +func TestFiberClient_Validate_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validateme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + + results, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.True(t, results[0]) +} + +func TestFiberClient_Validate_MismatchedLengths(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Validate(context.Background(), make([]datypes.ID, 3), make([]datypes.Proof, 2), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "must match") +} + +func TestFiberClient_Validate_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + results, err := cl.Validate(context.Background(), nil, nil, nil) + require.NoError(t, err) + require.Empty(t, results) +} + +func TestFiberClient_Validate_WrongProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validatewrong")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + fakeProofs := []datypes.Proof{[]byte("wrong-proof")} + results, err := cl.Validate(context.Background(), submitRes.IDs, fakeProofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.False(t, results[0]) +} + +func TestFiberClient_Validate_EmptyProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + emptyProofs := []datypes.Proof{[]byte{}} + results, err := cl.Validate(context.Background(), submitRes.IDs, emptyProofs, ns) + require.NoError(t, err) + require.False(t, results[0]) +} + +func TestFiberClient_Validate_UnknownID(t *testing.T) { + _, cl := makeTestFiberClient(t) + + fakeID := makeFiberID(99999, make([]byte, 33)) + proofs := []datypes.Proof{[]byte("some-proof")} + results, err := cl.Validate(context.Background(), []datypes.ID{fakeID}, proofs, nil) + require.NoError(t, err) + require.Len(t, results, 1) + require.False(t, results[0]) +} + +func TestFiberClient_Namespaces(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + ForcedInclusionNamespace: "forced-ns", + }) + require.NotNil(t, cl) + + require.Equal(t, datypes.NamespaceFromString("header-ns").Bytes(), cl.GetHeaderNamespace()) + require.Equal(t, datypes.NamespaceFromString("data-ns").Bytes(), cl.GetDataNamespace()) + require.Equal(t, datypes.NamespaceFromString("forced-ns").Bytes(), cl.GetForcedInclusionNamespace()) + require.True(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_NoForcedNamespace(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + }) + require.NotNil(t, cl) + + require.Nil(t, cl.GetForcedInclusionNamespace()) + require.False(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_Subscribe(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := cl.Subscribe(ctx, nil, false) + require.NoError(t, err) + require.NotNil(t, ch) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("sub-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + select { + case ev := <-ch: + require.Equal(t, submitRes.Height, ev.Height) + require.Len(t, ev.Blobs, 1) + require.Equal(t, []byte("sub-data"), ev.Blobs[0]) + case <-time.After(5 * time.Second): + t.Fatal("subscribe did not emit event within timeout") + } +} + +func TestFiberClient_Submit_MultipleBlobs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := [][]byte{[]byte("first"), []byte("second"), []byte("third")} + res := cl.Submit(context.Background(), data, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 3) + require.Equal(t, uint64(3), res.SubmittedCount) + + retrieveRes := cl.Retrieve(context.Background(), res.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 3) + require.Equal(t, []byte("first"), retrieveRes.Data[0]) + require.Equal(t, []byte("second"), retrieveRes.Data[1]) + require.Equal(t, []byte("third"), retrieveRes.Data[2]) +} + +func TestFiberClient_SubmitAndDownload(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := []byte("download-test") + submitRes := cl.Submit(context.Background(), [][]byte{data}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, data, blobs[0]) +} + +func TestMakeFiberID_RoundTrip(t *testing.T) { + blobID := make([]byte, 33) + blobID[0] = 1 + for i := 1; i < 33; i++ { + blobID[i] = byte(i) + } + + id := makeFiberID(42, blobID) + height, extractedBlobID := splitFiberID(id) + + require.Equal(t, uint64(42), height) + require.Equal(t, blobID, extractedBlobID) +} + +func TestSplitFiberID_Invalid(t *testing.T) { + height, blobID := splitFiberID([]byte{0x01, 0x02}) + require.Equal(t, uint64(0), height) + require.Nil(t, blobID) +} + +func TestFiberClient_DefaultTimeout(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + require.NotNil(t, cl) + fc := cl.(*fiberDAClient) + require.Equal(t, 60*time.Second, fc.defaultTimeout) +} + +func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("cycle-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + submittedHeight := submitRes.Height + + retrieveRes := cl.Retrieve(context.Background(), submittedHeight, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Equal(t, []byte("cycle-data"), retrieveRes.Data[0]) + + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Equal(t, []byte("cycle-data"), blobs[0]) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.NotEmpty(t, proofs[0]) + + valid, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.True(t, valid[0]) +} + +func TestFiberClient_IndexPruning(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + require.NotNil(t, cl) + fc := cl.(*fiberDAClient) + fc.indexWindow = 10 + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + var lastHeight uint64 + for i := 0; i < 20; i++ { + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res.Code) + lastHeight = res.Height + } + + fc.mu.RLock() + indexLen := len(fc.index) + _, hasOld := fc.index[lastHeight-20] + _, hasRecent := fc.index[lastHeight] + fc.mu.RUnlock() + + require.True(t, hasRecent, "most recent height should be in index") + require.False(t, hasOld, "old height should have been pruned") + require.LessOrEqual(t, indexLen, 10, "index should be bounded by window") +} + +type faultInjector struct { + FiberClient + err error +} + +func (f *faultInjector) SetError(err error) { f.err = err } + +func (f *faultInjector) Upload(ctx context.Context, namespace, data []byte) (fibremock.UploadResult, error) { + if f.err != nil { + return fibremock.UploadResult{}, f.err + } + return f.FiberClient.Upload(ctx, namespace, data) +} + +type failOnNthUpload struct { + FiberClient + failAt uint64 + err error + callCount atomic.Uint64 +} + +func (f *failOnNthUpload) Upload(ctx context.Context, namespace, data []byte) (fibremock.UploadResult, error) { + n := f.callCount.Add(1) + if n == f.failAt { + return fibremock.UploadResult{}, f.err + } + return f.FiberClient.Upload(ctx, namespace, data) +} diff --git a/block/internal/da/fibremock/fibre.go b/block/internal/da/fibremock/fibre.go new file mode 100644 index 0000000000..986c27cc3b --- /dev/null +++ b/block/internal/da/fibremock/fibre.go @@ -0,0 +1,79 @@ +// Package fibre provides a Go client interface and mock implementation for the +// Fibre DA (Data Availability) gRPC service. +// +// # Design Assumptions +// +// - The sequencer trusts the encoder to eventually confirm blob inclusion. +// Upload returns after the blob is uploaded and the PFF transaction is +// broadcast, NOT after on-chain confirmation. This keeps the sequencer's +// write path fast (~2s per 128 MB blob). +// +// - Callers are expected to batch/buffer their data into blobs sized for the +// protocol maximum (128 MiB - 5 byte header = 134,217,723 bytes). +// The interface accepts arbitrary sizes but the implementation may batch +// or reject oversized blobs. +// +// - Confirmation/finality is intentionally omitted from the initial API. +// The sequencer does not need it; the read path (Listen + Download) is +// sufficient for full nodes. A Status or Confirm RPC can be added later +// if needed without breaking existing callers. +// +// - Blob ordering is encoded in the blob data itself by the caller. +// The interface does not impose or guarantee ordering. +// +// - The interface is the same whether the encoder runs in-process or as an +// external gRPC service. For in-process use, call the mock or real +// implementation directly; for external use, connect via gRPC. +package fibremock + +import ( + "context" + "time" +) + +// BlobID uniquely identifies an uploaded blob (version byte + 32-byte commitment). +type BlobID []byte + +// UploadResult is returned by Upload after the blob is accepted. +type UploadResult struct { + // BlobID uniquely identifies the uploaded blob. + BlobID BlobID + // ExpiresAt is when the blob will be pruned from the DA network. + // Consumers must download before this time. + ExpiresAt time.Time +} + +// BlobEvent is delivered via Listen when a blob is confirmed on-chain. +type BlobEvent struct { + // BlobID of the confirmed blob. + BlobID BlobID + // Height is the chain height at which the blob was confirmed. + Height uint64 + // DataSize is the size of the original blob data in bytes (from the PFF). + // This allows full nodes to know the size before downloading. + DataSize uint64 +} + +// DA is the interface for interacting with the Fibre data availability layer. +// +// Implementations include: +// - MockDA: in-memory mock for testing +// - (future) gRPC client wrapping the Fibre service +// - (future) in-process encoder using fibre.Client directly +type DA interface { + // Upload submits a blob under the given namespace to the DA network. + // Returns after the blob is uploaded and the payment transaction is broadcast. + // Does NOT wait for on-chain confirmation (see package doc for rationale). + // + // The caller is responsible for batching data to the target blob size. + Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) + + // Download retrieves and reconstructs a blob by its ID. + // Returns the original data that was passed to Upload. + Download(ctx context.Context, blobID BlobID) ([]byte, error) + + // Listen streams confirmed blob events for the given namespace. + // The returned channel is closed when the context is cancelled. + // Each event includes the blob ID, confirmation height, and data size. + Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) +} diff --git a/block/internal/da/fibremock/mock.go b/block/internal/da/fibremock/mock.go new file mode 100644 index 0000000000..ee88552121 --- /dev/null +++ b/block/internal/da/fibremock/mock.go @@ -0,0 +1,246 @@ +package fibremock + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" +) + +var ( + // ErrBlobNotFound is returned when a blob ID is not in the store. + ErrBlobNotFound = errors.New("blob not found") + // ErrDataEmpty is returned when Upload is called with empty data. + ErrDataEmpty = errors.New("data cannot be empty") +) + +// MockDAConfig configures the mock DA implementation. +type MockDAConfig struct { + // MaxBlobs is the maximum number of blobs stored in memory. + // When exceeded, the oldest blob is evicted regardless of retention. + // 0 means no limit (use with caution — large blobs will OOM). + MaxBlobs int + // Retention is how long blobs are kept before automatic pruning. + // 0 means blobs are kept until evicted by MaxBlobs. + Retention time.Duration +} + +// DefaultMockDAConfig returns a config suitable for testing: +// 100 blobs max, 10 minute retention. +func DefaultMockDAConfig() MockDAConfig { + return MockDAConfig{ + MaxBlobs: 100, + Retention: 10 * time.Minute, + } +} + +// storedBlob holds a blob and its metadata in the mock store. +type storedBlob struct { + namespace []byte + data []byte + height uint64 + expiresAt time.Time + createdAt time.Time +} + +// subscriber tracks a Listen subscription. +type subscriber struct { + namespace []byte + ch chan BlobEvent +} + +// MockDA is an in-memory mock implementation of the DA interface. +// It stores blobs in memory with configurable retention and max blob count. +// Safe for concurrent use. +type MockDA struct { + cfg MockDAConfig + + mu sync.RWMutex + blobs map[string]*storedBlob // keyed by hex(blobID) + order []string // insertion order for LRU eviction + height uint64 + subscribers []subscriber +} + +// NewMockDA creates a new mock DA with the given config. +func NewMockDA(cfg MockDAConfig) *MockDA { + return &MockDA{ + cfg: cfg, + blobs: make(map[string]*storedBlob), + } +} + +// Upload stores the blob in memory and notifies listeners. +func (m *MockDA) Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) { + if len(data) == 0 { + return UploadResult{}, ErrDataEmpty + } + + blobID := mockBlobID(data) + key := fmt.Sprintf("%x", blobID) + now := time.Now() + + var expiresAt time.Time + if m.cfg.Retention > 0 { + expiresAt = now.Add(m.cfg.Retention) + } + + m.mu.Lock() + + // Evict oldest if at capacity + if m.cfg.MaxBlobs > 0 && len(m.blobs) >= m.cfg.MaxBlobs { + m.evictOldestLocked() + } + + // Prune expired blobs opportunistically + if m.cfg.Retention > 0 { + m.pruneExpiredLocked(now) + } + + m.height++ + height := m.height + + m.blobs[key] = &storedBlob{ + namespace: namespace, + data: data, + height: height, + expiresAt: expiresAt, + createdAt: now, + } + m.order = append(m.order, key) + + // Notify subscribers (non-blocking) + event := BlobEvent{ + BlobID: blobID, + Height: height, + DataSize: uint64(len(data)), + } + for i := range m.subscribers { + if namespaceMatch(m.subscribers[i].namespace, namespace) { + select { + case m.subscribers[i].ch <- event: + default: + // Channel full, drop event. Subscriber is too slow. + } + } + } + + m.mu.Unlock() + + return UploadResult{ + BlobID: blobID, + ExpiresAt: expiresAt, + }, nil +} + +// Download retrieves a blob by ID. +func (m *MockDA) Download(ctx context.Context, blobID BlobID) ([]byte, error) { + key := fmt.Sprintf("%x", blobID) + + m.mu.RLock() + blob, ok := m.blobs[key] + m.mu.RUnlock() + + if !ok { + return nil, ErrBlobNotFound + } + + if !blob.expiresAt.IsZero() && time.Now().After(blob.expiresAt) { + return nil, ErrBlobNotFound + } + + return blob.data, nil +} + +// Listen returns a channel that receives events when blobs matching the +// namespace are uploaded. The channel is closed when ctx is cancelled. +func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) { + ch := make(chan BlobEvent, 64) + + m.mu.Lock() + idx := len(m.subscribers) + m.subscribers = append(m.subscribers, subscriber{ + namespace: namespace, + ch: ch, + }) + m.mu.Unlock() + + // Clean up when context is done. + go func() { + <-ctx.Done() + m.mu.Lock() + // Remove subscriber by swapping with last + last := len(m.subscribers) - 1 + if idx <= last { + m.subscribers[idx] = m.subscribers[last] + } + m.subscribers = m.subscribers[:last] + m.mu.Unlock() + close(ch) + }() + + return ch, nil +} + +// BlobCount returns the number of blobs currently stored. +func (m *MockDA) BlobCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.blobs) +} + +// evictOldestLocked removes the oldest blob. Caller must hold m.mu. +func (m *MockDA) evictOldestLocked() { + if len(m.order) == 0 { + return + } + key := m.order[0] + m.order = m.order[1:] + delete(m.blobs, key) +} + +// pruneExpiredLocked removes blobs past their retention. Caller must hold m.mu. +func (m *MockDA) pruneExpiredLocked(now time.Time) { + surviving := m.order[:0] + for _, key := range m.order { + blob, ok := m.blobs[key] + if !ok { + continue + } + if !blob.expiresAt.IsZero() && now.After(blob.expiresAt) { + delete(m.blobs, key) + } else { + surviving = append(surviving, key) + } + } + m.order = surviving +} + +// namespaceMatch returns true if the subscription namespace matches the blob namespace. +// An empty subscription namespace matches all namespaces (wildcard). +func namespaceMatch(subNS, blobNS []byte) bool { + if len(subNS) == 0 { + return true + } + if len(subNS) != len(blobNS) { + return false + } + for i := range subNS { + if subNS[i] != blobNS[i] { + return false + } + } + return true +} + +// mockBlobID produces a deterministic blob ID from the data. +// Format: 1 byte version (0) + 32 bytes SHA256 hash. +func mockBlobID(data []byte) BlobID { + hash := sha256.Sum256(data) + id := make([]byte, 33) + id[0] = 0 // version byte + copy(id[1:], hash[:]) + return id +} diff --git a/block/internal/da/fibremock/mock_test.go b/block/internal/da/fibremock/mock_test.go new file mode 100644 index 0000000000..e5aa62aed0 --- /dev/null +++ b/block/internal/da/fibremock/mock_test.go @@ -0,0 +1,197 @@ +package fibremock + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestMockDA_UploadDownload(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + ns := []byte("test-ns") + data := []byte("hello fibre") + + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + if len(result.BlobID) != 33 { + t.Fatalf("expected 33-byte blob ID, got %d", len(result.BlobID)) + } + + got, err := m.Download(ctx, result.BlobID) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, data) { + t.Fatalf("data mismatch: got %q, want %q", got, data) + } +} + +func TestMockDA_UploadEmpty(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Upload(context.Background(), []byte("ns"), nil) + if err != ErrDataEmpty { + t.Fatalf("expected ErrDataEmpty, got %v", err) + } +} + +func TestMockDA_DownloadNotFound(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Download(context.Background(), BlobID{0, 1, 2}) + if err != ErrBlobNotFound { + t.Fatalf("expected ErrBlobNotFound, got %v", err) + } +} + +func TestMockDA_Listen(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ns := []byte("test-ns") + ch, err := m.Listen(ctx, ns) + if err != nil { + t.Fatal(err) + } + + // Upload a blob — should trigger the listener + data := []byte("listened blob") + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + + select { + case event := <-ch: + if !bytes.Equal(event.BlobID, result.BlobID) { + t.Fatal("blob ID mismatch in event") + } + if event.Height == 0 { + t.Fatal("expected non-zero height") + } + if event.DataSize != uint64(len(data)) { + t.Fatalf("expected data size %d, got %d", len(data), event.DataSize) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestMockDA_ListenNamespaceFilter(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := m.Listen(ctx, []byte("ns-A")) + if err != nil { + t.Fatal(err) + } + + // Upload to different namespace — should NOT trigger + m.Upload(ctx, []byte("ns-B"), []byte("wrong namespace")) + + select { + case <-ch: + t.Fatal("should not receive event for different namespace") + case <-time.After(50 * time.Millisecond): + // good + } +} + +func TestMockDA_ListenWildcard(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Empty namespace = wildcard + ch, err := m.Listen(ctx, nil) + if err != nil { + t.Fatal(err) + } + + m.Upload(ctx, []byte("any-ns"), []byte("wildcard test")) + + select { + case event := <-ch: + if event.Height == 0 { + t.Fatal("expected event") + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for wildcard event") + } +} + +func TestMockDA_MaxBlobsEviction(t *testing.T) { + m := NewMockDA(MockDAConfig{MaxBlobs: 3}) + ctx := context.Background() + + var ids []BlobID + for i := range 5 { + r, err := m.Upload(ctx, nil, []byte{byte(i), 1, 2, 3}) + if err != nil { + t.Fatal(err) + } + ids = append(ids, r.BlobID) + } + + // First two should be evicted + if _, err := m.Download(ctx, ids[0]); err != ErrBlobNotFound { + t.Fatal("expected first blob to be evicted") + } + if _, err := m.Download(ctx, ids[1]); err != ErrBlobNotFound { + t.Fatal("expected second blob to be evicted") + } + + // Last three should still be there + for i := 2; i < 5; i++ { + if _, err := m.Download(ctx, ids[i]); err != nil { + t.Fatalf("blob %d should exist: %v", i, err) + } + } + + if m.BlobCount() != 3 { + t.Fatalf("expected 3 blobs, got %d", m.BlobCount()) + } +} + +func TestMockDA_Retention(t *testing.T) { + m := NewMockDA(MockDAConfig{Retention: 50 * time.Millisecond}) + ctx := context.Background() + + r, err := m.Upload(ctx, nil, []byte("ephemeral")) + if err != nil { + t.Fatal(err) + } + + // Should exist immediately + if _, err := m.Download(ctx, r.BlobID); err != nil { + t.Fatal("blob should exist immediately") + } + + // Wait for expiry + time.Sleep(100 * time.Millisecond) + + if _, err := m.Download(ctx, r.BlobID); err != ErrBlobNotFound { + t.Fatal("blob should have expired") + } +} + +func TestMockDA_DeterministicBlobID(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + data := []byte("deterministic") + r1, _ := m.Upload(ctx, nil, data) + r2, _ := m.Upload(ctx, nil, data) + + if !bytes.Equal(r1.BlobID, r2.BlobID) { + t.Fatal("same data should produce same blob ID") + } +} + +// Verify MockDA satisfies the DA interface at compile time. +var _ DA = (*MockDA)(nil) diff --git a/block/public.go b/block/public.go index cc7691c299..d056394270 100644 --- a/block/public.go +++ b/block/public.go @@ -63,6 +63,28 @@ func NewDAClient( return base } +// NewFiberDAClient creates a new DA client backed by the Fiber protocol. +// The fiberClient parameter must implement the da.FiberClient interface. +// The returned client implements both DAClient and DAVerifier interfaces. +func NewFiberDAClient( + fiberClient da.FiberClient, + config config.Config, + logger zerolog.Logger, +) FullDAClient { + base := da.NewFiberClient(da.FiberConfig{ + Client: fiberClient, + Logger: logger, + DefaultTimeout: config.DA.RequestTimeout.Duration, + Namespace: config.DA.GetNamespace(), + DataNamespace: config.DA.GetDataNamespace(), + ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), + }) + if config.Instrumentation.IsTracingEnabled() { + return da.WithTracingClient(base) + } + return base +} + // Exported errors used by the sequencers var ( // ErrForceInclusionNotConfigured is returned when force inclusion is not configured. diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..02c1338279 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -92,6 +92,21 @@ const ( // FlagDAStartHeight is a flag for forcing the DA retrieval height to start from a specific height FlagDAStartHeight = FlagPrefixEvnode + "da.start_height" + // Fiber DA configuration flags + + // FlagDAFiberEnabled enables the Fiber DA client instead of the default JSON-RPC blob client + FlagDAFiberEnabled = FlagPrefixEvnode + "da.fiber.enabled" + // FlagDAFiberStateAddress is the gRPC address of the celestia-app node for Fiber state queries + FlagDAFiberStateAddress = FlagPrefixEvnode + "da.fiber.state_address" + // FlagDAFiberKeyringPath is the path to the keyring directory for Fiber payment promise signing + FlagDAFiberKeyringPath = FlagPrefixEvnode + "da.fiber.keyring_path" + // FlagDAFiberKeyName is the key name in the keyring to use for signing payment promises + FlagDAFiberKeyName = FlagPrefixEvnode + "da.fiber.key_name" + // FlagDAFiberUploadConcurrency limits concurrent Fiber uploads to validators + FlagDAFiberUploadConcurrency = FlagPrefixEvnode + "da.fiber.upload_concurrency" + // FlagDAFiberDownloadConcurrency limits concurrent Fiber downloads from validators + FlagDAFiberDownloadConcurrency = FlagPrefixEvnode + "da.fiber.download_concurrency" + // P2P configuration flags // FlagP2PListenAddress is a flag for specifying the P2P listen address @@ -258,6 +273,37 @@ type DAConfig struct { BatchSizeThreshold float64 `mapstructure:"batch_size_threshold" yaml:"batch_size_threshold" comment:"Minimum blob size threshold (as fraction of max blob size, 0.0-1.0) before submitting. Only applies to 'size' and 'adaptive' strategies. Example: 0.8 means wait until batch is 80% full. Default: 0.8."` BatchMaxDelay DurationWrapper `mapstructure:"batch_max_delay" yaml:"batch_max_delay" comment:"Maximum time to wait before submitting a batch regardless of size. Applies to 'time' and 'adaptive' strategies. Lower values reduce latency but may increase costs. Examples: \"6s\", \"12s\", \"30s\". Default: DA BlockTime."` BatchMinItems uint64 `mapstructure:"batch_min_items" yaml:"batch_min_items" comment:"Minimum number of items (headers or data) to accumulate before considering submission. Helps avoid submitting single items when more are expected soon. Default: 1."` + + // Fiber DA client configuration + Fiber FiberDAConfig `mapstructure:"fiber" yaml:"fiber"` +} + +// FiberDAConfig contains configuration for the Fiber DA client. +// When Enabled is true, the Fiber client is used instead of the default +// JSON-RPC blob client for DA operations. +type FiberDAConfig struct { + // Enabled switches the DA backend from the default JSON-RPC blob client + // to the Fiber protocol client. + Enabled bool `mapstructure:"enabled" yaml:"enabled" comment:"Enable the Fiber DA client for direct validator communication instead of the default JSON-RPC blob client"` + // StateAddress is the gRPC address of the celestia-app node used for + // state queries (validator set, chain ID, promise verification). + StateAddress string `mapstructure:"state_address" yaml:"state_address" comment:"gRPC address of the celestia-app node for Fiber state queries (host:port)"` + // KeyringPath is the directory path containing the keyring for signing + // Fiber payment promises. + KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing"` + // KeyName is the name of the key in the keyring to use for signing. + KeyName string `mapstructure:"key_name" yaml:"key_name" comment:"Name of the key in the keyring to use for signing Fiber payment promises"` + // UploadConcurrency limits the number of concurrent upload connections + // to validators. + UploadConcurrency int `mapstructure:"upload_concurrency" yaml:"upload_concurrency" comment:"Maximum number of concurrent upload connections to Fiber validators"` + // DownloadConcurrency limits the number of concurrent download connections + // from validators. + DownloadConcurrency int `mapstructure:"download_concurrency" yaml:"download_concurrency" comment:"Maximum number of concurrent download connections from Fiber validators"` +} + +// IsFiberEnabled returns true if the Fiber DA client is configured and enabled. +func (d *DAConfig) IsFiberEnabled() bool { + return d.Fiber.Enabled } // GetNamespace returns the namespace for header submissions. @@ -602,6 +648,14 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for disabled)") cmd.Flags().MarkHidden(FlagDAStartHeight) + // Fiber DA configuration flags + cmd.Flags().Bool(FlagDAFiberEnabled, def.DA.Fiber.Enabled, "enable the Fiber DA client for direct validator communication") + cmd.Flags().String(FlagDAFiberStateAddress, def.DA.Fiber.StateAddress, "gRPC address of the celestia-app node for Fiber state queries (host:port)") + cmd.Flags().String(FlagDAFiberKeyringPath, def.DA.Fiber.KeyringPath, "path to the keyring directory for Fiber payment promise signing") + cmd.Flags().String(FlagDAFiberKeyName, def.DA.Fiber.KeyName, "name of the key in the keyring for signing Fiber payment promises") + cmd.Flags().Int(FlagDAFiberUploadConcurrency, def.DA.Fiber.UploadConcurrency, "maximum concurrent uploads to Fiber validators") + cmd.Flags().Int(FlagDAFiberDownloadConcurrency, def.DA.Fiber.DownloadConcurrency, "maximum concurrent downloads from Fiber validators") + // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") cmd.Flags().String(FlagP2PPeers, def.P2P.Peers, "Comma separated list of seed nodes to connect to") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..9ad6447306 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -78,6 +78,18 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) assertFlagValue(t, flags, FlagDARequestTimeout, DefaultConfig().DA.RequestTimeout.Duration) + assertFlagValue(t, flags, FlagDABatchingStrategy, DefaultConfig().DA.BatchingStrategy) + assertFlagValue(t, flags, FlagDABatchSizeThreshold, DefaultConfig().DA.BatchSizeThreshold) + assertFlagValue(t, flags, FlagDABatchMaxDelay, DefaultConfig().DA.BatchMaxDelay.Duration) + assertFlagValue(t, flags, FlagDABatchMinItems, DefaultConfig().DA.BatchMinItems) + + // DA Fiber flags + assertFlagValue(t, flags, FlagDAFiberEnabled, DefaultConfig().DA.Fiber.Enabled) + assertFlagValue(t, flags, FlagDAFiberStateAddress, DefaultConfig().DA.Fiber.StateAddress) + assertFlagValue(t, flags, FlagDAFiberKeyringPath, DefaultConfig().DA.Fiber.KeyringPath) + assertFlagValue(t, flags, FlagDAFiberKeyName, DefaultConfig().DA.Fiber.KeyName) + assertFlagValue(t, flags, FlagDAFiberUploadConcurrency, DefaultConfig().DA.Fiber.UploadConcurrency) + assertFlagValue(t, flags, FlagDAFiberDownloadConcurrency, DefaultConfig().DA.Fiber.DownloadConcurrency) // P2P flags assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress) @@ -140,7 +152,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 84 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..2dd22b3e8a 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -83,6 +83,13 @@ func DefaultConfig() Config { BatchSizeThreshold: 0.8, BatchMaxDelay: DurationWrapper{0}, // 0 means use DA BlockTime BatchMinItems: 1, + Fiber: FiberDAConfig{ + Enabled: false, + StateAddress: "127.0.0.1:9090", + KeyName: "default-fibre", + UploadConcurrency: 100, + DownloadConcurrency: 34, + }, }, Instrumentation: DefaultInstrumentationConfig(), Log: LogConfig{ diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go index 0fcae9f31c..86fa08d45d 100644 --- a/pkg/sequencers/solo/sequencer.go +++ b/pkg/sequencers/solo/sequencer.go @@ -16,6 +16,12 @@ import ( var ErrInvalidID = errors.New("invalid chain id") +var ( + emptyBatch = &coresequencer.Batch{} + submitBatchResp = &coresequencer.SubmitBatchTxsResponse{} + verifyBatchOKResp = &coresequencer.VerifyBatchResponse{Status: true} +) + var _ coresequencer.Sequencer = (*SoloSequencer)(nil) // SoloSequencer is a single-leader sequencer without forced inclusion @@ -55,14 +61,14 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su } if req.Batch == nil || len(req.Batch.Transactions) == 0 { - return &coresequencer.SubmitBatchTxsResponse{}, nil + return submitBatchResp, nil } s.mu.Lock() defer s.mu.Unlock() s.queue = append(s.queue, req.Batch.Transactions...) - return &coresequencer.SubmitBatchTxsResponse{}, nil + return submitBatchResp, nil } func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { @@ -77,7 +83,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN if len(txs) == 0 { return &coresequencer.GetNextBatchResponse{ - Batch: &coresequencer.Batch{}, + Batch: emptyBatch, Timestamp: time.Now().UTC(), BatchData: req.LastBatchData, }, nil @@ -94,21 +100,22 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN filterStatuses, err := s.executor.FilterTxs(ctx, txs, req.MaxBytes, maxGas, false) if err != nil { s.logger.Warn().Err(err).Msg("failed to filter transactions, proceeding with unfiltered") - filterStatuses = make([]execution.FilterStatus, len(txs)) - for i := range filterStatuses { - filterStatuses[i] = execution.FilterOK - } + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{Transactions: txs}, + Timestamp: time.Now().UTC(), + BatchData: req.LastBatchData, + }, nil } - var validTxs [][]byte + write := 0 var postponedTxs [][]byte for i, status := range filterStatuses { switch status { case execution.FilterOK: - validTxs = append(validTxs, txs[i]) + txs[write] = txs[i] + write++ case execution.FilterPostpone: postponedTxs = append(postponedTxs, txs[i]) - case execution.FilterRemove: } } @@ -119,7 +126,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN } return &coresequencer.GetNextBatchResponse{ - Batch: &coresequencer.Batch{Transactions: validTxs}, + Batch: &coresequencer.Batch{Transactions: txs[:write]}, Timestamp: time.Now().UTC(), BatchData: req.LastBatchData, }, nil @@ -130,7 +137,7 @@ func (s *SoloSequencer) VerifyBatch(ctx context.Context, req coresequencer.Verif return nil, ErrInvalidID } - return &coresequencer.VerifyBatchResponse{Status: true}, nil + return verifyBatchOKResp, nil } func (s *SoloSequencer) SetDAHeight(height uint64) {