Skip to content

Commit 87a240b

Browse files
authored
[actpool] impl a store to cache actions on disk (iotexproject#4362)
1 parent 9624f47 commit 87a240b

File tree

3 files changed

+404
-0
lines changed

3 files changed

+404
-0
lines changed

actpool/blobstore.go

+270
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package actpool
2+
3+
import (
4+
"encoding/hex"
5+
"fmt"
6+
"os"
7+
"sync"
8+
9+
"github.com/ethereum/go-ethereum/params"
10+
"github.com/holiman/billy"
11+
"github.com/iotexproject/go-pkgs/hash"
12+
"github.com/pkg/errors"
13+
"go.uber.org/zap"
14+
15+
"github.com/iotexproject/iotex-core/action"
16+
"github.com/iotexproject/iotex-core/pkg/lifecycle"
17+
"github.com/iotexproject/iotex-core/pkg/log"
18+
)
19+
20+
const (
21+
// blobSize is the protocol constrained byte size of a single blob in a
22+
// transaction. There can be multiple of these embedded into a single tx.
23+
blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement
24+
25+
// maxBlobsPerTransaction is the maximum number of blobs a single transaction
26+
// is allowed to contain. Whilst the spec states it's unlimited, the block
27+
// data slots are protocol bound, which implicitly also limit this.
28+
maxBlobsPerTransaction = params.MaxBlobGasPerBlock / params.BlobTxBlobGasPerBlob
29+
30+
// txAvgSize is an approximate byte size of a transaction metadata to avoid
31+
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
32+
txAvgSize = 4 * 1024
33+
34+
// txMaxSize is the maximum size a single transaction can have, outside
35+
// the included blobs. Since blob transactions are pulled instead of pushed,
36+
// and only a small metadata is kept in ram, the rest is on disk, there is
37+
// no critical limit that should be enforced. Still, capping it to some sane
38+
// limit can never hurt.
39+
txMaxSize = 1024 * 1024
40+
)
41+
42+
type (
43+
blobStore struct {
44+
lifecycle.Readiness
45+
config blobStoreConfig // Configuration for the blob store
46+
47+
store billy.Database // Persistent data store for the tx
48+
stored uint64 // Useful data size of all transactions on disk
49+
50+
lookup map[hash.Hash256]uint64 // Lookup table mapping hashes to tx billy entries
51+
lock sync.RWMutex // Mutex protecting the store
52+
53+
encode encodeAction // Encoder for the tx
54+
decode decodeAction // Decoder for the tx
55+
}
56+
blobStoreConfig struct {
57+
Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs
58+
Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead)
59+
}
60+
61+
onAction func(selp *action.SealedEnvelope) error
62+
encodeAction func(selp *action.SealedEnvelope) ([]byte, error)
63+
decodeAction func([]byte) (*action.SealedEnvelope, error)
64+
)
65+
66+
var (
67+
errBlobNotFound = fmt.Errorf("blob not found")
68+
errStoreNotOpen = fmt.Errorf("blob store is not open")
69+
)
70+
71+
var defaultBlobStoreConfig = blobStoreConfig{
72+
Datadir: "blobpool",
73+
Datacap: 10 * 1024 * 1024 * 1024,
74+
}
75+
76+
func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) {
77+
if len(cfg.Datadir) == 0 {
78+
return nil, errors.New("datadir is empty")
79+
}
80+
return &blobStore{
81+
config: cfg,
82+
lookup: make(map[hash.Hash256]uint64),
83+
encode: encode,
84+
decode: decode,
85+
}, nil
86+
}
87+
88+
func (s *blobStore) Open(onData onAction) error {
89+
s.lock.Lock()
90+
defer s.lock.Unlock()
91+
92+
dir := s.config.Datadir
93+
if err := os.MkdirAll(dir, 0700); err != nil {
94+
return errors.Wrap(err, "failed to create blob store directory")
95+
}
96+
// Index all transactions on disk and delete anything inprocessable
97+
var fails []uint64
98+
index := func(id uint64, size uint32, blob []byte) {
99+
act, err := s.decode(blob)
100+
if err != nil {
101+
fails = append(fails, id)
102+
log.L().Warn("Failed to decode action", zap.Error(err))
103+
return
104+
}
105+
if err = onData(act); err != nil {
106+
fails = append(fails, id)
107+
log.L().Warn("Failed to process action", zap.Error(err))
108+
return
109+
}
110+
s.stored += uint64(size)
111+
h, _ := act.Hash()
112+
s.lookup[h] = id
113+
}
114+
store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index)
115+
if err != nil {
116+
return errors.Wrap(err, "failed to open blob store")
117+
}
118+
s.store = store
119+
120+
if len(fails) > 0 {
121+
log.L().Warn("Dropping invalidated blob transactions", zap.Int("count", len(fails)))
122+
123+
for _, id := range fails {
124+
if err := s.store.Delete(id); err != nil {
125+
s.Close()
126+
return errors.Wrap(err, "failed to delete blob from store")
127+
}
128+
}
129+
}
130+
131+
return s.TurnOn()
132+
}
133+
134+
func (s *blobStore) Close() error {
135+
s.lock.Lock()
136+
defer s.lock.Unlock()
137+
138+
if err := s.TurnOff(); err != nil {
139+
return err
140+
}
141+
return s.store.Close()
142+
}
143+
144+
func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) {
145+
if !s.IsReady() {
146+
return nil, errors.Wrap(errStoreNotOpen, "")
147+
}
148+
s.lock.RLock()
149+
defer s.lock.RUnlock()
150+
151+
id, ok := s.lookup[hash]
152+
if !ok {
153+
return nil, errors.Wrap(errBlobNotFound, "")
154+
}
155+
blob, err := s.store.Get(id)
156+
if err != nil {
157+
return nil, errors.Wrap(err, "failed to get blob from store")
158+
}
159+
return s.decode(blob)
160+
}
161+
162+
func (s *blobStore) Put(act *action.SealedEnvelope) error {
163+
if !s.IsReady() {
164+
return errors.Wrap(errStoreNotOpen, "")
165+
}
166+
s.lock.Lock()
167+
defer s.lock.Unlock()
168+
169+
h, _ := act.Hash()
170+
// if action is already stored, nothing to do
171+
if _, ok := s.lookup[h]; ok {
172+
return nil
173+
}
174+
// insert it into the database and update the indices
175+
blob, err := s.encode(act)
176+
if err != nil {
177+
return errors.Wrap(err, "failed to encode action")
178+
}
179+
id, err := s.store.Put(blob)
180+
if err != nil {
181+
return errors.Wrap(err, "failed to put blob into store")
182+
}
183+
s.stored += uint64(len(blob))
184+
s.lookup[h] = id
185+
// if the datacap is exceeded, remove old data
186+
if s.stored > s.config.Datacap {
187+
s.drop()
188+
}
189+
return nil
190+
}
191+
192+
func (s *blobStore) Delete(hash hash.Hash256) error {
193+
if !s.IsReady() {
194+
return errors.Wrap(errStoreNotOpen, "")
195+
}
196+
s.lock.Lock()
197+
defer s.lock.Unlock()
198+
199+
id, ok := s.lookup[hash]
200+
if !ok {
201+
return nil
202+
}
203+
if err := s.store.Delete(id); err != nil {
204+
return errors.Wrap(err, "failed to delete blob from store")
205+
}
206+
delete(s.lookup, hash)
207+
return nil
208+
}
209+
210+
// Range iterates over all stored with hashes
211+
func (s *blobStore) Range(fn func(hash.Hash256) bool) {
212+
if !s.IsReady() {
213+
return
214+
}
215+
s.lock.RLock()
216+
defer s.lock.RUnlock()
217+
218+
for h := range s.lookup {
219+
if !fn(h) {
220+
return
221+
}
222+
}
223+
}
224+
225+
func (s *blobStore) drop() {
226+
h, ok := s.evict()
227+
if !ok {
228+
log.L().Debug("no worst action found")
229+
return
230+
}
231+
id, ok := s.lookup[h]
232+
if !ok {
233+
log.L().Warn("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:])))
234+
return
235+
}
236+
if err := s.store.Delete(id); err != nil {
237+
log.L().Error("failed to delete worst action", zap.Error(err))
238+
}
239+
delete(s.lookup, h)
240+
return
241+
}
242+
243+
// TODO: implement a proper eviction policy
244+
func (s *blobStore) evict() (hash.Hash256, bool) {
245+
for h := range s.lookup {
246+
return h, true
247+
}
248+
return hash.ZeroHash256, false
249+
}
250+
251+
// newSlotter creates a helper method for the Billy datastore that returns the
252+
// individual shelf sizes used to store transactions in.
253+
//
254+
// The slotter will create shelves for each possible blob count + some tx metadata
255+
// wiggle room, up to the max permitted limits.
256+
//
257+
// The slotter also creates a shelf for 0-blob transactions. Whilst those are not
258+
// allowed in the current protocol, having an empty shelf is not a relevant use
259+
// of resources, but it makes stress testing with junk transactions simpler.
260+
func newSlotter() func() (uint32, bool) {
261+
slotsize := uint32(txAvgSize)
262+
slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return
263+
264+
return func() (size uint32, done bool) {
265+
slotsize += blobSize
266+
finished := slotsize > maxBlobsPerTransaction*blobSize+txMaxSize
267+
268+
return slotsize, finished
269+
}
270+
}

0 commit comments

Comments
 (0)