Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions graft/coreth/plugin/evm/atomic/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ import (
var (
_ gossip.Set[*atomic.Tx] = (*Mempool)(nil)

ErrAlreadyKnown = errors.New("already known")
ErrDiscarded = errors.New("previously discarded")
ErrConflict = errors.New("conflict present")
ErrInsufficientFee = errors.New("insufficient fee")
ErrMempoolFull = errors.New("mempool full")

// If the transaction is already in the mempool, marking it as Discarded,
// could unexpectedly cause the transaction to have multiple statuses.
//
// If the mempool is full, that is not the transaction's fault, so we should
// not prevent adding the transaction to the mempool later.
errsNotToDiscard = []error{
ErrAlreadyKnown,
ErrMempoolFull,
}
)
Expand Down Expand Up @@ -174,17 +170,17 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error {
// add it again.
txID := tx.ID()
if _, exists := m.issuedTxs[txID]; exists {
return fmt.Errorf("%w: tx %s was issued previously", ErrAlreadyKnown, txID)
return nil
}
if _, exists := m.currentTxs[txID]; exists {
return fmt.Errorf("%w: tx %s is being built into a block", ErrAlreadyKnown, txID)
return nil
}
if _, exists := m.pendingTxs.Get(txID); exists {
return fmt.Errorf("%w: tx %s is pending", ErrAlreadyKnown, txID)
return nil
}
if !local {
if _, exists := m.discardedTxs.Get(txID); exists {
return fmt.Errorf("%w: tx %s was discarded", ErrAlreadyKnown, txID)
return fmt.Errorf("tx %s was %w", txID, ErrDiscarded)
}
}
if !force && m.verify != nil {
Expand Down
5 changes: 2 additions & 3 deletions graft/coreth/plugin/evm/atomic/txpool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestMempoolAddTx(t *testing.T) {
}
}

// Add should return an error if a tx is already known
// Add should not return an error if a tx is already known
func TestMempoolAdd(t *testing.T) {
require := require.New(t)

Expand All @@ -54,8 +54,7 @@ func TestMempoolAdd(t *testing.T) {

tx := atomictest.GenerateTestImportTxWithGas(1, 1)
require.NoError(m.Add(tx))
err = m.Add(tx)
require.ErrorIs(err, ErrAlreadyKnown)
require.NoError(m.Add(tx))
}

// Add should return an error if a tx doesn't consume any gas
Expand Down
4 changes: 1 addition & 3 deletions graft/coreth/plugin/evm/atomic/vm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/ava-labs/avalanchego/api"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic/txpool"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/client"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/formatting"
Expand Down Expand Up @@ -146,8 +145,7 @@ func (service *AvaxAPI) IssueTx(_ *http.Request, args *api.FormattedTx, response
service.vm.Ctx.Lock.Lock()
defer service.vm.Ctx.Lock.Unlock()

err = service.vm.AtomicMempool.AddLocalTx(tx)
if err != nil && !errors.Is(err, txpool.ErrAlreadyKnown) {
if err := service.vm.AtomicMempool.AddLocalTx(tx); err != nil {
return err
}

Expand Down
4 changes: 3 additions & 1 deletion graft/coreth/plugin/evm/atomic/vm/block_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func (be *blockExtension) Accept(acceptedBatch database.Batch) error {
func (be *blockExtension) Reject() error {
vm := be.blockExtender.vm
for _, tx := range be.atomicTxs {
// Re-issue the transaction in the mempool, continue even if it fails
// We must remove the rejected transaction first to allow any
// transactions marked as Issued to be moved back into the Pending
// state.
vm.AtomicMempool.RemoveTx(tx)
if err := vm.AtomicMempool.AddRemoteTx(tx); err != nil {
log.Debug("Failed to re-issue transaction in rejected block", "txID", tx.ID(), "err", err)
Expand Down
7 changes: 6 additions & 1 deletion graft/coreth/plugin/evm/eth_gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package evm

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -117,7 +118,11 @@ func (g *GossipEthTxPool) Subscribe(ctx context.Context) {
// Add enqueues the transaction to the mempool. Subscribe should be called
// to receive an event if tx is actually added to the mempool or not.
func (g *GossipEthTxPool) Add(tx *GossipEthTx) error {
return g.mempool.Add([]*types.Transaction{tx.Tx}, false, false)[0]
err := g.mempool.Add([]*types.Transaction{tx.Tx}, false, false)[0]
if errors.Is(err, txpool.ErrAlreadyKnown) {
return nil
}
return err
}

// Has should just return whether or not the [txID] is still in the mempool,
Expand Down
6 changes: 0 additions & 6 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package gossip

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -74,16 +73,11 @@ type setDouble struct {
}

func (s *setDouble) Add(gossipable tx) error {
if s.txs.Contains(gossipable) {
return fmt.Errorf("%s already present", ids.ID(gossipable))
}

s.txs.Add(gossipable)
s.bloom.Add(gossipable)
if s.onAdd != nil {
s.onAdd(gossipable)
}

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions network/p2p/gossip/gossipable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ type Marshaller[T Gossipable] interface {

// Set holds a set of known Gossipable items
type Set[T Gossipable] interface {
// Add adds a Gossipable to the set. Returns an error if gossipable was not
// added.
// Add adds a Gossipable to the set. If the Gossipable is already in the
// set, no error should be returned. Otherwise, if the Gossipable was not
// added to the set, an error should be returned.
Add(gossipable T) error
// Has returns true if the gossipable is in the set.
Has(gossipID ids.ID) bool
Expand Down
9 changes: 3 additions & 6 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package network

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -93,14 +92,12 @@ type gossipMempool struct {
bloom *gossip.BloomFilter
}

// Add is called by the p2p SDK when handling transactions that were pushed to
// us and when handling transactions that were pulled from a peer. If this
// returns a nil error while handling push gossip, the p2p SDK will queue the
// transaction to push gossip as well.
// Add is called by the p2p SDK when handling transactions that sent pushed to
// us and when handling transactions that were pulled from a peer.
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
return nil
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
Expand Down
5 changes: 2 additions & 3 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ func (n *Network) PullGossip(ctx context.Context) {
// the tx is added to the mempool, it will attempt to push gossip the tx to
// random peers in the network.
//
// If the tx is already in the mempool, mempool.ErrDuplicateTx will be
// returned.
// If the tx is not added to the mempool, an error will be returned.
// If the tx is already in the mempool, no error will be returned.
// If the tx is otherwise not added to the mempool, an error will be returned.
func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
if err := n.mempool.Add(tx); err != nil {
return err
Expand Down
27 changes: 16 additions & 11 deletions vms/avm/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,6 @@ func TestNetworkIssueTxFromRPC(t *testing.T) {
}

tests := []test{
{
name: "mempool has transaction",
mempool: func() mempool.Mempool[*txs.Tx] {
mempool, err := xmempool.New("", prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, mempool.Add(&txs.Tx{Unsigned: &txs.BaseTx{}}))
return mempool
}(),
tx: &txs.Tx{Unsigned: &txs.BaseTx{}},
expectedErr: mempool.ErrDuplicateTx,
},
{
name: "transaction marked as dropped in mempool",
mempool: func() mempool.Mempool[*txs.Tx] {
Expand Down Expand Up @@ -198,6 +187,22 @@ func TestNetworkIssueTxFromRPC(t *testing.T) {
tx: &txs.Tx{Unsigned: &txs.BaseTx{}},
expectedErr: nil,
},
{
name: "mempool has transaction",
mempool: func() mempool.Mempool[*txs.Tx] {
mempool, err := xmempool.New("", prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, mempool.Add(&txs.Tx{Unsigned: &txs.BaseTx{}}))
return mempool
}(),
appSenderFunc: func(ctrl *gomock.Controller) common.AppSender {
appSender := commonmock.NewSender(ctrl)
appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
return appSender
},
tx: &txs.Tx{Unsigned: &txs.BaseTx{}},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand Down
6 changes: 2 additions & 4 deletions vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/avm/utxo"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/txs/mempool"

blockbuilder "github.com/ava-labs/avalanchego/vms/avm/block/builder"
blockexecutor "github.com/ava-labs/avalanchego/vms/avm/block/executor"
Expand Down Expand Up @@ -468,14 +467,13 @@ func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) {
func (vm *VM) issueTxFromRPC(tx *txs.Tx) (ids.ID, error) {
txID := tx.ID()
err := vm.network.IssueTxFromRPC(tx)
if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) {
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Stringer("txID", txID),
zap.Error(err),
)
return txID, err
}
return txID, nil
return txID, err
}

/*
Expand Down
14 changes: 4 additions & 10 deletions vms/avm/wallet_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package avm

import (
"errors"
"fmt"
"net/http"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/ava-labs/avalanchego/utils/linked"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/txs/mempool"
)

type WalletService struct {
Expand Down Expand Up @@ -45,9 +43,6 @@ func (w *WalletService) decided(txID ids.ID) {
)
return
}
if errors.Is(err, mempool.ErrDuplicateTx) {
return
}

w.pendingTxs.Delete(txID)
w.vm.ctx.Log.Warn("dropping tx issued over wallet API",
Expand All @@ -71,17 +66,16 @@ func (w *WalletService) issue(tx *txs.Tx) (ids.ID, error) {
}

if w.pendingTxs.Len() == 0 {
if err := w.vm.network.IssueTxFromRPCWithoutVerification(tx); err == nil {
w.vm.ctx.Log.Info("issued tx to mempool over wallet API",
zap.Stringer("txID", txID),
)
} else if !errors.Is(err, mempool.ErrDuplicateTx) {
if err := w.vm.network.IssueTxFromRPCWithoutVerification(tx); err != nil {
w.vm.ctx.Log.Warn("failed to issue tx over wallet API",
zap.Stringer("txID", txID),
zap.Error(err),
)
return ids.Empty, err
}
w.vm.ctx.Log.Info("issued tx to mempool over wallet API",
zap.Stringer("txID", txID),
)
} else {
w.vm.ctx.Log.Info("enqueueing tx over wallet API",
zap.Stringer("txID", txID),
Expand Down
4 changes: 1 addition & 3 deletions vms/platformvm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
"github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"

txmempool "github.com/ava-labs/avalanchego/vms/txs/mempool"
)

var (
Expand Down Expand Up @@ -97,7 +95,7 @@ type gossipMempool struct {
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("tx %s dropped: %w", txID, txmempool.ErrDuplicateTx)
return nil
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
Expand Down
12 changes: 6 additions & 6 deletions vms/platformvm/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ava-labs/avalanchego/vms/components/gas"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
"github.com/ava-labs/avalanchego/vms/txs/mempool"

pmempool "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"
)
Expand Down Expand Up @@ -59,7 +58,7 @@ func TestGossipMempoolAddVerificationError(t *testing.T) {
require.False(gossipMempool.bloom.Has(tx))
}

// Adding a duplicate to the mempool should return an error
// Adding a duplicate to the mempool should not return an error
func TestMempoolDuplicate(t *testing.T) {
require := require.New(t)

Expand Down Expand Up @@ -95,7 +94,6 @@ func TestMempoolDuplicate(t *testing.T) {
TxID: txID,
}

require.NoError(testMempool.Add(tx))
gossipMempool, err := newGossipMempool(
testMempool,
prometheus.NewRegistry(),
Expand All @@ -107,9 +105,11 @@ func TestMempoolDuplicate(t *testing.T) {
)
require.NoError(err)

err = gossipMempool.Add(tx)
require.ErrorIs(err, mempool.ErrDuplicateTx)
require.False(gossipMempool.bloom.Has(tx))
require.NoError(gossipMempool.Add(tx))
require.True(gossipMempool.bloom.Has(tx))

require.NoError(gossipMempool.Add(tx))
require.True(gossipMempool.bloom.Has(tx))
}

// Adding a tx to the mempool should add it to the bloom filter
Expand Down
2 changes: 1 addition & 1 deletion vms/platformvm/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestNetworkIssueTxFromRPC(t *testing.T) {
},
},
},
expectedErr: mempool.ErrDuplicateTx,
expectedErr: nil,
},
{
name: "transaction marked as dropped in mempool",
Expand Down
7 changes: 2 additions & 5 deletions vms/platformvm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
"github.com/ava-labs/avalanchego/vms/platformvm/utxo"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
"github.com/ava-labs/avalanchego/vms/txs/mempool"

snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block"
blockbuilder "github.com/ava-labs/avalanchego/vms/platformvm/block/builder"
Expand Down Expand Up @@ -504,13 +503,11 @@ func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, erro

func (vm *VM) issueTxFromRPC(tx *txs.Tx) error {
err := vm.Network.IssueTxFromRPC(tx)
if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) {
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Stringer("txID", tx.ID()),
zap.Error(err),
)
return err
}

return nil
return err
}
Loading
Loading