diff --git a/op-node/rollup/conductor/conductor_helper.go b/op-node/rollup/conductor/conductor_helper.go new file mode 100644 index 000000000000..5de222d6e0ec --- /dev/null +++ b/op-node/rollup/conductor/conductor_helper.go @@ -0,0 +1,119 @@ +package conductor + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +// SequencerActionEvent triggers the sequencer to start/seal a block, if active and ready to act. +// This event is used to prioritize sequencer work over derivation work, +// by emitting it before e.g. a derivation-pipeline step. +// A future sequencer in an async world may manage its own execution. +type CommitPayloadEvent struct { + // if payload should be promoted to safe (must also be pending safe, see DerivedFrom) + IsLastInSpan bool + // payload is promoted to pending-safe if non-zero + DerivedFrom eth.L1BlockRef + + Info eth.PayloadInfo + Ref eth.L2BlockRef +} + +func (ev CommitPayloadEvent) String() string { + return "commit-payload" +} + +type BuildingState struct { + Onto eth.L2BlockRef + Info eth.PayloadInfo + + Started time.Time + + // Set once known + Ref eth.L2BlockRef +} + +type ExecEngine interface { + GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error) +} + +type AsyncGossiper interface { + Gossip(payload *eth.ExecutionPayloadEnvelope) + Get() *eth.ExecutionPayloadEnvelope + Clear() + Stop() + Start() +} + +type SequencerClient interface { + CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error +} + +type ConductorHelper struct { + ctx context.Context + + engine ExecEngine // Underlying execution engine RPC + + log log.Logger + rollupCfg *rollup.Config + spec *rollup.ChainSpec + sequencer SequencerClient + asyncGossip AsyncGossiper + + emitter event.Emitter +} + +func NewConductorHelper(driverCtx context.Context, engine ExecEngine, log log.Logger, rollupCfg *rollup.Config, + sequencer SequencerClient, + asyncGossip AsyncGossiper, +) *ConductorHelper { + return &ConductorHelper{ + ctx: driverCtx, + engine: engine, + log: log, + rollupCfg: rollupCfg, + spec: rollup.NewChainSpec(rollupCfg), + sequencer: sequencer, + asyncGossip: asyncGossip, + } +} + +func (d *ConductorHelper) AttachEmitter(em event.Emitter) { + d.emitter = em +} + +func (d *ConductorHelper) OnEvent(ev event.Event) bool { + + switch x := ev.(type) { + case CommitPayloadEvent: + d.onCommitPayload(x) + + default: + return false + } + return true +} + +func (d *ConductorHelper) onCommitPayload(ev CommitPayloadEvent) { + const getPayloadTimeout = time.Second * 100 + ctx, cancel := context.WithTimeout(d.ctx, getPayloadTimeout) + defer cancel() + + envelope, err := d.engine.GetPayload(ctx, ev.Info) + + if err != nil { + if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all + d.log.Warn("Cannot seal block, payload ID is unknown", + "payloadID", ev.Info.ID, "payload_time", ev.Info.Timestamp) + } + return + } + d.asyncGossip.Gossip(envelope) + d.sequencer.CommitUnsafePayload(envelope) +} diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 81607e612d5a..0be64cb9c137 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -249,6 +249,9 @@ func NewDriver( sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin, sequencerStateListener, sequencerConductor, asyncGossiper, metrics) sys.Register("sequencer", sequencer, opts) + + conductorHelper := conductor.NewConductorHelper(driverCtx, l2, log, cfg, sequencer, asyncGossiper) + sys.Register("conductor-helper", conductorHelper, opts) } else { sequencer = sequencing.DisabledSequencer{} } diff --git a/op-node/rollup/engine/build_seal.go b/op-node/rollup/engine/build_seal.go index 8c7580932e1b..922323c72510 100644 --- a/op-node/rollup/engine/build_seal.go +++ b/op-node/rollup/engine/build_seal.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -56,6 +57,12 @@ func (eq *EngDeriver) onBuildSeal(ev BuildSealEvent) { defer cancel() sealingStart := time.Now() + eq.emitter.Emit(conductor.CommitPayloadEvent{ + IsLastInSpan: ev.IsLastInSpan, + DerivedFrom: ev.DerivedFrom, + Info: ev.Info, + Ref: eth.L2BlockRef{}, + }) envelope, err := eq.ec.engine.GetMinimizedPayload(ctx, ev.Info) if err != nil { if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all diff --git a/op-node/rollup/sequencing/disabled.go b/op-node/rollup/sequencing/disabled.go index 3ef6d7b2ca73..4ae3c621f847 100644 --- a/op-node/rollup/sequencing/disabled.go +++ b/op-node/rollup/sequencing/disabled.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" ) var ErrSequencerNotEnabled = errors.New("sequencer is not enabled") @@ -50,4 +51,8 @@ func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error { return ErrSequencerNotEnabled } +func (ds DisabledSequencer) CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error { + return ErrSequencerNotEnabled +} + func (ds DisabledSequencer) Close() {} diff --git a/op-node/rollup/sequencing/iface.go b/op-node/rollup/sequencing/iface.go index 533b828b0d5b..73a87d1d383a 100644 --- a/op-node/rollup/sequencing/iface.go +++ b/op-node/rollup/sequencing/iface.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-node/rollup/event" + "github.com/ethereum-optimism/optimism/op-service/eth" ) type SequencerIface interface { @@ -21,5 +22,6 @@ type SequencerIface interface { Stop(ctx context.Context) (hash common.Hash, err error) SetMaxSafeLag(ctx context.Context, v uint64) error OverrideLeader(ctx context.Context) error + CommitUnsafePayload(*eth.ExecutionPayloadEnvelope) error Close() } diff --git a/op-node/rollup/sequencing/sequencer.go b/op-node/rollup/sequencing/sequencer.go index 80121784676a..64ef9dc00b70 100644 --- a/op-node/rollup/sequencing/sequencer.go +++ b/op-node/rollup/sequencing/sequencer.go @@ -293,7 +293,7 @@ func (d *Sequencer) onBuildSealed(x engine.BuildSealedEvent) { // begin gossiping as soon as possible // asyncGossip.Clear() will be called later if an non-temporary error is found, // or if the payload is successfully inserted - //d.asyncGossip.Gossip(x.Envelope) + // d.asyncGossip.Gossip(x.Envelope) // Now after having gossiped the block, try to put it in our own canonical chain d.emitter.Emit(engine.PayloadProcessEvent{ IsLastInSpan: x.IsLastInSpan, @@ -745,6 +745,15 @@ func (d *Sequencer) OverrideLeader(ctx context.Context) error { return d.conductor.OverrideLeader(ctx) } +func (d *Sequencer) CommitUnsafePayload(Envelope *eth.ExecutionPayloadEnvelope) error { + if err := d.conductor.CommitUnsafePayload(d.ctx, Envelope); err != nil { + d.emitter.Emit(rollup.EngineTemporaryErrorEvent{ + Err: fmt.Errorf("failed to commit unsafe payload to conductor: %w", err)}) + return err + } + return nil +} + func (d *Sequencer) Close() { d.conductor.Close() d.asyncGossip.Stop()