Skip to content

Commit

Permalink
refactor notification protocols (#2269)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
Co-authored-by: kamilsa <[email protected]>
  • Loading branch information
turuslan and kamilsa authored Nov 18, 2024
1 parent 153ed85 commit c3d19b6
Show file tree
Hide file tree
Showing 91 changed files with 2,217 additions and 3,003 deletions.
4 changes: 1 addition & 3 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ CheckOptions:
value: '4294967295'
- key: hicpp-member-init.IgnoreArrays
value: '0'
- key: hicpp-move-const-arg.CheckTriviallyCopyableMove
value: '1'
- key: hicpp-multiway-paths-covered.WarnOnMissingElse
value: '0'
- key: hicpp-named-parameter.IgnoreFailedSplit
Expand Down Expand Up @@ -261,7 +259,7 @@ CheckOptions:
- key: performance-inefficient-vector-operation.VectorLikeClasses
value: '::std::vector'
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: '1'
value: '0'
- key: performance-move-constructor-init.IncludeStyle
value: google
- key: performance-type-promotion-in-math-fn.IncludeStyle
Expand Down
6 changes: 3 additions & 3 deletions core/application/chain_spec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <set>

#include <libp2p/peer/peer_info.hpp>
#include <libp2p/multi/multiaddress.hpp>

#include "crypto/ed25519_types.hpp"
#include "crypto/sr25519_types.hpp"
Expand Down Expand Up @@ -38,8 +38,8 @@ namespace kagome::application {
virtual const std::vector<libp2p::multi::Multiaddress> &bootNodes()
const = 0;

virtual const std::vector<std::pair<std::string, size_t>> &
telemetryEndpoints() const = 0;
virtual const std::vector<std::pair<std::string, size_t>>
&telemetryEndpoints() const = 0;

virtual const std::string &protocolId() const = 0;

Expand Down
1 change: 1 addition & 0 deletions core/application/impl/chain_spec_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <boost/property_tree/json_parser.hpp>
#include <charconv>
#include <libp2p/multi/multiaddress.hpp>
#include <libp2p/peer/peer_id.hpp>
#include <sstream>
#include <system_error>

Expand Down
24 changes: 16 additions & 8 deletions core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/buffer_view.hpp"
#include "common/bytestr.hpp"
#include "crypto/sha/sha256.hpp"
#include "network/impl/protocols/parachain.hpp"
#include "utils/retain_if.hpp"

OUTCOME_CPP_DEFINE_CATEGORY(kagome::authority_discovery, QueryImpl::Error, e) {
Expand Down Expand Up @@ -38,6 +39,7 @@ namespace kagome::authority_discovery {
std::shared_ptr<application::AppStateManager> app_state_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<runtime::AuthorityDiscoveryApi> authority_discovery_api,
LazySPtr<network::ValidationProtocol> validation_protocol,
std::shared_ptr<crypto::KeyStore> key_store,
std::shared_ptr<crypto::Sr25519Provider> sr_crypto_provider,
std::shared_ptr<libp2p::crypto::CryptoProvider> libp2p_crypto_provider,
Expand All @@ -47,6 +49,7 @@ namespace kagome::authority_discovery {
std::shared_ptr<libp2p::basic::Scheduler> scheduler)
: block_tree_{std::move(block_tree)},
authority_discovery_api_{std::move(authority_discovery_api)},
validation_protocol_{std::move(validation_protocol)},
key_store_{std::move(key_store)},
sr_crypto_provider_{std::move(sr_crypto_provider)},
libp2p_crypto_provider_{std::move(libp2p_crypto_provider)},
Expand Down Expand Up @@ -158,6 +161,7 @@ namespace kagome::authority_discovery {
++it;
} else {
it = auth_to_peer_cache_.erase(it);
validation_protocol_.get()->reserve(it->second.peer.id, false);
}
}
for (auto it = peer_to_auth_cache_.begin();
Expand All @@ -166,6 +170,7 @@ namespace kagome::authority_discovery {
++it;
} else {
it = peer_to_auth_cache_.erase(it);
validation_protocol_.get()->reserve(it->first, false);
}
}
std::shuffle(authorities.begin(), authorities.end(), random_);
Expand Down Expand Up @@ -198,7 +203,8 @@ namespace kagome::authority_discovery {
hash = common::Buffer{crypto::sha256(authority)},
authority] {
if (auto self = wp.lock()) {
SL_DEBUG(self->log_, "start lookup({})", common::hex_lower(authority));
SL_DEBUG(
self->log_, "start lookup({})", common::hex_lower(authority));
std::ignore = self->kademlia_.get()->getValue(
hash, [=](const outcome::result<std::vector<uint8_t>> &res) {
if (auto self = wp.lock()) {
Expand All @@ -216,9 +222,9 @@ namespace kagome::authority_discovery {
const primitives::AuthorityDiscoveryId &authority,
outcome::result<std::vector<uint8_t>> _res) {
SL_TRACE(log_,
"lookup : add addresses for authority {}, _res {}",
common::hex_lower(authority),
_res.has_value() ? "ok" : "error: " + _res.error().message());
"lookup : add addresses for authority {}, _res {}",
common::hex_lower(authority),
_res.has_value() ? "ok" : "error: " + _res.error().message());
OUTCOME_TRY(signed_record_pb, _res);
auto it = auth_to_peer_cache_.find(authority);
if (it != auth_to_peer_cache_.end()
Expand Down Expand Up @@ -271,9 +277,9 @@ namespace kagome::authority_discovery {
libp2p::peer::PeerInfo peer{.id = std::move(peer_id)};
auto peer_id_str = peer.id.toBase58();
SL_TRACE(log_,
"lookup: adding {} addresses for authority {}",
record.addresses().size(),
authority);
"lookup: adding {} addresses for authority {}",
record.addresses().size(),
authority);
for (auto &pb : record.addresses()) {
OUTCOME_TRY(address, libp2p::multi::Multiaddress::create(str2byte(pb)));
auto id = address.getPeerId();
Expand Down Expand Up @@ -316,9 +322,11 @@ namespace kagome::authority_discovery {
Authority{
.raw = std::move(signed_record_pb),
.time = time,
.peer = std::move(peer),
.peer = peer,
});

validation_protocol_.get()->reserve(peer.id, true);

return outcome::success();
}
} // namespace kagome::authority_discovery
6 changes: 6 additions & 0 deletions core/authority_discovery/query/query_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
#include <mutex>
#include <random>

namespace kagome::network {
class ValidationProtocol;
} // namespace kagome::network

namespace kagome::authority_discovery {
class QueryImpl : public Query,
public libp2p::protocol::kademlia::Validator,
Expand All @@ -43,6 +47,7 @@ namespace kagome::authority_discovery {
std::shared_ptr<application::AppStateManager> app_state_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<runtime::AuthorityDiscoveryApi> authority_discovery_api,
LazySPtr<network::ValidationProtocol> validation_protocol,
std::shared_ptr<crypto::KeyStore> key_store,
std::shared_ptr<crypto::Sr25519Provider> sr_crypto_provider,
std::shared_ptr<libp2p::crypto::CryptoProvider> libp2p_crypto_provider,
Expand Down Expand Up @@ -85,6 +90,7 @@ namespace kagome::authority_discovery {

std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<runtime::AuthorityDiscoveryApi> authority_discovery_api_;
LazySPtr<network::ValidationProtocol> validation_protocol_;
std::shared_ptr<crypto::KeyStore> key_store_;
std::shared_ptr<crypto::Sr25519Provider> sr_crypto_provider_;
std::shared_ptr<libp2p::crypto::CryptoProvider> libp2p_crypto_provider_;
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain/genesis_block_hash.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#pragma once

#include "blockchain/block_tree.hpp"
#include "primitives/common.hpp"

namespace kagome::blockchain {

Expand Down
2 changes: 1 addition & 1 deletion core/consensus/babe/impl/babe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include "offchain/offchain_worker_pool.hpp"
#include "parachain/availability/bitfield/store.hpp"
#include "parachain/parachain_inherent_data.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "parachain/validator/backed_candidates_source.hpp"
#include "primitives/inherent_data.hpp"
#include "runtime/runtime_api/babe_api.hpp"
#include "runtime/runtime_api/offchain_worker_api.hpp"
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/babe/impl/babe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace kagome::offchain {

namespace kagome::parachain {
class BitfieldStore;
struct ParachainProcessorImpl;
class ParachainProcessorImpl;
struct BackedCandidatesSource;
} // namespace kagome::parachain

Expand Down
20 changes: 4 additions & 16 deletions core/consensus/babe/impl/babe_block_validator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "runtime/runtime_api/babe_api.hpp"
#include "runtime/runtime_api/offchain_worker_api.hpp"
#include "threshold_util.hpp"
#include "utils/weak_macro.hpp"

OUTCOME_CPP_DEFINE_CATEGORY(kagome::consensus::babe,
BabeBlockValidatorImpl::ValidationError,
Expand Down Expand Up @@ -77,22 +78,9 @@ namespace kagome::consensus::babe {

void BabeBlockValidatorImpl::prepare() {
sync_state_observer_ =
std::make_shared<primitives::events::SyncStateEventSubscriber>(
sync_state_observable_, false);
sync_state_observer_->subscribe(
sync_state_observer_->generateSubscriptionSetId(),
primitives::events::SyncStateEventType::kSyncState);
sync_state_observer_->setCallback(
[wp{weak_from_this()}](
auto /*set_id*/,
bool &synchronized,
auto /*event_type*/,
const primitives::events::SyncStateEventParams &event) mutable {
if (auto self = wp.lock()) {
if (event == consensus::SyncState::SYNCHRONIZED) {
self->was_synchronized_ = true;
}
}
primitives::events::onSync(sync_state_observable_, [WEAK_SELF] {
WEAK_LOCK(self);
self->was_synchronized_ = true;
});
}

Expand Down
2 changes: 1 addition & 1 deletion core/consensus/babe/impl/babe_block_validator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace kagome::consensus::babe {
primitives::events::SyncStateSubscriptionEnginePtr sync_state_observable_;

bool was_synchronized_ = false;
primitives::events::SyncStateEventSubscriberPtr sync_state_observer_;
std::shared_ptr<void> sync_state_observer_;
};

} // namespace kagome::consensus::babe
Expand Down
20 changes: 8 additions & 12 deletions core/consensus/beefy/impl/beefy_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,14 @@
namespace kagome::network {
constexpr std::chrono::minutes kRebroadcastAfter{1};

namespace {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
metrics::GaugeHelper metric_validator_set_id{
"kagome_beefy_validator_set_id",
"Current BEEFY active validator set id.",
};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
metrics::GaugeHelper metric_finalized{
"kagome_beefy_best_block",
"Best block finalized by BEEFY",
};
} // namespace
static const metrics::GaugeHelper metric_validator_set_id{
"kagome_beefy_validator_set_id",
"Current BEEFY active validator set id.",
};
static const metrics::GaugeHelper metric_finalized{
"kagome_beefy_best_block",
"Best block finalized by BEEFY",
};

BeefyImpl::BeefyImpl(
std::shared_ptr<application::AppStateManager> app_state_manager,
Expand Down
15 changes: 7 additions & 8 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <libp2p/basic/scheduler/scheduler_impl.hpp>

#include "application/app_state_manager.hpp"
#include "application/chain_spec.hpp"
#include "authority_discovery/query/query.hpp"
#include "blockchain/block_header_repository.hpp"
#include "common/main_thread_pool.hpp"
Expand All @@ -35,6 +36,7 @@
#include "runtime/runtime_api/parachain_host.hpp"
#include "utils/pool_handler_ready_make.hpp"
#include "utils/tuple_hash.hpp"
#include "utils/weak_macro.hpp"

namespace kagome::dispute {

Expand Down Expand Up @@ -238,15 +240,12 @@ namespace kagome::dispute {
active_heads_.insert(leaves.begin(), leaves.end());

// subscribe to leaves update
my_view_sub_ = std::make_shared<network::PeerView::MyViewSubscriber>(
peer_view_->getMyViewObservable(), false);
primitives::events::subscribe(
*my_view_sub_,
my_view_sub_ = primitives::events::subscribe(
peer_view_->getMyViewObservable(),
network::PeerView::EventType::kViewUpdated,
[wptr{weak_from_this()}](const network::ExView &event) {
if (auto self = wptr.lock()) {
self->on_active_leaves_update(event);
}
[WEAK_SELF](const network::ExView &event) {
WEAK_LOCK(self);
self->on_active_leaves_update(event);
});

// subscribe to finalization
Expand Down
2 changes: 1 addition & 1 deletion core/dispute_coordinator/impl/dispute_coordinator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace kagome::network {
} // namespace kagome::network

namespace kagome::parachain {
struct ApprovalDistribution;
class ApprovalDistribution;
class Recovery;
class Pvf;
} // namespace kagome::parachain
Expand Down
11 changes: 5 additions & 6 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include "injector/application_injector.hpp"

#define BOOST_DI_CFG_DIAGNOSTICS_LEVEL 2
#define BOOST_DI_CFG_CTOR_LIMIT_SIZE \
32 // TODO(Harrm): #2104 check how it influences on compilation time

#include "injector/application_injector.hpp"

#include <rocksdb/filter_policy.h>
#include <rocksdb/table.h>
#include <boost/di.hpp>
Expand Down Expand Up @@ -119,10 +119,11 @@
#include "network/impl/peer_manager_impl.hpp"
#include "network/impl/protocols/beefy_justification_protocol.hpp"
#include "network/impl/protocols/beefy_protocol_impl.hpp"
#include "network/impl/protocols/block_announce_protocol.hpp"
#include "network/impl/protocols/fetch_attested_candidate.hpp"
#include "network/impl/protocols/grandpa_protocol.hpp"
#include "network/impl/protocols/light.hpp"
#include "network/impl/protocols/parachain_protocols.hpp"
#include "network/impl/protocols/parachain.hpp"
#include "network/impl/protocols/protocol_fetch_available_data.hpp"
#include "network/impl/protocols/protocol_fetch_chunk.hpp"
#include "network/impl/protocols/protocol_fetch_chunk_obsolete.hpp"
Expand Down Expand Up @@ -328,7 +329,6 @@ namespace {
libp2p::protocol::kademlia::Config kademlia_config;
kademlia_config.protocols =
network::make_protocols("/{}/kad", genesis, chain_spec);
kademlia_config.maxBucketSize = 1000;
kademlia_config.randomWalk.enabled = false;
kademlia_config.valueLookupsQuorum = 4;

Expand Down Expand Up @@ -757,7 +757,6 @@ namespace {
di::bind<crypto::Hasher>.template to<crypto::HasherImpl>(),
di::bind<crypto::Sr25519Provider>.template to<crypto::Sr25519ProviderImpl>(),
di::bind<crypto::VRFProvider>.template to<crypto::VRFProviderImpl>(),
di::bind<network::StreamEngine>.template to<network::StreamEngine>(),
di::bind<network::ReputationRepository>.template to<network::ReputationRepositoryImpl>(),
di::bind<crypto::Bip39Provider>.template to<crypto::Bip39ProviderImpl>(),
di::bind<crypto::Pbkdf2Provider>.template to<crypto::Pbkdf2ProviderImpl>(),
Expand Down Expand Up @@ -935,7 +934,7 @@ namespace kagome::injector {
KagomeNodeInjector::KagomeNodeInjector(
sptr<application::AppConfiguration> app_config)
: pimpl_{std::make_unique<KagomeNodeInjectorImpl>(
makeKagomeNodeInjector(std::move(app_config)))} {}
makeKagomeNodeInjector(std::move(app_config)))} {}

sptr<application::AppConfiguration> KagomeNodeInjector::injectAppConfig() {
return pimpl_->injector_
Expand Down
6 changes: 3 additions & 3 deletions core/injector/application_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ namespace kagome {

namespace parachain {
class ParachainObserver;
struct ParachainProcessorImpl;
struct ApprovalDistribution;
class ParachainProcessorImpl;
class ApprovalDistribution;

namespace statement_distribution {
struct StatementDistribution;
class StatementDistribution;
}
} // namespace parachain

Expand Down
2 changes: 1 addition & 1 deletion core/metrics/histogram_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace kagome::metrics {
metric_ = registry_->registerGaugeMetric(name);
}

auto *operator->() {
auto *operator->() const {
return metric_;
}

Expand Down
Loading

0 comments on commit c3d19b6

Please sign in to comment.