Skip to content

Commit c8b8169

Browse files
committed
node/launcher: extract build_subgraph_registrar helper from run
1 parent b22dbbc commit c8b8169

File tree

1 file changed

+99
-63
lines changed

1 file changed

+99
-63
lines changed

node/src/launcher.rs

+99-63
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use graph::log::logger;
1919
use graph::prelude::*;
2020
use graph::prometheus::Registry;
2121
use graph::url::Url;
22-
use graph_core::polling_monitor::{arweave_service, ipfs_service};
22+
use graph_core::polling_monitor::{arweave_service, ipfs_service, ArweaveService, IpfsService};
2323
use graph_core::{
2424
SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager,
2525
SubgraphRegistrar as IpfsSubgraphRegistrar,
@@ -31,7 +31,7 @@ use graph_server_json_rpc::JsonRpcServer;
3131
use graph_server_metrics::PrometheusMetricsServer;
3232
use graph_store_postgres::{
3333
register_jobs as register_store_jobs, ChainHeadUpdateListener, ConnectionPool,
34-
NotificationSender, Store, SubscriptionManager,
34+
NotificationSender, Store, SubgraphStore, SubscriptionManager,
3535
};
3636
use graphman_server::GraphmanServer;
3737
use graphman_server::GraphmanServerConfig;
@@ -259,6 +259,94 @@ fn deploy_subgraph_from_flag(
259259
);
260260
}
261261

