Skip to content

Commit

Permalink
7311: Add protocol spec supplier to GetReceiptsFromPeerTask
Browse files Browse the repository at this point in the history
Signed-off-by: Matilda Clerke <[email protected]>
  • Loading branch information
Matilda-Clerke committed Oct 8, 2024
1 parent 4ad85e8 commit 86a1f0b
Show file tree
Hide file tree
Showing 22 changed files with 117 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ public BesuController build() {
final DefaultSynchronizer synchronizer =
createSynchronizer(
protocolSchedule,
currentProtocolSpecSupplier,
worldStateStorageCoordinator,
protocolContext,
ethContext,
Expand Down Expand Up @@ -839,6 +840,7 @@ private TrieLogPruner createTrieLogPruner(
*/
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
Expand All @@ -850,6 +852,7 @@ protected DefaultSynchronizer createSynchronizer(
return new DefaultSynchronizer(
syncConfig,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
worldStateStorageCoordinator,
ethProtocolManager.getBlockBroadcaster(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
Expand All @@ -66,6 +67,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -223,6 +225,7 @@ protected PluginServiceFactory createAdditionalPluginServices(
@Override
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
Expand All @@ -234,6 +237,7 @@ protected DefaultSynchronizer createSynchronizer(
DefaultSynchronizer sync =
super.createSynchronizer(
protocolSchedule,
currentProtocolSpecSupplier,
worldStateStorageCoordinator,
protocolContext,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ public void setTrailingPeerRequirementsSupplier(
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamBestPeers().filter(filter).filter(EthPeer::hasAvailableRequestCapacity).findFirst();
return streamBestPeers()
.filter(filter)
.filter(EthPeer::hasAvailableRequestCapacity)
.findFirst();
}

// Part of the PeerSelector interface, to be split apart later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

Expand All @@ -34,19 +35,24 @@
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {

private final Collection<BlockHeader> blockHeaders;
private final BodyValidator bodyValidator;
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final long requiredBlockchainHeight;

public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final BodyValidator bodyValidator) {
final Collection<BlockHeader> blockHeaders,
final BodyValidator bodyValidator,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier) {
this.blockHeaders = blockHeaders;
this.bodyValidator = bodyValidator;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;

blockHeaders.forEach(
header ->
Expand Down Expand Up @@ -112,6 +118,7 @@ public Collection<PeerTaskRetryBehavior> getPeerTaskRetryBehaviors() {
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
&& (currentProtocolSpecSupplier.get().isPoS()
|| ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final BlockBroadcaster blockBroadcaster,
Expand Down Expand Up @@ -146,6 +148,7 @@ public DefaultSynchronizer(
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,
Expand All @@ -163,6 +166,7 @@ public DefaultSynchronizer(
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,
Expand All @@ -180,6 +184,7 @@ public DefaultSynchronizer(
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class CheckpointDownloadBlockStep {

private final ProtocolSchedule protocolSchedule;
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final Checkpoint checkpoint;
Expand All @@ -48,12 +51,14 @@ public class CheckpointDownloadBlockStep {

public CheckpointDownloadBlockStep(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final Checkpoint checkpoint,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.checkpoint = checkpoint;
Expand Down Expand Up @@ -81,7 +86,8 @@ private CompletableFuture<Optional<BlockWithReceipts>> downloadReceipts(
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
CompletableFuture<Optional<BlockWithReceipts>> futureReceipts = new CompletableFuture<>();
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(List.of(block.getHeader()), new BodyValidator());
new GetReceiptsFromPeerTask(
List.of(block.getHeader()), new BodyValidator(), currentProtocolSpecSupplier);
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> executorResult =
peerTaskExecutor.execute(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
Expand All @@ -44,6 +45,7 @@
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +61,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
Expand Down Expand Up @@ -110,6 +113,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand All @@ -128,6 +132,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.function.Supplier;

public class CheckpointSyncActions extends FastSyncActions {
public CheckpointSyncActions(
final SynchronizerConfiguration syncConfig,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
Expand All @@ -43,6 +47,7 @@ public CheckpointSyncActions(
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand All @@ -58,6 +63,7 @@ public ChainDownloader createChainDownloader(
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.SyncTargetManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.function.Supplier;

public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {

public static ChainDownloader create(
final SynchronizerConfiguration config,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
Expand All @@ -59,6 +63,7 @@ public static ChainDownloader create(
new CheckpointSyncDownloadPipelineFactory(
config,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;

import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipelineFactory {

public CheckpointSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
Expand All @@ -47,6 +50,7 @@ public CheckpointSyncDownloadPipelineFactory(
super(
syncConfig,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
Expand Down Expand Up @@ -86,7 +90,13 @@ protected Pipeline<Hash> createDownloadCheckPointPipeline(

final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(
protocolSchedule, ethContext, peerTaskExecutor, checkpoint, syncConfig, metricsSystem);
protocolSchedule,
currentProtocolSpecSupplier,
ethContext,
peerTaskExecutor,
checkpoint,
syncConfig,
metricsSystem);

return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,32 @@
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {

private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final SynchronizerConfiguration synchronizerConfiguration;
private final MetricsSystem metricsSystem;

public DownloadReceiptsStep(
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.synchronizerConfiguration = synchronizerConfiguration;
Expand All @@ -76,7 +81,8 @@ public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks
getReceiptsWithPeerTaskSystem(final List<BlockHeader> headers) {
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>();
do {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(headers, new BodyValidator());
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(headers, new BodyValidator(), currentProtocolSpecSupplier);
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult =
peerTaskExecutor.execute(task);
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
Expand Down
Loading

0 comments on commit 86a1f0b

Please sign in to comment.