use crate::{
error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
metrics::MetricsService,
client::{light, Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig},
};
use sc_client_api::{
light::RemoteBlockchain, ForkBlocks, BadBlocks, UsageProvider, ExecutorProvider,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sc_chain_spec::get_extension;
use sp_consensus::{
block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator, Chain},
import_queue::ImportQueue,
};
use jsonrpc_pubsub::manager::SubscriptionManager;
use futures::{
FutureExt, StreamExt,
future::ready,
channel::oneshot,
};
use sc_keystore::LocalKeystore;
use log::{info, warn};
use sc_network::config::{Role, OnDemand};
use sc_network::NetworkService;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, HashFor, Zero, BlockIdTo,
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::sync::Arc;
use wasm_timer::SystemTime;
use sc_telemetry::{
telemetry,
ConnectionMessage,
TelemetryConnectionNotifier,
SUBSTRATE_INFO,
};
use sp_transaction_pool::MaintainedTransactionPool;
use prometheus_endpoint::Registry;
use sc_client_db::{Backend, DatabaseSettings};
use sp_core::traits::{
CodeExecutor,
SpawnNamed,
};
use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::BuildStorage;
use sc_client_api::{
BlockBackend, BlockchainEvents,
backend::StorageProvider,
proof_provider::ProofProvider,
execution_extensions::ExecutionExtensions
};
use sp_blockchain::{HeaderMetadata, HeaderBackend};
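/// A utility trait for building an RPC extension given a `DenyUnsafe` instance.
/// This is useful since at service definition time we don't know whether the
/// specific interface where the RPC extension will be exposed is safe or not and
/// we want to lazily build the RPC extension whenever we bind the service to an
/// interface.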
pub trait RpcExtensionBuilder {
type Output: sc_rpc::RpcExtension<sc_rpc::Metadata>;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output;
}
impl<F, R> RpcExtensionBuilder for F where
F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> R,
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
(*self)(deny, subscription_executor)
}
}
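/// A utility struct for implementing an `RpcExtensionBuilder` given a cloneable
/// `RpcExtension`.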
pub struct NoopRpcExtensionBuilder<R>(pub R);
impl<R> RpcExtensionBuilder for NoopRpcExtensionBuilder<R> where
R: Clone + sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;
fn build(
&self,
_deny: sc_rpc::DenyUnsafe,
_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
self.0.clone()
}
}
impl<R> From<R> for NoopRpcExtensionBuilder<R> where
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
fn from(e: R) -> NoopRpcExtensionBuilder<R> {
NoopRpcExtensionBuilder(e)
}
}
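/// Full client type.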
pub type TFullClient<TBl, TRtApi, TExecDisp> = Client<
TFullBackend<TBl>,
TFullCallExecutor<TBl, TExecDisp>,
TBl,
TRtApi,
>;
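/// Full client backend type.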
pub type TFullBackend<TBl> = sc_client_db::Backend<TBl>;
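/// Full client call executor type.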
pub type TFullCallExecutor<TBl, TExecDisp> = crate::client::LocalCallExecutor<
sc_client_db::Backend<TBl>,
NativeExecutor<TExecDisp>,
>;
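/// Light client type.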
pub type TLightClient<TBl, TRtApi, TExecDisp> = TLightClientWithBackend<
TBl, TRtApi, TExecDisp, TLightBackend<TBl>
>;
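/// Light client backend type.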
pub type TLightBackend<TBl> = sc_light::Backend<
sc_client_db::light::LightStorage<TBl>,
HashFor<TBl>,
>;
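/// Light call executor type.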
pub type TLightCallExecutor<TBl, TExecDisp> = sc_light::GenesisCallExecutor<
sc_light::Backend<
sc_client_db::light::LightStorage<TBl>,
HashFor<TBl>
>,
crate::client::LocalCallExecutor<
sc_light::Backend<
sc_client_db::light::LightStorage<TBl>,
HashFor<TBl>
>,
NativeExecutor<TExecDisp>
>,
>;
type TFullParts<TBl, TRtApi, TExecDisp> = (
TFullClient<TBl, TRtApi, TExecDisp>,
Arc<TFullBackend<TBl>>,
KeystoreContainer,
TaskManager,
);
type TLightParts<TBl, TRtApi, TExecDisp> = (
Arc<TLightClient<TBl, TRtApi, TExecDisp>>,
Arc<TLightBackend<TBl>>,
KeystoreContainer,
TaskManager,
Arc<OnDemand<TBl>>,
);
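/// Light client backend type with a specific hash type.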
pub type TLightBackendWithHash<TBl, THash> = sc_light::Backend<
sc_client_db::light::LightStorage<TBl>,
THash,
>;
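/// Light client type with a specific backend.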
pub type TLightClientWithBackend<TBl, TRtApi, TExecDisp, TBackend> = Client<
TBackend,
sc_light::GenesisCallExecutor<
TBackend,
crate::client::LocalCallExecutor<TBackend, NativeExecutor<TExecDisp>>,
>,
TBl,
TRtApi,
>;
trait AsCryptoStoreRef {
fn keystore_ref(&self) -> Arc<dyn CryptoStore>;
fn sync_keystore_ref(&self) -> Arc<dyn SyncCryptoStore>;
}
impl<T> AsCryptoStoreRef for Arc<T> where T: CryptoStore + SyncCryptoStore + 'static {
fn keystore_ref(&self) -> Arc<dyn CryptoStore> {
self.clone()
}
fn sync_keystore_ref(&self) -> Arc<dyn SyncCryptoStore> {
self.clone()
}
}
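/// Construct and hold different layers of keystore wrappers.
///
/// The accessors prefer a remote keystore, if one has been registered via
/// `set_remote_keystore`, and fall back to the local keystore otherwise.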
pub struct KeystoreContainer {
remote: Option<Box<dyn AsCryptoStoreRef>>,
local: Arc<LocalKeystore>,
}
impl KeystoreContainer {
pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
let keystore = Arc::new(match config {
KeystoreConfig::Path { path, password } => LocalKeystore::open(
path.clone(),
password.clone(),
)?,
KeystoreConfig::InMemory => LocalKeystore::in_memory(),
});
Ok(Self{remote: Default::default(), local: keystore})
}
pub fn set_remote_keystore<T>(&mut self, remote: Arc<T>)
where T: CryptoStore + SyncCryptoStore + 'static
{
self.remote = Some(Box::new(remote))
}
pub fn keystore(&self) -> Arc<dyn CryptoStore> {
if let Some(c) = self.remote.as_ref() {
c.keystore_ref()
} else {
self.local.clone()
}
}
pub fn sync_keystore(&self) -> SyncCryptoStorePtr {
if let Some(c) = self.remote.as_ref() {
c.sync_keystore_ref()
} else {
self.local.clone() as SyncCryptoStorePtr
}
}
pub fn local_keystore(&self) -> Option<Arc<LocalKeystore>> {
Some(self.local.clone())
}
}
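/// Create a new full node client from the given configuration.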
pub fn new_full_client<TBl, TRtApi, TExecDisp>(
config: &Configuration,
) -> Result<TFullClient<TBl, TRtApi, TExecDisp>, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
{
new_full_parts(config).map(|parts| parts.0)
}
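/// Create the initial parts of a full node: the client, the backend, the keystore
/// container and the task manager.
///
/// A minimal usage sketch; `Block`, `RuntimeApi` and `Executor` are placeholders for
/// the node-specific runtime types and are not defined in this crate:
///
/// ```ignore
/// let (client, backend, keystore_container, task_manager) =
///     new_full_parts::<Block, RuntimeApi, Executor>(&config)?;
/// let client = Arc::new(client);
/// ```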
pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
config: &Configuration,
) -> Result<TFullParts<TBl, TRtApi, TExecDisp>, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
};
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
config.max_runtime_instances,
);
let chain_spec = &config.chain_spec;
let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let (client, backend) = {
let db_config = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
state_pruning: config.state_pruning.clone(),
source: config.database.clone(),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
};
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
config.execution_strategies.clone(),
Some(keystore_container.sync_keystore()),
);
new_client(
db_config,
executor,
chain_spec.as_storage_builder(),
fork_blocks,
bad_blocks,
extensions,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
ClientConfig {
offchain_worker_enabled : config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
},
)?
};
Ok((
client,
backend,
keystore_container,
task_manager,
))
}
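/// Create the initial parts of a light node: the client, the backend, the keystore
/// container, the task manager and the `OnDemand` fetcher.
///
/// A minimal usage sketch; `Block`, `RuntimeApi` and `Executor` are placeholders for
/// the node-specific runtime types and are not defined in this crate:
///
/// ```ignore
/// let (client, backend, keystore_container, task_manager, on_demand) =
///     new_light_parts::<Block, RuntimeApi, Executor>(&config)?;
/// ```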
pub fn new_light_parts<TBl, TRtApi, TExecDisp>(
config: &Configuration,
) -> Result<TLightParts<TBl, TRtApi, TExecDisp>, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
};
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
config.max_runtime_instances,
);
let db_storage = {
let db_settings = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
state_pruning: config.state_pruning.clone(),
source: config.database.clone(),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
};
sc_client_db::light::LightStorage::new(db_settings)?
};
let light_blockchain = sc_light::new_light_blockchain(db_storage);
let fetch_checker = Arc::new(
sc_light::new_fetch_checker::<_, TBl, _>(
light_blockchain.clone(),
executor.clone(),
Box::new(task_manager.spawn_handle()),
),
);
let on_demand = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
let backend = sc_light::new_light_backend(light_blockchain);
let client = Arc::new(light::new_light(
backend.clone(),
config.chain_spec.as_storage_builder(),
executor,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
)?);
Ok((client, backend, keystore_container, task_manager, on_demand))
}
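/// Create an instance of the db-backed client, together with its backend.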
pub fn new_client<E, Block, RA>(
settings: DatabaseSettings,
executor: E,
genesis_storage: &dyn BuildStorage,
fork_blocks: ForkBlocks<Block>,
bad_blocks: BadBlocks<Block>,
execution_extensions: ExecutionExtensions<Block>,
spawn_handle: Box<dyn SpawnNamed>,
prometheus_registry: Option<Registry>,
config: ClientConfig,
) -> Result<(
crate::client::Client<
Backend<Block>,
crate::client::LocalCallExecutor<Backend<Block>, E>,
Block,
RA,
>,
Arc<Backend<Block>>,
),
sp_blockchain::Error,
>
where
Block: BlockT,
E: CodeExecutor + RuntimeInfo,
{
const CANONICALIZATION_DELAY: u64 = 4096;
let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
let executor = crate::client::LocalCallExecutor::new(backend.clone(), executor, spawn_handle, config.clone())?;
Ok((
crate::client::Client::new(
backend.clone(),
executor,
genesis_storage,
fork_blocks,
bad_blocks,
execution_extensions,
prometheus_registry,
config,
)?,
backend,
))
}
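/// Parameters to pass into `spawn_tasks`.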
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub config: Configuration,
pub client: Arc<TCl>,
pub backend: Arc<Backend>,
pub task_manager: &'a mut TaskManager,
pub keystore: SyncCryptoStorePtr,
pub on_demand: Option<Arc<OnDemand<TBl>>>,
pub transaction_pool: Arc<TExPool>,
pub rpc_extensions_builder: Box<dyn RpcExtensionBuilder<Output = TRpc> + Send>,
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
pub network_status_sinks: NetworkStatusSinks<TBl>,
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
}
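/// Build the offchain workers and spawn the future that drives their notifications,
/// provided the backend supports offchain storage; otherwise a warning is logged and
/// `None` is returned.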
pub fn build_offchain_workers<TBl, TBackend, TCl>(
config: &Configuration,
backend: Arc<TBackend>,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
) -> Option<Arc<sc_offchain::OffchainWorkers<TCl, TBackend::OffchainStorage, TBl>>>
where
TBl: BlockT, TBackend: sc_client_api::Backend<TBl>,
<TBackend as sc_client_api::Backend<TBl>>::OffchainStorage: 'static,
TCl: Send + Sync + ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + 'static,
<TCl as ProvideRuntimeApi<TBl>>::Api: sc_offchain::OffchainWorkerApi<TBl>,
{
let offchain_workers = match backend.offchain_storage() {
Some(db) => {
Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db)))
},
None => {
warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
};
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
Clone::clone(&spawn_handle),
network.clone(),
)
);
}
offchain_workers
}
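/// Spawn the background tasks a node needs besides networking: transaction pool
/// notifications, the metrics/telemetry services, the informant and the RPC servers.
/// Returns the in-memory RPC handlers and an optional telemetry connection notifier.
///
/// A minimal usage sketch, assuming the other values were produced earlier in the
/// service setup by `new_full_parts`, the transaction pool constructor and
/// `build_network`, and that no node-specific RPC extensions are registered:
///
/// ```ignore
/// let (rpc_handlers, telemetry_connection_notifier) = spawn_tasks(SpawnTasksParams {
///     config,
///     client: client.clone(),
///     backend: backend.clone(),
///     task_manager: &mut task_manager,
///     keystore: keystore_container.sync_keystore(),
///     on_demand: None,
///     transaction_pool: transaction_pool.clone(),
///     rpc_extensions_builder: Box::new(|_, _| ()),
///     remote_blockchain: None,
///     network: network.clone(),
///     network_status_sinks: network_status_sinks.clone(),
///     system_rpc_tx,
/// })?;
/// ```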
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<(RpcHandlers, Option<TelemetryConnectionNotifier>), Error>
where
TCl: ProvideRuntimeApi<TBl> + HeaderMetadata<TBl, Error=sp_blockchain::Error> + Chain<TBl> +
BlockBackend<TBl> + BlockIdTo<TBl, Error=sp_blockchain::Error> + ProofProvider<TBl> +
HeaderBackend<TBl> + BlockchainEvents<TBl> + ExecutorProvider<TBl> + UsageProvider<TBl> +
StorageProvider<TBl, TBackend> + CallApiAt<TBl, Error=sp_blockchain::Error> +
Send + 'static,
<TCl as ProvideRuntimeApi<TBl>>::Api:
sp_api::Metadata<TBl> +
sc_offchain::OffchainWorkerApi<TBl> +
sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl> +
sp_session::SessionKeys<TBl> +
sp_api::ApiErrorExt<Error = sp_blockchain::Error> +
sp_api::ApiExt<TBl, StateBackend = TBackend::State>,
TBl: BlockT,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> +
MallocSizeOfWasm + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>
{
let SpawnTasksParams {
mut config,
task_manager,
client,
on_demand,
backend,
keystore,
transaction_pool,
rpc_extensions_builder,
remote_blockchain,
network,
network_status_sinks,
system_rpc_tx,
} = params;
let chain_info = client.usage_info().chain;
sp_session::generate_initial_session_keys(
client.clone(),
&BlockId::Hash(chain_info.best_hash),
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
)?;
let telemetry_connection_notifier = init_telemetry(
&mut config,
network.clone(),
client.clone(),
);
info!("📦 Highest known block at #{}", chain_info.best_number);
let spawn_handle = task_manager.spawn_handle();
spawn_handle.spawn(
"txpool-notifications",
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
spawn_handle.spawn(
"on-transaction-imported",
transaction_notifications(transaction_pool.clone(), network.clone()),
);
let metrics_service = if let Some(PrometheusConfig { port, registry }) =
config.prometheus_config.clone()
{
let metrics = MetricsService::with_prometheus(&registry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
prometheus_endpoint::init_prometheus(port, registry).map(drop)
);
metrics
} else {
MetricsService::new()
};
spawn_handle.spawn("telemetry-periodic-send",
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network_status_sinks.clone()
)
);
let gen_handler = |
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware
| gen_handler(
deny_unsafe, rpc_middleware, &config, task_manager.spawn_handle(),
client.clone(), transaction_pool.clone(), keystore.clone(),
on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?;
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser")
).into()));
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.status.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone()));
Ok((rpc_handlers, telemetry_connection_notifier))
}
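/// Propagate imported transactions over the network and report the transaction pool
/// status to telemetry.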
async fn transaction_notifications<TBl, TExPool>(
transaction_pool: Arc<TExPool>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
)
where
TBl: BlockT,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
transaction_pool
.import_notification_stream()
.for_each(move |hash| {
network.propagate_transaction(hash);
let status = transaction_pool.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
ready(())
})
.await;
}
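/// Start telemetry, if a telemetry span, telemetry endpoints and a telemetry handle
/// are all configured, and return the connection notifier.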
fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
config: &mut Configuration,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
client: Arc<TCl>,
) -> Option<TelemetryConnectionNotifier> {
let telemetry_span = config.telemetry_span.clone()?;
let endpoints = config.telemetry_endpoints.clone()?;
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
let connection_message = ConnectionMessage {
name: config.network.node_name.to_owned(),
implementation: config.impl_name.to_owned(),
version: config.impl_version.to_owned(),
config: String::new(),
chain: config.chain_spec.name().to_owned(),
genesis_hash: format!("{:?}", genesis_hash),
authority: config.role.is_authority(),
startup_time: SystemTime::UNIX_EPOCH.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0).to_string(),
network_id: network.local_peer_id().to_base58(),
};
config.telemetry_handle
.as_mut()
.map(|handle| handle.start_telemetry(
telemetry_span,
endpoints,
connection_message,
))
}
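/// Assemble the RPC handler: the system, chain, state, author and offchain APIs plus
/// the node-specific extensions, wired for either a light client (when `OnDemand` and
/// a `RemoteBlockchain` are supplied) or a full client.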
fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
transaction_pool: Arc<TExPool>,
keystore: SyncCryptoStorePtr,
on_demand: Option<Arc<OnDemand<TBl>>>,
remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>
) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + HeaderBackend<TBl> +
HeaderMetadata<TBl, Error=sp_blockchain::Error> + ExecutorProvider<TBl> +
CallApiAt<TBl, Error=sp_blockchain::Error> + ProofProvider<TBl> +
StorageProvider<TBl, TBackend> + BlockBackend<TBl> + Send + Sync + 'static,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TBackend: sc_client_api::backend::Backend<TBl> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
<TCl as ProvideRuntimeApi<TBl>>::Api:
sp_session::SessionKeys<TBl> +
sp_api::Metadata<TBl, Error = sp_blockchain::Error>,
{
use sc_rpc::{chain, state, author, system, offchain};
let system_info = sc_rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.clone(),
impl_version: config.impl_version.clone(),
properties: config.chain_spec.properties(),
chain_type: config.chain_spec.chain_type(),
};
let task_executor = sc_rpc::SubscriptionTaskExecutor::new(spawn_handle);
let subscriptions = SubscriptionManager::new(Arc::new(task_executor.clone()));
let (chain, state, child_state) = if let (Some(remote_blockchain), Some(on_demand)) =
(remote_blockchain, on_demand) {
let chain = sc_rpc::chain::new_light(
client.clone(),
subscriptions.clone(),
remote_blockchain.clone(),
on_demand.clone(),
);
let (state, child_state) = sc_rpc::state::new_light(
client.clone(),
subscriptions.clone(),
remote_blockchain.clone(),
on_demand,
deny_unsafe,
);
(chain, state, child_state)
} else {
let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone());
let (state, child_state) = sc_rpc::state::new_full(
client.clone(),
subscriptions.clone(),
deny_unsafe,
);
(chain, state, child_state)
};
let author = sc_rpc::author::Author::new(
client,
transaction_pool,
subscriptions,
keystore,
deny_unsafe,
);
let system = system::System::new(system_info, system_rpc_tx, deny_unsafe);
let maybe_offchain_rpc = offchain_storage.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
offchain::OffchainApi::to_delegate(offchain)
});
sc_rpc_server::rpc_handler(
(
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
),
rpc_middleware
)
}
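/// Parameters to pass into `build_network`.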
pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
pub config: &'a Configuration,
pub client: Arc<TCl>,
pub transaction_pool: Arc<TExPool>,
pub spawn_handle: SpawnTaskHandle,
pub import_queue: TImpQu,
pub on_demand: Option<Arc<OnDemand<TBl>>>,
pub block_announce_validator_builder: Option<Box<
dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send
>>,
}
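/// Build the network service, the network status sinks and an RPC sender, together
/// with a `NetworkStarter` that must be used to actually start the network worker.
///
/// A minimal usage sketch for a full node, assuming `config`, `client`,
/// `transaction_pool`, `import_queue` and `task_manager` were created earlier in the
/// service setup:
///
/// ```ignore
/// let (network, network_status_sinks, system_rpc_tx, network_starter) =
///     build_network(BuildNetworkParams {
///         config: &config,
///         client: client.clone(),
///         transaction_pool: transaction_pool.clone(),
///         spawn_handle: task_manager.spawn_handle(),
///         import_queue,
///         on_demand: None,
///         block_announce_validator_builder: None,
///     })?;
/// // Once every other component is wired up:
/// network_starter.start_network();
/// ```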
pub fn build_network<TBl, TExPool, TImpQu, TCl>(
params: BuildNetworkParams<TBl, TExPool, TImpQu, TCl>
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
NetworkStatusSinks<TBl>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
NetworkStarter,
),
Error
>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl> + HeaderMetadata<TBl, Error=sp_blockchain::Error> + Chain<TBl> +
BlockBackend<TBl> + BlockIdTo<TBl, Error=sp_blockchain::Error> + ProofProvider<TBl> +
HeaderBackend<TBl> + BlockchainEvents<TBl> + 'static,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TImpQu: ImportQueue<TBl> + 'static,
{
let BuildNetworkParams {
config, client, transaction_pool, spawn_handle, import_queue, on_demand,
block_announce_validator_builder,
} = params;
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool,
client: client.clone(),
});
let protocol_id = config.protocol_id();
let block_announce_validator = if let Some(f) = block_announce_validator_builder {
f(client.clone())
} else {
Box::new(DefaultBlockAnnounceValidator)
};
let block_request_protocol_config = {
if matches!(config.role, Role::Light) {
block_request_handler::generate_protocol_config(&protocol_id)
} else {
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
client.clone(),
);
spawn_handle.spawn("block_request_handler", handler.run());
protocol_config
}
};
let light_client_request_protocol_config = {
if matches!(config.role, Role::Light) {
light_client_requests::generate_protocol_config(&protocol_id)
} else {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
client.clone(),
);
spawn_handle.spawn("light_client_request_handler", handler.run());
protocol_config
}
};
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
}))
},
network_config: config.network.clone(),
chain: client.clone(),
on_demand,
transaction_pool: transaction_pool_adapter as _,
import_queue: Box::new(import_queue),
protocol_id,
block_announce_validator,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_request_protocol_config,
light_client_request_protocol_config,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = NetworkStatusSinks::new();
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let future = build_network_future(
config.role.clone(),
network_mut,
client,
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
);
let (network_start_tx, network_start_rx) = oneshot::channel();
spawn_handle.spawn_blocking("network-worker", async move {
if network_start_rx.await.is_err() {
debug_assert!(false);
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
);
return;
}
future.await
});
Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
}
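/// Object used to start the network.
///
/// The network worker spawned by `build_network` waits for `start_network` to be
/// called; if this value is dropped without calling it, the network never runs.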
#[must_use]
pub struct NetworkStarter(oneshot::Sender<()>);
impl NetworkStarter {
pub fn start_network(self) {
let _ = self.0.send(());
}
}