use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::prelude::*;
use finality_grandpa::{
BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet
};
use log::{debug, info, warn};
use sp_keystore::SyncCryptoStorePtr;
use sp_consensus::SelectChain;
use sc_client_api::backend::Backend;
use sp_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::traits::{NumberFor, Block as BlockT};
use sp_blockchain::HeaderMetadata;
use crate::{
global_communication, CommandOrError, CommunicationIn, Config, environment,
LinkHalf, Error, aux_schema::PersistentData, VoterCommand, VoterSetState,
};
use crate::authorities::SharedAuthoritySet;
use crate::communication::{Network as NetworkT, NetworkBridge};
use crate::notification::GrandpaJustificationSender;
use sp_finality_grandpa::AuthorityId;
use std::marker::{PhantomData, Unpin};
/// Lightweight chain adapter constructed by the observer each time it validates
/// a commit. It only borrows the client and owns no state of its own.
struct ObserverChain<'a, Block: BlockT, Client> {
/// Borrowed client handle; forwarded to `environment::ancestry`.
client: &'a Arc<Client>,
/// Marker tying the otherwise unused `Block` type parameter to the struct.
_phantom: PhantomData<Block>,
}
// `finality_grandpa::Chain` implementation that lets the observer validate
// commits against the client's view of the chain.
impl<'a, Block, Client> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
for ObserverChain<'a, Block, Client> where
Block: BlockT,
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
NumberFor<Block>: BlockNumberOps,
{
/// Forwards the ancestry query for `base` and `block` to
/// `environment::ancestry`, using the borrowed client.
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
environment::ancestry(&self.client, base, block)
}
/// Always returns `None`; no best-chain lookup is performed here.
// NOTE(review): presumably this method is only needed by a full voter and is
// never reached during commit validation — confirm against `finality_grandpa`.
fn best_chain_containing(&self, _block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
None
}
}
/// Builds the future that observes GRANDPA commits for a single voter set.
///
/// The `commits` stream is folded over, carrying the number of the last
/// finalized block as state (starting at `last_finalized_number`). For every
/// incoming message:
///
/// - catch-up messages are skipped;
/// - commits targeting a block at or below the last finalized number are
///   skipped;
/// - any other commit is validated against `voters`. If validation yields no
///   GHOST the commit is logged as invalid and the state is left untouched;
///   otherwise the target block is finalized through
///   `environment::finalize_block` and `note_round` is called with the round
///   following the commit's round.
///
/// In both validated cases the validation result and the commit's callback
/// are handed to `finality_grandpa::process_commit_validation_result`.
///
/// The returned future resolves with an error as soon as the stream, the
/// validation or the finalization produces one.
fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
    client: &Arc<Client>,
    authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
    voters: &Arc<VoterSet<AuthorityId>>,
    justification_sender: &Option<GrandpaJustificationSender<Block>>,
    last_finalized_number: NumberFor<Block>,
    commits: S,
    note_round: F,
) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
    NumberFor<Block>: BlockNumberOps,
    S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
    F: Fn(u64),
    BE: Backend<Block>,
    Client: crate::ClientForGrandpa<Block, BE>,
{
    // Take owned handles so the fold closure does not borrow from the caller.
    let client = Arc::clone(client);
    let voters = Arc::clone(voters);
    let authority_set = authority_set.clone();
    let justification_sender = justification_sender.clone();

    commits
        .try_fold(last_finalized_number, move |finalized_so_far, message| {
            // Only commit messages are of interest; catch-up messages are ignored.
            let (round, commit, callback) = match message {
                voter::CommunicationIn::CatchUp(..) => return future::ok(finalized_so_far),
                voter::CommunicationIn::Commit(round, compact, callback) => {
                    (round, finality_grandpa::Commit::from(compact), callback)
                }
            };

            // A commit that does not advance past the last finalized block
            // carries nothing new.
            if commit.target_number <= finalized_so_far {
                return future::ok(finalized_so_far);
            }

            let validation_result = match finality_grandpa::validate_commit(
                &commit,
                &voters,
                &ObserverChain { client: &client, _phantom: PhantomData },
            ) {
                Ok(result) => result,
                Err(e) => return future::err(e.into()),
            };

            // No GHOST means the commit is invalid: report it and keep going
            // with the current state.
            if validation_result.ghost().is_none() {
                debug!(target: "afg", "Received invalid commit: ({:?}, {:?})", round, commit);
                finality_grandpa::process_commit_validation_result(validation_result, callback);
                return future::ok(finalized_so_far);
            }

            // Read the target out before `commit` is moved into the
            // finalization call below.
            let finalized_hash = commit.target_hash;
            let finalized_number = commit.target_number;

            if let Err(e) = environment::finalize_block(
                client.clone(),
                &authority_set,
                None,
                finalized_hash,
                finalized_number,
                (round, commit).into(),
                false,
                justification_sender.as_ref(),
            ) {
                return future::err(e);
            }

            // The commit was accepted for `round`, so report the next round.
            note_round(round + 1);
            finality_grandpa::process_commit_validation_result(validation_result, callback);

            // Continue folding with the newly finalized block number.
            future::ok(finalized_number)
        })
        .map_ok(|_| ())
}
/// Assembles the GRANDPA observer and returns it as a future.
///
/// The client, persisted data, voter-command receiver and justification
/// sender are taken from `link`. A `NetworkBridge` is created over `network`
/// and combined with them into an `ObserverWork`.
///
/// The returned future resolves to `()`. If the observer terminates with an
/// error, the error is logged as a warning instead of being propagated.
// NOTE(review): `allow(unused)` is kept as-is; presumably nothing inside the
// crate calls this function directly — confirm before removing.
#[allow(unused)]
pub fn run_grandpa_observer<BE, Block: BlockT, Client, N, SC>(
    config: Config,
    link: LinkHalf<Block, Client, SC>,
    network: N,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static>
where
    BE: Backend<Block> + Unpin + 'static,
    N: NetworkT<Block> + Send + Clone + 'static,
    SC: SelectChain<Block> + 'static,
    NumberFor<Block>: BlockNumberOps,
    Client: crate::ClientForGrandpa<Block, BE> + 'static,
{
    // Only these parts of the link are needed; everything else is left behind.
    let LinkHalf {
        client,
        persistent_data,
        voter_commands_rx,
        justification_sender,
        ..
    } = link;

    let bridge = NetworkBridge::new(
        network,
        config.clone(),
        persistent_data.set_state.clone(),
        None,
    );

    let work = ObserverWork::new(
        client,
        bridge,
        persistent_data,
        config.keystore,
        voter_commands_rx,
        Some(justification_sender),
    );

    // Collapse the observer's `Result` into `()`, logging a failure if any.
    Ok(work.map(|outcome| {
        if let Err(e) = outcome {
            warn!("GRANDPA Observer failed: {:?}", e);
        }
    }))
}
/// Future that drives a GRANDPA observer together with its network bridge and
/// reacts to voter commands by rebuilding the observer.
#[must_use]
struct ObserverWork<B: BlockT, BE, Client, N: NetworkT<B>> {
/// The current observer future; replaced whenever `rebuild_observer` runs.
observer: Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>,
/// Client handle used for chain info and for writing the voter set state.
client: Arc<Client>,
/// Network bridge; polled as the last step of every `poll` call.
network: NetworkBridge<B, N>,
/// Persisted authority set and voter set state.
persistent_data: PersistentData<B>,
/// Optional keystore, forwarded to `global_communication`.
keystore: Option<SyncCryptoStorePtr>,
/// Receiver of voter commands; each received command is handled like one
/// produced by the observer future itself.
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
/// Optional justification sender, forwarded to the observer.
justification_sender: Option<GrandpaJustificationSender<B>>,
/// Marker tying the otherwise unused `BE` type parameter to the struct.
_phantom: PhantomData<BE>,
}
impl<B, BE, Client, Network> ObserverWork<B, BE, Client, Network>
where
    B: BlockT,
    BE: Backend<B> + 'static,
    Client: crate::ClientForGrandpa<B, BE> + 'static,
    Network: NetworkT<B>,
    NumberFor<B>: BlockNumberOps,
{
    /// Creates the observer work and immediately builds the observer future
    /// for the authority set currently held in `persistent_data`.
    fn new(
        client: Arc<Client>,
        network: NetworkBridge<B, Network>,
        persistent_data: PersistentData<B>,
        keystore: Option<SyncCryptoStorePtr>,
        voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
        justification_sender: Option<GrandpaJustificationSender<B>>,
    ) -> Self {
        let mut work = ObserverWork {
            // Placeholder that never resolves; `rebuild_observer` below
            // installs the real observer future.
            observer: Box::pin(future::pending()) as Pin<Box<_>>,
            client,
            network,
            persistent_data,
            // `keystore` is already owned, so it is moved in rather than cloned.
            keystore,
            voter_commands_rx,
            justification_sender,
            _phantom: PhantomData,
        };
        work.rebuild_observer();
        work
    }

    /// Replaces `self.observer` with a fresh observer future built from the
    /// current authority set (its set id and current authorities).
    fn rebuild_observer(&mut self) {
        let set_id = self.persistent_data.authority_set.set_id();
        let voters = Arc::new(self.persistent_data.authority_set.current_authorities());

        // Only the first element of the returned pair (the incoming stream)
        // is used; the second element is discarded.
        let (global_in, _) = global_communication(
            set_id,
            &voters,
            self.client.clone(),
            &self.network,
            self.keystore.as_ref(),
            None,
        );

        let last_finalized_number = self.client.info().finalized_number;

        // Callback handed to the observer: forwards the round number to the
        // network bridge together with this set's id and voters.
        let note_round = {
            let network = self.network.clone();
            let voters = voters.clone();

            move |round| network.note_round(
                crate::communication::Round(round),
                crate::communication::SetId(set_id),
                &*voters,
            )
        };

        let observer = grandpa_observer(
            &self.client,
            &self.persistent_data.authority_set,
            &voters,
            &self.justification_sender,
            last_finalized_number,
            global_in,
            note_round,
        );

        self.observer = Box::pin(observer);
    }

    /// Applies a voter command: computes the new voter set state, writes it
    /// with `aux_schema::write_voter_set_state`, stores it in
    /// `persistent_data.set_state`, and rebuilds the observer.
    ///
    /// # Errors
    ///
    /// Returns the error from `write_voter_set_state` if the write fails. In
    /// that case the in-memory state is left unchanged and the observer is
    /// not rebuilt.
    fn handle_voter_command(
        &mut self,
        command: VoterCommand<B::Hash, NumberFor<B>>,
    ) -> Result<(), Error> {
        self.persistent_data.set_state = match command {
            VoterCommand::Pause(reason) => {
                info!(target: "afg", "Pausing old validator set: {}", reason);

                // Carry the completed rounds of the current state over into
                // the paused state.
                let completed_rounds = self.persistent_data.set_state.read().completed_rounds();
                let set_state = VoterSetState::Paused { completed_rounds };

                crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

                set_state
            },
            VoterCommand::ChangeAuthorities(new) => {
                // The live state is based on the canonical block carried by
                // the command.
                let set_state = VoterSetState::live(
                    new.set_id,
                    &*self.persistent_data.authority_set.inner().read(),
                    (new.canon_hash, new.canon_number),
                );

                crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

                set_state
            },
        }.into();

        self.rebuild_observer();
        Ok(())
    }
}
impl<B, BE, C, N> Future for ObserverWork<B, BE, C, N>
where
B: BlockT,
BE: Backend<B> + Unpin + 'static,
C: crate::ClientForGrandpa<B, BE> + 'static,
N: NetworkT<B>,
NumberFor<B>: BlockNumberOps,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.observer), cx) {
Poll::Pending => {}
Poll::Ready(Ok(())) => {
return Poll::Ready(Ok(()))
}
Poll::Ready(Err(CommandOrError::Error(e))) => {
return Poll::Ready(Err(e))
}
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {}
Poll::Ready(None) => {
return Poll::Ready(Ok(()))
}
Poll::Ready(Some(command)) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
Future::poll(Pin::new(&mut self.network), cx)
}
}
// Unit tests for the observer work future.
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use sp_utils::mpsc::tracing_unbounded;
use crate::{aux_schema, communication::tests::{Event, make_test_network}};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
use sc_network::PeerId;
use sp_blockchain::HeaderBackend as _;
use futures::executor;
/// Checks that polling `ObserverWork` once leaves it pending and that the
/// test network afterwards shows an event-stream request followed by a
/// report event.
// NOTE(review): presumably the report event can only show up if the network
// bridge was polled by `ObserverWork::poll` — confirm against the test harness.
#[test]
fn observer_work_polls_underlying_network_bridge() {
// Set up the test network; `tester` exposes the bridge handle and the events.
let (tester_fut, _network) = make_test_network();
let mut tester = executor::block_on(tester_fut);
// Build a test client and keep its backend for loading the persistent data.
let (client, backend) = {
let builder = TestClientBuilder::with_default_backend();
let backend = builder.backend();
let (client, _) = builder.build_with_longest_chain();
(Arc::new(client), backend)
};
// A single voter (Alice) with weight 1.
let voters = vec![(sp_keyring::Ed25519Keyring::Alice.public().into(), 1)];
let persistent_data = aux_schema::load_persistent(
&*backend,
client.info().genesis_hash,
0,
|| Ok(voters),
).unwrap();
// Bound to `_tx` (not `_`) so the sender stays alive for the whole test;
// `ObserverWork::poll` completes as soon as this stream ends.
let (_tx, voter_command_rx) = tracing_unbounded("");
// No keystore and no justification sender are supplied.
let observer = ObserverWork::new(
client,
tester.net_handle.clone(),
persistent_data,
None,
voter_command_rx,
None,
);
// Queue a reputation change for a random peer before the observer is polled.
let peer_id = PeerId::random();
tester.trigger_gossip_validator_reputation_change(&peer_id);
executor::block_on(async move {
// Poll the observer exactly once; it must still be pending.
assert!(observer.now_or_never().is_none());
// The first recorded event must be an event-stream request.
match tester.events.next().now_or_never() {
Some(Some(Event::EventStream(_))) => {},
_ => panic!("expected event stream request"),
};
// The next recorded event must be a report.
assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
});
}
}