diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool.go b/graft/coreth/plugin/evm/atomic/txpool/mempool.go index a28cbdd6f8bd..d0e167b16f01 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool.go @@ -20,18 +20,14 @@ import ( var ( _ gossip.Set[*atomic.Tx] = (*Mempool)(nil) - ErrAlreadyKnown = errors.New("already known") + ErrDiscarded = errors.New("previously discarded") ErrConflict = errors.New("conflict present") ErrInsufficientFee = errors.New("insufficient fee") ErrMempoolFull = errors.New("mempool full") - // If the transaction is already in the mempool, marking it as Discarded, - // could unexpectedly cause the transaction to have multiple statuses. - // // If the mempool is full, that is not the transaction's fault, so we should // not prevent adding the transaction to the mempool later. errsNotToDiscard = []error{ - ErrAlreadyKnown, ErrMempoolFull, } ) @@ -174,17 +170,17 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error { // add it again. txID := tx.ID() if _, exists := m.issuedTxs[txID]; exists { - return fmt.Errorf("%w: tx %s was issued previously", ErrAlreadyKnown, txID) + return nil } if _, exists := m.currentTxs[txID]; exists { - return fmt.Errorf("%w: tx %s is being built into a block", ErrAlreadyKnown, txID) + return nil } if _, exists := m.pendingTxs.Get(txID); exists { - return fmt.Errorf("%w: tx %s is pending", ErrAlreadyKnown, txID) + return nil } if !local { if _, exists := m.discardedTxs.Get(txID); exists { - return fmt.Errorf("%w: tx %s was discarded", ErrAlreadyKnown, txID) + return fmt.Errorf("tx %s was %w", txID, ErrDiscarded) } } if !force && m.verify != nil { diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go b/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go index 65f4872ba8d8..e77c0ccc6e4f 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go @@ -40,7 +40,7 @@ func TestMempoolAddTx(t *testing.T) { } } -// Add should return an error if a tx is already known +// Add should not return an error if a tx is already known func TestMempoolAdd(t *testing.T) { require := require.New(t) @@ -54,8 +54,7 @@ func TestMempoolAdd(t *testing.T) { tx := atomictest.GenerateTestImportTxWithGas(1, 1) require.NoError(m.Add(tx)) - err = m.Add(tx) - require.ErrorIs(err, ErrAlreadyKnown) + require.NoError(m.Add(tx)) } // Add should return an error if a tx doesn't consume any gas diff --git a/graft/coreth/plugin/evm/atomic/vm/api.go b/graft/coreth/plugin/evm/atomic/vm/api.go index 5337f5f987da..5c0d2629f36f 100644 --- a/graft/coreth/plugin/evm/atomic/vm/api.go +++ b/graft/coreth/plugin/evm/atomic/vm/api.go @@ -12,7 +12,6 @@ import ( "github.com/ava-labs/avalanchego/api" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic/txpool" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/client" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/formatting" @@ -146,8 +145,7 @@ func (service *AvaxAPI) IssueTx(_ *http.Request, args *api.FormattedTx, response service.vm.Ctx.Lock.Lock() defer service.vm.Ctx.Lock.Unlock() - err = service.vm.AtomicMempool.AddLocalTx(tx) - if err != nil && !errors.Is(err, txpool.ErrAlreadyKnown) { + if err := service.vm.AtomicMempool.AddLocalTx(tx); err != nil { return err } diff --git a/graft/coreth/plugin/evm/atomic/vm/block_extension.go b/graft/coreth/plugin/evm/atomic/vm/block_extension.go index 74a30a11522f..001135d01cfe 100644 --- a/graft/coreth/plugin/evm/atomic/vm/block_extension.go +++ b/graft/coreth/plugin/evm/atomic/vm/block_extension.go @@ -212,7 +212,9 @@ func (be *blockExtension) Accept(acceptedBatch database.Batch) error { func (be *blockExtension) Reject() error { vm := be.blockExtender.vm for _, tx := range be.atomicTxs { - // Re-issue the transaction in the mempool, continue even if it fails + // We must remove the rejected transaction first to allow any + // transactions marked as Issued to be moved back into the Pending + // state. vm.AtomicMempool.RemoveTx(tx) if err := vm.AtomicMempool.AddRemoteTx(tx); err != nil { log.Debug("Failed to re-issue transaction in rejected block", "txID", tx.ID(), "err", err) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index b6129670e012..e10c131b13e5 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -7,6 +7,7 @@ package evm import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -117,7 +118,11 @@ func (g *GossipEthTxPool) Subscribe(ctx context.Context) { // Add enqueues the transaction to the mempool. Subscribe should be called // to receive an event if tx is actually added to the mempool or not. func (g *GossipEthTxPool) Add(tx *GossipEthTx) error { - return g.mempool.Add([]*types.Transaction{tx.Tx}, false, false)[0] + err := g.mempool.Add([]*types.Transaction{tx.Tx}, false, false)[0] + if errors.Is(err, txpool.ErrAlreadyKnown) { + return nil + } + return err } // Has should just return whether or not the [txID] is still in the mempool, diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index cb8222ceff4f..9456c4e05126 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -5,7 +5,6 @@ package gossip import ( "context" - "fmt" "sync" "testing" "time" @@ -74,16 +73,11 @@ type setDouble struct { } func (s *setDouble) Add(gossipable tx) error { - if s.txs.Contains(gossipable) { - return fmt.Errorf("%s already present", ids.ID(gossipable)) - } - s.txs.Add(gossipable) s.bloom.Add(gossipable) if s.onAdd != nil { s.onAdd(gossipable) } - return nil } diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go index b535609a7a48..76818e3798b4 100644 --- a/network/p2p/gossip/gossipable.go +++ b/network/p2p/gossip/gossipable.go @@ -18,8 +18,9 @@ type Marshaller[T Gossipable] interface { // Set holds a set of known Gossipable items type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not - // added. + // Add adds a Gossipable to the set. If the Gossipable is already in the + // set, no error should be returned. Otherwise, if the Gossipable was not + // added to the set, an error should be returned. Add(gossipable T) error // Has returns true if the gossipable is in the set. Has(gossipID ids.ID) bool diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index fb2077449dae..336bc95888c0 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -5,7 +5,6 @@ package network import ( "context" - "fmt" "sync" "time" @@ -93,14 +92,12 @@ type gossipMempool struct { bloom *gossip.BloomFilter } -// Add is called by the p2p SDK when handling transactions that were pushed to -// us and when handling transactions that were pulled from a peer. If this -// returns a nil error while handling push gossip, the p2p SDK will queue the -// transaction to push gossip as well. +// Add is called by the p2p SDK when handling transactions that sent pushed to +// us and when handling transactions that were pulled from a peer. func (g *gossipMempool) Add(tx *txs.Tx) error { txID := tx.ID() if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) + return nil } if reason := g.Mempool.GetDropReason(txID); reason != nil { diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index 45a873acb60e..9b924c26f316 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -189,9 +189,8 @@ func (n *Network) PullGossip(ctx context.Context) { // the tx is added to the mempool, it will attempt to push gossip the tx to // random peers in the network. // -// If the tx is already in the mempool, mempool.ErrDuplicateTx will be -// returned. -// If the tx is not added to the mempool, an error will be returned. +// If the tx is already in the mempool, no error will be returned. +// If the tx is otherwise not added to the mempool, an error will be returned. func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { if err := n.mempool.Add(tx); err != nil { return err diff --git a/vms/avm/network/network_test.go b/vms/avm/network/network_test.go index 4f8df6d55037..78ff43636efb 100644 --- a/vms/avm/network/network_test.go +++ b/vms/avm/network/network_test.go @@ -65,17 +65,6 @@ func TestNetworkIssueTxFromRPC(t *testing.T) { } tests := []test{ - { - name: "mempool has transaction", - mempool: func() mempool.Mempool[*txs.Tx] { - mempool, err := xmempool.New("", prometheus.NewRegistry()) - require.NoError(t, err) - require.NoError(t, mempool.Add(&txs.Tx{Unsigned: &txs.BaseTx{}})) - return mempool - }(), - tx: &txs.Tx{Unsigned: &txs.BaseTx{}}, - expectedErr: mempool.ErrDuplicateTx, - }, { name: "transaction marked as dropped in mempool", mempool: func() mempool.Mempool[*txs.Tx] { @@ -198,6 +187,22 @@ func TestNetworkIssueTxFromRPC(t *testing.T) { tx: &txs.Tx{Unsigned: &txs.BaseTx{}}, expectedErr: nil, }, + { + name: "mempool has transaction", + mempool: func() mempool.Mempool[*txs.Tx] { + mempool, err := xmempool.New("", prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, mempool.Add(&txs.Tx{Unsigned: &txs.BaseTx{}})) + return mempool + }(), + appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { + appSender := commonmock.NewSender(ctrl) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + return appSender + }, + tx: &txs.Tx{Unsigned: &txs.BaseTx{}}, + expectedErr: nil, + }, } for _, tt := range tests { diff --git a/vms/avm/vm.go b/vms/avm/vm.go index 35b8825530d4..9528cf4c7a99 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -35,7 +35,6 @@ import ( "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/avm/utxo" "github.com/ava-labs/avalanchego/vms/components/avax" - "github.com/ava-labs/avalanchego/vms/txs/mempool" blockbuilder "github.com/ava-labs/avalanchego/vms/avm/block/builder" blockexecutor "github.com/ava-labs/avalanchego/vms/avm/block/executor" @@ -468,14 +467,13 @@ func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) { func (vm *VM) issueTxFromRPC(tx *txs.Tx) (ids.ID, error) { txID := tx.ID() err := vm.network.IssueTxFromRPC(tx) - if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) { + if err != nil { vm.ctx.Log.Debug("failed to add tx to mempool", zap.Stringer("txID", txID), zap.Error(err), ) - return txID, err } - return txID, nil + return txID, err } /* diff --git a/vms/avm/wallet_service.go b/vms/avm/wallet_service.go index 7a04bfc9d4b3..149449dde60e 100644 --- a/vms/avm/wallet_service.go +++ b/vms/avm/wallet_service.go @@ -4,7 +4,6 @@ package avm import ( - "errors" "fmt" "net/http" @@ -16,7 +15,6 @@ import ( "github.com/ava-labs/avalanchego/utils/linked" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm/txs" - "github.com/ava-labs/avalanchego/vms/txs/mempool" ) type WalletService struct { @@ -45,9 +43,6 @@ func (w *WalletService) decided(txID ids.ID) { ) return } - if errors.Is(err, mempool.ErrDuplicateTx) { - return - } w.pendingTxs.Delete(txID) w.vm.ctx.Log.Warn("dropping tx issued over wallet API", @@ -71,17 +66,16 @@ func (w *WalletService) issue(tx *txs.Tx) (ids.ID, error) { } if w.pendingTxs.Len() == 0 { - if err := w.vm.network.IssueTxFromRPCWithoutVerification(tx); err == nil { - w.vm.ctx.Log.Info("issued tx to mempool over wallet API", - zap.Stringer("txID", txID), - ) - } else if !errors.Is(err, mempool.ErrDuplicateTx) { + if err := w.vm.network.IssueTxFromRPCWithoutVerification(tx); err != nil { w.vm.ctx.Log.Warn("failed to issue tx over wallet API", zap.Stringer("txID", txID), zap.Error(err), ) return ids.Empty, err } + w.vm.ctx.Log.Info("issued tx to mempool over wallet API", + zap.Stringer("txID", txID), + ) } else { w.vm.ctx.Log.Info("enqueueing tx over wallet API", zap.Stringer("txID", txID), diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index 006c1740c57f..bc2f924326ba 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -19,8 +19,6 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" - - txmempool "github.com/ava-labs/avalanchego/vms/txs/mempool" ) var ( @@ -97,7 +95,7 @@ type gossipMempool struct { func (g *gossipMempool) Add(tx *txs.Tx) error { txID := tx.ID() if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("tx %s dropped: %w", txID, txmempool.ErrDuplicateTx) + return nil } if reason := g.Mempool.GetDropReason(txID); reason != nil { diff --git a/vms/platformvm/network/gossip_test.go b/vms/platformvm/network/gossip_test.go index 87121f26961b..0e8ec84ab5a8 100644 --- a/vms/platformvm/network/gossip_test.go +++ b/vms/platformvm/network/gossip_test.go @@ -17,7 +17,6 @@ import ( "github.com/ava-labs/avalanchego/vms/components/gas" "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/secp256k1fx" - "github.com/ava-labs/avalanchego/vms/txs/mempool" pmempool "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" ) @@ -59,7 +58,7 @@ func TestGossipMempoolAddVerificationError(t *testing.T) { require.False(gossipMempool.bloom.Has(tx)) } -// Adding a duplicate to the mempool should return an error +// Adding a duplicate to the mempool should not return an error func TestMempoolDuplicate(t *testing.T) { require := require.New(t) @@ -95,7 +94,6 @@ func TestMempoolDuplicate(t *testing.T) { TxID: txID, } - require.NoError(testMempool.Add(tx)) gossipMempool, err := newGossipMempool( testMempool, prometheus.NewRegistry(), @@ -107,9 +105,11 @@ func TestMempoolDuplicate(t *testing.T) { ) require.NoError(err) - err = gossipMempool.Add(tx) - require.ErrorIs(err, mempool.ErrDuplicateTx) - require.False(gossipMempool.bloom.Has(tx)) + require.NoError(gossipMempool.Add(tx)) + require.True(gossipMempool.bloom.Has(tx)) + + require.NoError(gossipMempool.Add(tx)) + require.True(gossipMempool.bloom.Has(tx)) } // Adding a tx to the mempool should add it to the bloom filter diff --git a/vms/platformvm/network/network_test.go b/vms/platformvm/network/network_test.go index f9b6e5543ce7..49a7458eabd1 100644 --- a/vms/platformvm/network/network_test.go +++ b/vms/platformvm/network/network_test.go @@ -118,7 +118,7 @@ func TestNetworkIssueTxFromRPC(t *testing.T) { }, }, }, - expectedErr: mempool.ErrDuplicateTx, + expectedErr: nil, }, { name: "transaction marked as dropped in mempool", diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index 4b270e045a3e..bfee3232f02b 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -40,7 +40,6 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/platformvm/utxo" "github.com/ava-labs/avalanchego/vms/secp256k1fx" - "github.com/ava-labs/avalanchego/vms/txs/mempool" snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block" blockbuilder "github.com/ava-labs/avalanchego/vms/platformvm/block/builder" @@ -504,13 +503,11 @@ func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, erro func (vm *VM) issueTxFromRPC(tx *txs.Tx) error { err := vm.Network.IssueTxFromRPC(tx) - if err != nil && !errors.Is(err, mempool.ErrDuplicateTx) { + if err != nil { vm.ctx.Log.Debug("failed to add tx to mempool", zap.Stringer("txID", tx.ID()), zap.Error(err), ) - return err } - - return nil + return err } diff --git a/vms/txs/mempool/mempool.go b/vms/txs/mempool/mempool.go index cc5f2e5297f2..4f8320c91b7c 100644 --- a/vms/txs/mempool/mempool.go +++ b/vms/txs/mempool/mempool.go @@ -32,7 +32,6 @@ const ( ) var ( - ErrDuplicateTx = errors.New("duplicate tx") ErrTxTooLarge = errors.New("tx too large") ErrMempoolFull = errors.New("mempool is full") ErrConflictsWithOtherTx = errors.New("tx conflicts with other tx") @@ -110,7 +109,7 @@ func (m *mempool[T]) Add(tx T) error { defer m.lock.Unlock() if _, ok := m.unissuedTxs.Get(txID); ok { - return fmt.Errorf("%w: %s", ErrDuplicateTx, txID) + return nil } txSize := tx.Size() diff --git a/vms/txs/mempool/mempool_test.go b/vms/txs/mempool/mempool_test.go index 7d53e259b72c..99f6453f7230 100644 --- a/vms/txs/mempool/mempool_test.go +++ b/vms/txs/mempool/mempool_test.go @@ -61,10 +61,10 @@ func TestAdd(t *testing.T) { dropReason: nil, }, { - name: "attempt adding duplicate tx", + name: "add duplicate tx", initialTxs: []*dummyTx{tx0}, tx: tx0, - err: ErrDuplicateTx, + err: nil, dropReason: nil, }, {