From 2050615aa80ecc356d85ee1a84b02c5a6b2dab86 Mon Sep 17 00:00:00 2001 From: silaszhr Date: Tue, 10 Dec 2024 16:39:00 +0000 Subject: [PATCH 1/4] wip: emit CommitPayloadEvent after getPayload --- op-node/rollup/engine/engine_controller.go | 1 + op-service/eth/types.go | 5 ++-- op-service/sources/engine_client.go | 27 ++++++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index 817cff4f937c..6540673366a7 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -38,6 +38,7 @@ var ErrNoFCUNeeded = errors.New("no FCU call was needed") type ExecEngine interface { GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) GetMinimizedPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) + GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) NewPayloadWithPayloadId(ctx context.Context, payloadInfo eth.PayloadInfo, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) diff --git a/op-service/eth/types.go b/op-service/eth/types.go index 97b9ae0ed70f..75c3342e4c0a 100644 --- a/op-service/eth/types.go +++ b/op-service/eth/types.go @@ -520,6 +520,7 @@ const ( NewPayloadV3ById EngineAPIMethod = "engine_newPayloadV3ById" GetMinimizedPayloadV3 EngineAPIMethod = "engine_getMinimizedPayloadV3" - GetPayloadV2 EngineAPIMethod = "engine_getPayloadV2" - GetPayloadV3 EngineAPIMethod = "engine_getPayloadV3" + GetPayloadV2 EngineAPIMethod = "engine_getPayloadV2" + GetPayloadV3 EngineAPIMethod = "engine_getPayloadV3" + GetBuiltPayloadV3 EngineAPIMethod = "engine_getBuiltPayloadV3" ) diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index b8d85584ea9c..14d725df3ba3 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -62,6 +62,7 @@ type EngineVersionProvider interface { NewPayloadByIdVersion(timestamp uint64) eth.EngineAPIMethod GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod GetMinimizedPayloadVersion(timestamp uint64) eth.EngineAPIMethod + GetBuiltPayloadVersion(timestamp uint64) eth.EngineAPIMethod } func NewEngineAPIClient(rpc client.RPC, l log.Logger, evp EngineVersionProvider) *EngineAPIClient { @@ -237,6 +238,32 @@ func (s *EngineAPIClient) GetMinimizedPayload(ctx context.Context, payloadInfo e return &result, nil } +func (s *EngineAPIClient) GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) { + e := s.log.New("payload_id", payloadInfo.ID) + e.Trace("getting payload") + var result eth.ExecutionPayloadEnvelope + method := s.evp.GetBuiltPayloadVersion(payloadInfo.Timestamp) + err := s.RPC.CallContext(ctx, &result, string(method), payloadInfo.ID) + if err != nil { + e.Warn("Failed to get payload", "payload_id", payloadInfo.ID, "err", err) + if rpcErr, ok := err.(rpc.Error); ok { + code := eth.ErrorCode(rpcErr.ErrorCode()) + switch code { + case eth.UnknownPayload: + return nil, eth.InputError{ + Inner: err, + Code: code, + } + default: + return nil, fmt.Errorf("unrecognized rpc error: %w", err) + } + } + return nil, err + } + e.Trace("Received payload") + return &result, nil +} + func (s *EngineAPIClient) SignalSuperchainV1(ctx context.Context, recommended, required params.ProtocolVersion) (params.ProtocolVersion, error) { var result params.ProtocolVersion err := s.RPC.CallContext(ctx, &result, "engine_signalSuperchainV1", &catalyst.SuperchainSignal{ From b3a23e3b937f5df0cd8f5025166e99ec6ae56ca6 Mon Sep 17 00:00:00 2001 From: silaszhr Date: Mon, 13 Jan 2025 23:06:49 +0800 Subject: [PATCH 2/4] feat: emit CommitPayloadEvent after getPayload --- op-node/rollup/conductor/conductor_helper.go | 120 +++++++++++++++++++ op-node/rollup/driver/driver.go | 3 + op-node/rollup/engine/build_seal.go | 8 ++ op-node/rollup/sequencing/disabled.go | 5 + op-node/rollup/sequencing/iface.go | 2 + op-node/rollup/sequencing/sequencer.go | 11 +- op-service/sources/engine_client.go | 3 +- 7 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 op-node/rollup/conductor/conductor_helper.go diff --git a/op-node/rollup/conductor/conductor_helper.go b/op-node/rollup/conductor/conductor_helper.go new file mode 100644 index 000000000000..5a0062131f76 --- /dev/null +++ b/op-node/rollup/conductor/conductor_helper.go @@ -0,0 +1,120 @@ +package conductor + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +// SequencerActionEvent triggers the sequencer to start/seal a block, if active and ready to act. +// This event is used to prioritize sequencer work over derivation work, +// by emitting it before e.g. a derivation-pipeline step. +// A future sequencer in an async world may manage its own execution. +type CommitPayloadEvent struct { + // if payload should be promoted to safe (must also be pending safe, see DerivedFrom) + IsLastInSpan bool + // payload is promoted to pending-safe if non-zero + DerivedFrom eth.L1BlockRef + + Info eth.PayloadInfo + Ref eth.L2BlockRef +} + +func (ev CommitPayloadEvent) String() string { + return "commit-payload" +} + +type BuildingState struct { + Onto eth.L2BlockRef + Info eth.PayloadInfo + + Started time.Time + + // Set once known + Ref eth.L2BlockRef +} + +type ExecEngine interface { + GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) + GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) +} + +type AsyncGossiper interface { + Gossip(payload *eth.ExecutionPayloadEnvelope) + Get() *eth.ExecutionPayloadEnvelope + Clear() + Stop() + Start() +} + +type SequencerClient interface { + CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error +} + +type ConductorHelper struct { + ctx context.Context + + engine ExecEngine // Underlying execution engine RPC + + log log.Logger + rollupCfg *rollup.Config + spec *rollup.ChainSpec + sequencer SequencerClient + asyncGossip AsyncGossiper + + emitter event.Emitter +} + +func NewConductorHelper(driverCtx context.Context, engine ExecEngine, log log.Logger, rollupCfg *rollup.Config, + sequencer SequencerClient, + asyncGossip AsyncGossiper, +) *ConductorHelper { + return &ConductorHelper{ + ctx: driverCtx, + engine: engine, + log: log, + rollupCfg: rollupCfg, + spec: rollup.NewChainSpec(rollupCfg), + sequencer: sequencer, + asyncGossip: asyncGossip, + } +} + +func (d *ConductorHelper) AttachEmitter(em event.Emitter) { + d.emitter = em +} + +func (d *ConductorHelper) OnEvent(ev event.Event) bool { + + switch x := ev.(type) { + case CommitPayloadEvent: + d.onCommitPayload(x) + + default: + return false + } + return true +} + +func (d *ConductorHelper) onCommitPayload(ev CommitPayloadEvent) { + const getPayloadTimeout = time.Second * 100 + ctx, cancel := context.WithTimeout(d.ctx, getPayloadTimeout) + defer cancel() + + envelope, err := d.engine.GetPayload(ctx, ev.Info) + + if err != nil { + if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all + d.log.Warn("Cannot seal block, payload ID is unknown", + "payloadID", ev.Info.ID, "payload_time", ev.Info.Timestamp) + } + return + } + d.asyncGossip.Gossip(envelope) + d.sequencer.CommitUnsafePayload(envelope) +} diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 81607e612d5a..0be64cb9c137 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -249,6 +249,9 @@ func NewDriver( sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin, sequencerStateListener, sequencerConductor, asyncGossiper, metrics) sys.Register("sequencer", sequencer, opts) + + conductorHelper := conductor.NewConductorHelper(driverCtx, l2, log, cfg, sequencer, asyncGossiper) + sys.Register("conductor-helper", conductorHelper, opts) } else { sequencer = sequencing.DisabledSequencer{} } diff --git a/op-node/rollup/engine/build_seal.go b/op-node/rollup/engine/build_seal.go index 8c7580932e1b..6877476de33a 100644 --- a/op-node/rollup/engine/build_seal.go +++ b/op-node/rollup/engine/build_seal.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -100,6 +101,13 @@ func (eq *EngDeriver) onBuildSeal(ev BuildSealEvent) { return } + eq.emitter.Emit(conductor.CommitPayloadEvent{ + IsLastInSpan: ev.IsLastInSpan, + DerivedFrom: ev.DerivedFrom, + Info: ev.Info, + Ref: ref, + }) + now := time.Now() sealTime := now.Sub(sealingStart) buildTime := now.Sub(ev.BuildStarted) diff --git a/op-node/rollup/sequencing/disabled.go b/op-node/rollup/sequencing/disabled.go index 3ef6d7b2ca73..4ae3c621f847 100644 --- a/op-node/rollup/sequencing/disabled.go +++ b/op-node/rollup/sequencing/disabled.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" ) var ErrSequencerNotEnabled = errors.New("sequencer is not enabled") @@ -50,4 +51,8 @@ func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error { return ErrSequencerNotEnabled } +func (ds DisabledSequencer) CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error { + return ErrSequencerNotEnabled +} + func (ds DisabledSequencer) Close() {} diff --git a/op-node/rollup/sequencing/iface.go b/op-node/rollup/sequencing/iface.go index 533b828b0d5b..73a87d1d383a 100644 --- a/op-node/rollup/sequencing/iface.go +++ b/op-node/rollup/sequencing/iface.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" ) type SequencerIface interface { @@ -21,5 +22,6 @@ type SequencerIface interface { Stop(ctx context.Context) (hash common.Hash, err error) SetMaxSafeLag(ctx context.Context, v uint64) error OverrideLeader(ctx context.Context) error + CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error Close() } diff --git a/op-node/rollup/sequencing/sequencer.go b/op-node/rollup/sequencing/sequencer.go index 80121784676a..64ef9dc00b70 100644 --- a/op-node/rollup/sequencing/sequencer.go +++ b/op-node/rollup/sequencing/sequencer.go @@ -293,7 +293,7 @@ func (d *Sequencer) onBuildSealed(x engine.BuildSealedEvent) { // begin gossiping as soon as possible // asyncGossip.Clear() will be called later if an non-temporary error is found, // or if the payload is successfully inserted - //d.asyncGossip.Gossip(x.Envelope) + // d.asyncGossip.Gossip(x.Envelope) // Now after having gossiped the block, try to put it in our own canonical chain d.emitter.Emit(engine.PayloadProcessEvent{ IsLastInSpan: x.IsLastInSpan, @@ -745,6 +745,15 @@ func (d *Sequencer) OverrideLeader(ctx context.Context) error { return d.conductor.OverrideLeader(ctx) } +func (d *Sequencer) CommitUnsafePayload(Envelope *eth.ExecutionPayloadEnvelope) error { + if err := d.conductor.CommitUnsafePayload(d.ctx, Envelope); err != nil { + d.emitter.Emit(rollup.EngineTemporaryErrorEvent{ + Err: fmt.Errorf("failed to commit unsafe payload to conductor: %w", err)}) + return err + } + return nil +} + func (d *Sequencer) Close() { d.conductor.Close() d.asyncGossip.Stop() diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index 14d725df3ba3..b7b41f4126da 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -62,7 +62,6 @@ type EngineVersionProvider interface { NewPayloadByIdVersion(timestamp uint64) eth.EngineAPIMethod GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod GetMinimizedPayloadVersion(timestamp uint64) eth.EngineAPIMethod - GetBuiltPayloadVersion(timestamp uint64) eth.EngineAPIMethod } func NewEngineAPIClient(rpc client.RPC, l log.Logger, evp EngineVersionProvider) *EngineAPIClient { @@ -242,7 +241,7 @@ func (s *EngineAPIClient) GetBuiltPayload(ctx context.Context, payloadInfo eth.P e := s.log.New("payload_id", payloadInfo.ID) e.Trace("getting payload") var result eth.ExecutionPayloadEnvelope - method := s.evp.GetBuiltPayloadVersion(payloadInfo.Timestamp) + method := s.evp.GetPayloadVersion(payloadInfo.Timestamp) err := s.RPC.CallContext(ctx, &result, string(method), payloadInfo.ID) if err != nil { e.Warn("Failed to get payload", "payload_id", payloadInfo.ID, "err", err) From 5867c1de398de11cbbdd53dba5b9200023f1680b Mon Sep 17 00:00:00 2001 From: silaszhr Date: Fri, 17 Jan 2025 21:54:19 +0800 Subject: [PATCH 3/4] fix: emit CommitPayloadEvent before GetMinimizedPayload to avoid not being cleared in time --- op-node/rollup/engine/build_seal.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/op-node/rollup/engine/build_seal.go b/op-node/rollup/engine/build_seal.go index 6877476de33a..922323c72510 100644 --- a/op-node/rollup/engine/build_seal.go +++ b/op-node/rollup/engine/build_seal.go @@ -57,6 +57,12 @@ func (eq *EngDeriver) onBuildSeal(ev BuildSealEvent) { defer cancel() sealingStart := time.Now() + eq.emitter.Emit(conductor.CommitPayloadEvent{ + IsLastInSpan: ev.IsLastInSpan, + DerivedFrom: ev.DerivedFrom, + Info: ev.Info, + Ref: eth.L2BlockRef{}, + }) envelope, err := eq.ec.engine.GetMinimizedPayload(ctx, ev.Info) if err != nil { if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all @@ -101,13 +107,6 @@ func (eq *EngDeriver) onBuildSeal(ev BuildSealEvent) { return } - eq.emitter.Emit(conductor.CommitPayloadEvent{ - IsLastInSpan: ev.IsLastInSpan, - DerivedFrom: ev.DerivedFrom, - Info: ev.Info, - Ref: ref, - }) - now := time.Now() sealTime := now.Sub(sealingStart) buildTime := now.Sub(ev.BuildStarted) From 4ab955585f09854e591c1395a2bbc4dfa0717f0b Mon Sep 17 00:00:00 2001 From: silaszhr Date: Sun, 26 Jan 2025 16:34:02 +0800 Subject: [PATCH 4/4] refactor: remove redundant getBuiltPayload method --- op-node/rollup/conductor/conductor_helper.go | 1 - op-node/rollup/engine/engine_controller.go | 1 - op-service/eth/types.go | 5 ++-- op-service/sources/engine_client.go | 26 -------------------- 4 files changed, 2 insertions(+), 31 deletions(-) diff --git a/op-node/rollup/conductor/conductor_helper.go b/op-node/rollup/conductor/conductor_helper.go index 5a0062131f76..5de222d6e0ec 100644 --- a/op-node/rollup/conductor/conductor_helper.go +++ b/op-node/rollup/conductor/conductor_helper.go @@ -41,7 +41,6 @@ type BuildingState struct { type ExecEngine interface { GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) - GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) } type AsyncGossiper interface { diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index 6540673366a7..817cff4f937c 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -38,7 +38,6 @@ var ErrNoFCUNeeded = errors.New("no FCU call was needed") type ExecEngine interface { GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) GetMinimizedPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) - GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) NewPayloadWithPayloadId(ctx context.Context, payloadInfo eth.PayloadInfo, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) diff --git a/op-service/eth/types.go b/op-service/eth/types.go index 75c3342e4c0a..97b9ae0ed70f 100644 --- a/op-service/eth/types.go +++ b/op-service/eth/types.go @@ -520,7 +520,6 @@ const ( NewPayloadV3ById EngineAPIMethod = "engine_newPayloadV3ById" GetMinimizedPayloadV3 EngineAPIMethod = "engine_getMinimizedPayloadV3" - GetPayloadV2 EngineAPIMethod = "engine_getPayloadV2" - GetPayloadV3 EngineAPIMethod = "engine_getPayloadV3" - GetBuiltPayloadV3 EngineAPIMethod = "engine_getBuiltPayloadV3" + GetPayloadV2 EngineAPIMethod = "engine_getPayloadV2" + GetPayloadV3 EngineAPIMethod = "engine_getPayloadV3" ) diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index b7b41f4126da..b8d85584ea9c 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -237,32 +237,6 @@ func (s *EngineAPIClient) GetMinimizedPayload(ctx context.Context, payloadInfo e return &result, nil } -func (s *EngineAPIClient) GetBuiltPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) { - e := s.log.New("payload_id", payloadInfo.ID) - e.Trace("getting payload") - var result eth.ExecutionPayloadEnvelope - method := s.evp.GetPayloadVersion(payloadInfo.Timestamp) - err := s.RPC.CallContext(ctx, &result, string(method), payloadInfo.ID) - if err != nil { - e.Warn("Failed to get payload", "payload_id", payloadInfo.ID, "err", err) - if rpcErr, ok := err.(rpc.Error); ok { - code := eth.ErrorCode(rpcErr.ErrorCode()) - switch code { - case eth.UnknownPayload: - return nil, eth.InputError{ - Inner: err, - Code: code, - } - default: - return nil, fmt.Errorf("unrecognized rpc error: %w", err) - } - } - return nil, err - } - e.Trace("Received payload") - return &result, nil -} - func (s *EngineAPIClient) SignalSuperchainV1(ctx context.Context, recommended, required params.ProtocolVersion) (params.ProtocolVersion, error) { var result params.ProtocolVersion err := s.RPC.CallContext(ctx, &result, "engine_signalSuperchainV1", &catalyst.SuperchainSignal{