Skip to content

Commit 3090e41

Browse files
committed
refactor: create da client and split cache interface
1 parent a465969 commit 3090e41

28 files changed

+1134
-816
lines changed

.mockery.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ packages:
4848
filename: external/hstore.go
4949
github.com/evstack/ev-node/block/internal/syncing:
5050
interfaces:
51-
daRetriever:
51+
DARetriever:
5252
config:
5353
dir: ./block/internal/syncing
5454
pkgname: syncing
55-
filename: syncer_mock.go
55+
filename: da_retriever_mock.go
5656
p2pHandler:
5757
config:
5858
dir: ./block/internal/syncing

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
### Changed
1313

1414
- Rename `evm-single` to `evm` and `grpc-single` to `evgrpc` for clarity. [#2839](https://github.com/evstack/ev-node/pull/2839)
15+
- Split cache interface in `CacheManager` and `PendingManager` and create `da` client to easy DA handling.
1516

1617
## v1.0.0-beta.10
1718

block/components.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,15 @@ func NewSyncComponents(
144144
return nil, fmt.Errorf("failed to create cache manager: %w", err)
145145
}
146146

147+
daClient := NewDAClient(da, config, logger)
148+
147149
// error channel for critical failures
148150
errorCh := make(chan error, 1)
149151

150152
syncer := syncing.NewSyncer(
151153
store,
152154
exec,
153-
da,
155+
daClient,
154156
cacheManager,
155157
metrics,
156158
config,
@@ -162,8 +164,8 @@ func NewSyncComponents(
162164
errorCh,
163165
)
164166

165-
// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
166-
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
167+
// Create submitter for sync nodes (no signer, only DA inclusion processing)
168+
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
167169
submitter := submitting.NewSubmitter(
168170
store,
169171
exec,
@@ -243,8 +245,9 @@ func NewAggregatorComponents(
243245
return nil, fmt.Errorf("failed to create reaper: %w", err)
244246
}
245247

246-
// Create DA submitter for aggregator nodes (with signer for submission)
247-
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
248+
// Create DA client and submitter for aggregator nodes (with signer for submission)
249+
daClient := NewDAClient(da, config, logger)
250+
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
248251
submitter := submitting.NewSubmitter(
249252
store,
250253
exec,

block/internal/cache/manager.go

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ func registerGobTypes() {
4141
})
4242
}
4343

44-
// Manager provides centralized cache management for both executing and syncing components
45-
type Manager interface {
44+
// CacheManager provides centralized cache management for both executing and syncing components
45+
type CacheManager interface {
4646
// Header operations
4747
IsHeaderSeen(hash string) bool
4848
SetHeaderSeen(hash string, blockHeight uint64)
@@ -62,14 +62,6 @@ type Manager interface {
6262
SetTxSeen(hash string)
6363
CleanupOldTxs(olderThan time.Duration) int
6464

65-
// Pending operations
66-
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
67-
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
68-
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
69-
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
70-
NumPendingHeaders() uint64
71-
NumPendingData() uint64
72-
7365
// Pending events syncing coordination
7466
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
7567
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
@@ -83,6 +75,22 @@ type Manager interface {
8375
DeleteHeight(blockHeight uint64)
8476
}
8577

78+
// PendingManager provides operations for managing pending headers and data
79+
type PendingManager interface {
80+
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
81+
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
82+
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
83+
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
84+
NumPendingHeaders() uint64
85+
NumPendingData() uint64
86+
}
87+
88+
// Manager provides centralized cache management for both executing and syncing components
89+
type Manager interface {
90+
CacheManager
91+
PendingManager
92+
}
93+
8694
var _ Manager = (*implementation)(nil)
8795

8896
// implementation provides the concrete implementation of cache Manager
@@ -98,6 +106,59 @@ type implementation struct {
98106
logger zerolog.Logger
99107
}
100108

109+
// NewPendingManager creates a new pending manager instance
110+
func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager, error) {
111+
pendingHeaders, err := NewPendingHeaders(store, logger)
112+
if err != nil {
113+
return nil, fmt.Errorf("failed to create pending headers: %w", err)
114+
}
115+
116+
pendingData, err := NewPendingData(store, logger)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to create pending data: %w", err)
119+
}
120+
121+
return &implementation{
122+
pendingHeaders: pendingHeaders,
123+
pendingData: pendingData,
124+
logger: logger,
125+
}, nil
126+
}
127+
128+
// NewCacheManager creates a new cache manager instance
129+
func NewCacheManager(cfg config.Config, logger zerolog.Logger) (CacheManager, error) {
130+
// Initialize caches
131+
headerCache := NewCache[types.SignedHeader]()
132+
dataCache := NewCache[types.Data]()
133+
txCache := NewCache[struct{}]()
134+
pendingEventsCache := NewCache[common.DAHeightEvent]()
135+
136+
registerGobTypes()
137+
impl := &implementation{
138+
headerCache: headerCache,
139+
dataCache: dataCache,
140+
txCache: txCache,
141+
txTimestamps: new(sync.Map),
142+
pendingEventsCache: pendingEventsCache,
143+
config: cfg,
144+
logger: logger,
145+
}
146+
147+
if cfg.ClearCache {
148+
// Clear the cache from disk
149+
if err := impl.ClearFromDisk(); err != nil {
150+
logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache")
151+
}
152+
} else {
153+
// Load existing cache from disk
154+
if err := impl.LoadFromDisk(); err != nil {
155+
logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache")
156+
}
157+
}
158+
159+
return impl, nil
160+
}
161+
101162
// NewManager creates a new cache manager instance
102163
func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Manager, error) {
103164
// Initialize caches

0 commit comments

Comments
 (0)