262+
fn build_subgraph_registrar(
263+
metrics_registry: Arc<MetricsRegistry>,
264+
network_store: &Arc<Store>,
265+
logger_factory: &LoggerFactory,
266+
env_vars: &Arc<EnvVars>,
267+
blockchain_map: Arc<BlockchainMap>,
268+
node_id: NodeId,
269+
subgraph_settings: Settings,
270+
link_resolver: Arc<IpfsResolver>,
271+
subscription_manager: Arc<SubscriptionManager>,
272+
arweave_service: ArweaveService,
273+
ipfs_service: IpfsService,
274+
) -> Arc<
275+
IpfsSubgraphRegistrar<
276+
IpfsSubgraphAssignmentProvider<SubgraphInstanceManager<SubgraphStore>>,
277+
SubgraphStore,
278+
SubscriptionManager,
279+
>,
280+
> {
281+
let static_filters = ENV_VARS.experimental_static_filters;
282+
let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone()));
283+
284+
let subgraph_instance_manager = SubgraphInstanceManager::new(
285+
&logger_factory,
286+
env_vars.cheap_clone(),
287+
network_store.subgraph_store(),
288+
blockchain_map.cheap_clone(),
289+
sg_count.cheap_clone(),
290+
metrics_registry.clone(),
291+
link_resolver.clone(),
292+
ipfs_service,
293+
arweave_service,
294+
static_filters,
295+
);
296+
297+
// Create IPFS-based subgraph provider
298+
let subgraph_provider = IpfsSubgraphAssignmentProvider::new(
299+
&logger_factory,
300+
link_resolver.clone(),
301+
subgraph_instance_manager,
302+
sg_count,
303+
);
304+
305+
// Check version switching mode environment variable
306+
let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;
307+
308+
// Create named subgraph provider for resolving subgraph name->ID mappings
309+
let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new(
310+
&logger_factory,
311+
link_resolver,
312+
Arc::new(subgraph_provider),
313+
network_store.subgraph_store(),
314+
subscription_manager,
315+
blockchain_map,
316+
node_id.clone(),
317+
version_switching_mode,
318+
Arc::new(subgraph_settings),
319+
));
320+
321+
subgraph_registrar
322+
}
323+
324+
fn build_graphql_server(
325+
config: &Config,
326+
logger: &Logger,
327+
expensive_queries: Vec<Arc<q::Document>>,
328+
metrics_registry: Arc<MetricsRegistry>,
329+
network_store: &Arc<Store>,
330+
logger_factory: &LoggerFactory,
331+
) -> GraphQLQueryServer<GraphQlRunner<Store>> {
332+
let shards: Vec<_> = config.stores.keys().cloned().collect();
333+
let load_manager = Arc::new(LoadManager::new(
334+
&logger,
335+
shards,
336+
expensive_queries,
337+
metrics_registry.clone(),
338+
));
339+
let graphql_runner = Arc::new(GraphQlRunner::new(
340+
&logger,
341+
network_store.clone(),
342+
load_manager,
343+
metrics_registry,
344+
));
345+
let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone());
346+
347+
graphql_server
348+
}
349+
262350
pub async fn run(opt: Opt, env_vars: Arc<EnvVars>) {
263351
env_logger::init();
264352
// Set up logger
@@ -428,46 +516,20 @@ pub async fn run(opt: Opt, env_vars: Arc<EnvVars>) {
428516
.await;
429517
}
430518

431-
let static_filters = ENV_VARS.experimental_static_filters;
432-
433-
let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone()));
434-
435-
let subgraph_instance_manager = SubgraphInstanceManager::new(
436-
&logger_factory,
437-
env_vars.cheap_clone(),
438-
network_store.subgraph_store(),
439-
blockchain_map.cheap_clone(),
440-
sg_count.cheap_clone(),
519+
let subgraph_registrar = build_subgraph_registrar(
441520
metrics_registry.clone(),
442-
link_resolver.clone(),
443-
ipfs_service,
444-
arweave_service,
445-
static_filters,
446-
);
447-
448-
// Create IPFS-based subgraph provider
449-
let subgraph_provider = IpfsSubgraphAssignmentProvider::new(
521+
&network_store,
450522
&logger_factory,
523+
&env_vars,
524+
blockchain_map.clone(),
525+
node_id.clone(),
526+
subgraph_settings,
451527
link_resolver.clone(),
452-
subgraph_instance_manager,
453-
sg_count,
528+
subscription_manager,
529+
arweave_service,
530+
ipfs_service,
454531
);
455532

456-
// Check version switching mode environment variable
457-
let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;
458-
459-
// Create named subgraph provider for resolving subgraph name->ID mappings
460-
let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new(
461-
&logger_factory,
462-
link_resolver,
463-
Arc::new(subgraph_provider),
464-
network_store.subgraph_store(),
465-
subscription_manager,
466-
blockchain_map,
467-
node_id.clone(),
468-
version_switching_mode,
469-
Arc::new(subgraph_settings),
470-
));
471533
graph::spawn(
472534
subgraph_registrar
473535
.start()
@@ -515,32 +577,6 @@ pub async fn run(opt: Opt, env_vars: Arc<EnvVars>) {
515577
graph::futures03::future::pending::<()>().await;
516578
}
517579

518-
fn build_graphql_server(
519-
config: &Config,
520-
logger: &Logger,
521-
expensive_queries: Vec<Arc<q::Document>>,
522-
metrics_registry: Arc<MetricsRegistry>,
523-
network_store: &Arc<Store>,
524-
logger_factory: &LoggerFactory,
525-
) -> GraphQLQueryServer<GraphQlRunner<Store>> {
526-
let shards: Vec<_> = config.stores.keys().cloned().collect();
527-
let load_manager = Arc::new(LoadManager::new(
528-
&logger,
529-
shards,
530-
expensive_queries,
531-
metrics_registry.clone(),
532-
));
533-
let graphql_runner = Arc::new(GraphQlRunner::new(
534-
&logger,
535-
network_store.clone(),
536-
load_manager,
537-
metrics_registry,
538-
));
539-
let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone());
540-
541-
graphql_server
542-
}
543-
544580
fn spawn_contention_checker(logger: Logger) {
545581
// Periodically check for contention in the tokio threadpool. First spawn a
546582
// task that simply responds to "ping" requests. Then spawn a separate

0 commit comments

Comments
 (0)