#[cfg(feature = "std")]
use futures::ready;
use futures::prelude::*;
use futures::channel::mpsc::UnboundedSender;
#[cfg(feature = "std")]
use log::{trace, warn, debug};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::round::{Round, State as RoundState};
use crate::{
Commit, Message, Prevote, Precommit, PrimaryPropose, SignedMessage,
SignedPrecommit, BlockNumberOps, validate_commit, ImportResult,
HistoricalVotes, weights::VoteWeight,
};
use crate::voter_set::VoterSet;
use super::{Environment, Buffered, FinalizedNotification};
/// The step a voting round is currently at.
///
/// `T` is the timer type carried along until the corresponding vote is due;
/// `VotingRound` instantiates it as `State<E::Timer>`.
pub(super) enum State<T> {
/// Initial step. Holds the prevote timer and the precommit timer, in that order.
Start(T, T),
/// A primary proposal has been sent. Holds the prevote timer and the
/// precommit timer, in that order.
Proposed(T, T),
/// The prevote step is over. Holds the remaining precommit timer.
Prevoted(T),
/// The precommit step is over; no timers remain.
Precommitted,
}
// Hand-written so that `T` (the timer type) does not have to implement
// `Debug`: only the name of the step is printed, never its payload.
impl<T> std::fmt::Debug for State<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let step_name = match *self {
            State::Start(..) => "Start",
            State::Proposed(..) => "Proposed",
            State::Prevoted(..) => "Prevoted",
            State::Precommitted => "Precommitted",
        };
        f.write_str(step_name)
    }
}
/// A single round of voting: tracks the votes seen, drives the local node's
/// own votes through the steps in `State`, and reports finality.
pub(super) struct VotingRound<H, N, E: Environment<H, N>> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
{
// Shared handle to the environment (chain queries, vote callbacks, round data).
env: Arc<E>,
// Whether and in which role the local node votes in this round.
voting: Voting,
// Vote tracker for this round.
votes: Round<E::Id, H, N, E::Signature>,
// Stream of incoming signed messages; drained in `process_incoming`.
incoming: E::In,
// Buffered sink for the messages the local node sends.
outgoing: Buffered<E::Out, Message<H, N>>,
// Current step. `None` for rounds built with `completed` and after a
// prevote could not be constructed.
state: Option<State<E::Timer>>,
// Our half of the bridge handed out by `bridge_state`; updated in `notify`
// whenever the round state changes.
bridged_round_state: Option<crate::bridge_state::PriorView<H, N>>,
// View onto the previous round's state. When `None`, `poll` casts no votes.
last_round_state: Option<crate::bridge_state::LatterView<H, N>>,
// Block hinted by the round's primary voter, if a hint was received.
primary_block: Option<(H, N)>,
// Channel on which finality notifications are sent.
finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
// The commit most recently sent on `finalized_sender`, if any.
best_finalized: Option<Commit<H, N, E::Signature, E::Id>>,
}
/// Whether, and in which role, the local node votes in a round.
enum Voting {
    /// The local node casts no votes in this round.
    No,
    /// The local node votes as a regular voter.
    Yes,
    /// The local node votes and is also the round's primary.
    Primary,
}

impl Voting {
    /// Returns `true` when the local node casts votes (`Yes` or `Primary`).
    fn is_active(&self) -> bool {
        matches!(self, Voting::Yes | Voting::Primary)
    }

    /// Returns `true` only when the local node is the round's primary.
    fn is_primary(&self) -> bool {
        matches!(self, Voting::Primary)
    }
}
impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
{
/// Create a voting round for `round_number` on top of `base`.
///
/// The round data (timers, message stream/sink, local voter id) is requested
/// from `env`. The local role is derived from the voter id: primary when it
/// equals the round's primary voter, regular voter when it is contained in
/// `voters`, and non-voting otherwise (including when there is no local id).
/// The round begins in `State::Start` holding both timers.
pub(super) fn new(
    round_number: u64,
    voters: VoterSet<E::Id>,
    base: (H, N),
    last_round_state: Option<crate::bridge_state::LatterView<H, N>>,
    finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
    env: Arc<E>,
) -> VotingRound<H, N, E> {
    let round_data = env.round_data(round_number);
    let votes = Round::new(crate::round::RoundParams { voters, base, round_number });

    let voting = match round_data.voter_id.as_ref() {
        Some(local_id) if *local_id == votes.primary_voter().0 => Voting::Primary,
        Some(local_id) if votes.voters().contains(local_id) => Voting::Yes,
        _ => Voting::No,
    };

    let initial_step = State::Start(round_data.prevote_timer, round_data.precommit_timer);

    VotingRound {
        env,
        voting,
        votes,
        incoming: round_data.incoming,
        outgoing: Buffered::new(round_data.outgoing),
        state: Some(initial_step),
        bridged_round_state: None,
        last_round_state,
        primary_block: None,
        finalized_sender,
        best_finalized: None,
    }
}
/// Wrap an existing set of `votes` in a `VotingRound` that casts no votes.
///
/// The round number is taken from `votes`; the message stream and sink are
/// requested from `env` for that number. The resulting round has
/// `Voting::No` and no step (`state` is `None`).
pub(super) fn completed(
    votes: Round<E::Id, H, N, E::Signature>,
    finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
    last_round_state: Option<crate::bridge_state::LatterView<H, N>>,
    env: Arc<E>,
) -> VotingRound<H, N, E> {
    let round_number = votes.number();
    let round_data = env.round_data(round_number);

    VotingRound {
        env,
        voting: Voting::No,
        votes,
        incoming: round_data.incoming,
        outgoing: Buffered::new(round_data.outgoing),
        state: None,
        bridged_round_state: None,
        last_round_state,
        primary_block: None,
        finalized_sender,
        best_finalized: None,
    }
}
/// Drive the round forward.
///
/// Imports incoming messages, casts whatever votes are due, flushes the
/// outgoing buffer and sends finality notifications.
///
/// Returns `Poll::Ready(Ok(()))` once the round is completable and the
/// previous round's estimate is finalized (or there is no previous round
/// state at all). Returns `Poll::Pending` otherwise, including while the
/// outgoing buffer has not finished. Errors from the environment, the
/// streams and the timers are propagated.
pub(super) fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), E::Error>> {
trace!(target: "afg", "Polling round {}, state = {:?}, step = {:?}", self.votes.number(), self.votes.state(), self.state);
// Snapshot taken before importing anything so `notify` can detect changes.
let pre_state = self.votes.state();
self.process_incoming(cx)?;
// Votes are only cast when a view of the previous round's state is available.
let last_round_state = self.last_round_state.as_ref().map(|s| s.get(cx).clone());
if let Some(ref last_round_state) = last_round_state {
self.primary_propose(last_round_state)?;
self.prevote(cx, last_round_state)?;
self.precommit(cx, last_round_state)?;
}
// Returns `Pending` from here if the outgoing buffer is not done yet.
ready!(self.outgoing.poll(cx))?;
// NOTE(review): presumably picks up messages that became available as a
// result of the flush above (e.g. our own votes) — confirm.
self.process_incoming(cx)?;
let post_state = self.votes.state();
self.notify(pre_state, post_state);
if !self.votes.completable() {
return Poll::Pending;
}
// The round may only complete once the previous round's estimate is finalized.
let last_round_estimate_finalized = match last_round_state {
Some(RoundState {
estimate: Some((_, last_round_estimate)),
finalized: Some((_, last_round_finalized)),
..
}) => {
// Compared by block number: finalized either in the previous round...
let finalized_in_last_round = last_round_estimate <= last_round_finalized;
// ...or in this one.
let finalized_in_current_round = self.finalized().map_or(
false,
|(_, current_round_finalized)| last_round_estimate <= *current_round_finalized,
);
finalized_in_last_round || finalized_in_current_round
},
// No previous round state at all: nothing to wait for.
None => {
true
},
// Previous round state exists but lacks an estimate or a finalized block.
_ => false,
};
if !last_round_estimate_finalized {
trace!(target: "afg", "Round {} completable but estimate not finalized.", self.round_number());
self.log_participation(log::Level::Trace);
return Poll::Pending;
}
debug!(target: "afg", "Completed round {}, state = {:?}, step = {:?}",
self.votes.number(), self.votes.state(), self.state);
self.log_participation(log::Level::Debug);
Poll::Ready(Ok(()))
}
/// The current step of the round, or `None` when the round has no step.
pub(super) fn state(&self) -> Option<&State<E::Timer>> {
self.state.as_ref()
}
/// Borrow the environment this round operates in.
pub(super) fn env(&self) -> &E {
    // `&Arc<E>` deref-coerces to `&E` at the return site.
    &self.env
}
/// The number of this round, as recorded by the vote tracker.
pub(super) fn round_number(&self) -> u64 {
self.votes.number()
}
/// A snapshot of the vote tracker's current round state.
pub(super) fn round_state(&self) -> RoundState<H, N> {
self.votes.state()
}
/// The base block (hash and number) this round's votes are tracked on top of.
pub(super) fn dag_base(&self) -> (H, N) {
self.votes.base()
}
/// The voter set of this round.
pub(super) fn voters(&self) -> &VoterSet<E::Id> {
self.votes.voters()
}
/// The block finalized in this round according to the vote tracker, if any.
pub(super) fn finalized(&self) -> Option<&(H, N)> {
self.votes.finalized()
}
/// The weight of the prevotes seen so far in this round.
pub(super) fn prevote_weight(&self) -> VoteWeight {
    let (weight, _n_prevotes) = self.votes.prevote_participation();
    weight
}
/// The weight of the precommits seen so far in this round.
pub(super) fn precommit_weight(&self) -> VoteWeight {
    let (weight, _n_precommits) = self.votes.precommit_participation();
    weight
}
/// The ids of the voters whose prevotes are recorded for this round.
pub(super) fn prevote_ids(&self) -> impl Iterator<Item = E::Id> {
    let prevotes = self.votes.prevotes();
    prevotes.into_iter().map(|prevote| prevote.0)
}
/// The ids of the voters whose precommits are recorded for this round.
pub(super) fn precommit_ids(&self) -> impl Iterator<Item = E::Id> {
    let precommits = self.votes.precommits();
    precommits.into_iter().map(|precommit| precommit.0)
}
/// Validate `commit` against this round's voters and import its precommits.
///
/// Returns `Ok(None)` without importing anything when validation yields no
/// GHOST. Otherwise every precommit in the commit is imported into the vote
/// tracker, any equivocation found along the way is reported to the
/// environment, and the GHOST is returned as `Ok(Some(..))`.
///
/// # Errors
///
/// Propagates errors from validation and from importing a precommit.
pub(super) fn check_and_import_from_commit(
    &mut self,
    commit: &Commit<H, N, E::Signature, E::Id>
) -> Result<Option<(H, N)>, E::Error> {
    let ghost = match validate_commit(commit, self.voters(), &*self.env)?.ghost {
        Some(ghost) => ghost,
        None => return Ok(None),
    };

    for signed in commit.precommits.iter().cloned() {
        let SignedPrecommit { precommit, signature, id } = signed;
        let imported = self.votes.import_precommit(&*self.env, precommit, id, signature)?;
        if let Some(equivocation) = imported.equivocation {
            self.env.precommit_equivocation(self.round_number(), equivocation);
        }
    }

    Ok(Some(ghost))
}
pub(super) fn finalized_sender(&self) -> UnboundedSender<FinalizedNotification<H, N, E>> {
self.finalized_sender.clone()
}
/// Create a bridge from this round's state to a later observer.
///
/// The prior half is kept in this round (and fed by `notify`); the latter
/// half is returned. Calling this again replaces the stored half and logs a
/// warning.
pub(super) fn bridge_state(&mut self) -> crate::bridge_state::LatterView<H, N> {
    let current_state = self.votes.state();
    let (prior, latter) = crate::bridge_state::bridge_state(current_state);

    let already_bridged = self.bridged_round_state.replace(prior).is_some();
    if already_bridged {
        warn!(target: "afg", "Bridged state from round {} more than once.",
            self.votes.number());
    }

    latter
}
/// The commit most recently sent as a finality notification by this round,
/// or `None` if none has been sent yet.
pub(super) fn finalizing_commit(&self) -> Option<&Commit<H, N, E::Signature, E::Id>> {
self.best_finalized.as_ref()
}
/// The historical votes recorded by the vote tracker for this round.
pub(super) fn historical_votes(&self) -> &HistoricalVotes<H, N, E::Signature, E::Id> {
self.votes.historical_votes()
}
/// Import a single signed message into the round.
///
/// Messages whose target fails the environment's `is_equal_or_descendent_of`
/// check against the round base are ignored. Prevotes and precommits are
/// imported into the vote tracker and any equivocation is reported to the
/// environment. A primary proposal is recorded only when it was signed by the
/// round's primary voter.
///
/// # Errors
///
/// Propagates errors from importing a prevote or precommit.
pub(super) fn handle_vote(&mut self, vote: SignedMessage<H, N, E::Signature, E::Id>) -> Result<(), E::Error> {
    let SignedMessage { message, signature, id } = vote;

    let builds_on_base = self.env.is_equal_or_descendent_of(
        self.votes.base().0,
        message.target().0.clone(),
    );
    if !builds_on_base {
        trace!(target: "afg", "Ignoring message targeting {:?} lower than round base {:?}",
            message.target(),
            self.votes.base(),
        );
        return Ok(());
    }

    match message {
        Message::Prevote(prevote) => {
            let imported = self.votes.import_prevote(&*self.env, prevote, id, signature)?;
            if let Some(equivocation) = imported.equivocation {
                self.env.prevote_equivocation(self.votes.number(), equivocation);
            }
        }
        Message::Precommit(precommit) => {
            let imported = self.votes.import_precommit(&*self.env, precommit, id, signature)?;
            if let Some(equivocation) = imported.equivocation {
                self.env.precommit_equivocation(self.votes.number(), equivocation);
            }
        }
        Message::PrimaryPropose(primary) => {
            // `id` is the signer of the message; only the primary's hint counts.
            if id == self.votes.primary_voter().0 {
                self.primary_block = Some((primary.target_hash, primary.target_number));
            }
        }
    }

    Ok(())
}
/// Log prevote and precommit participation for this round at `log_level`:
/// weight seen / threshold / total weight, and vote count / voter count.
fn log_participation(&self, log_level: log::Level) {
    let number = self.round_number();

    let voters = self.voters();
    let total_weight = voters.total_weight();
    let threshold = voters.threshold();
    let n_voters = voters.len();

    let (prevote_weight, n_prevotes) = self.votes.prevote_participation();
    let (precommit_weight, n_precommits) = self.votes.precommit_participation();

    log::log!(target: "afg", log_level, "Round {}: prevotes: {}/{}/{} weight, {}/{} actual",
        number, prevote_weight, threshold, total_weight, n_prevotes, n_voters);
    log::log!(target: "afg", log_level, "Round {}: precommits: {}/{}/{} weight, {}/{} actual",
        number, precommit_weight, threshold, total_weight, n_precommits, n_voters);
}
/// Import every message that is currently ready on the incoming stream.
///
/// Stops as soon as the stream is pending or has ended.
///
/// # Errors
///
/// Propagates an error item from the stream or an error from `handle_vote`.
fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> {
    loop {
        match Stream::poll_next(Pin::new(&mut self.incoming), cx) {
            Poll::Ready(Some(incoming)) => {
                trace!(target: "afg", "Round {}: Got incoming message", self.round_number());
                self.handle_vote(incoming?)?;
            }
            Poll::Ready(None) | Poll::Pending => return Ok(()),
        }
    }
}
/// Send the primary block hint if the local node is this round's primary.
///
/// Only acts in the `Start` step; any other step is left untouched. The hint
/// is the previous round's estimate and is sent only when that estimate
/// exists and its number is above the previous round's finalized block (or
/// nothing was finalized). On sending, the step becomes `Proposed`; in every
/// other case it stays `Start`.
///
/// # Errors
///
/// Propagates an error from `Environment::proposed`; the step is then left
/// as `None`.
fn primary_propose(&mut self, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
    let (prevote_timer, precommit_timer) = match self.state.take() {
        Some(State::Start(prevote_timer, precommit_timer)) => (prevote_timer, precommit_timer),
        other => {
            self.state = other;
            return Ok(());
        }
    };

    if self.voting.is_primary() {
        match last_round_state.estimate.clone() {
            Some((estimate_hash, estimate_number)) => {
                let estimate_not_finalized = last_round_state.finalized.as_ref()
                    .map_or(true, |finalized| estimate_number > finalized.1);

                if estimate_not_finalized {
                    debug!(target: "afg", "Sending primary block hint for round {}", self.votes.number());
                    let primary = PrimaryPropose {
                        target_hash: estimate_hash,
                        target_number: estimate_number,
                    };
                    self.env.proposed(self.round_number(), primary.clone())?;
                    self.outgoing.push(Message::PrimaryPropose(primary));
                    self.state = Some(State::Proposed(prevote_timer, precommit_timer));
                    return Ok(());
                }

                debug!(target: "afg", "Last round estimate has been finalized, \
                    not sending primary block hint for round {}", self.votes.number());
            }
            None => {
                debug!(target: "afg", "Last round estimate does not exist, \
                    not sending primary block hint for round {}", self.votes.number());
            }
        }
    }

    // Nothing was proposed: stay in the initial step with both timers.
    self.state = Some(State::Start(prevote_timer, precommit_timer));
    Ok(())
}
/// Cast the prevote when it is due.
///
/// Only acts in the `Start` and `Proposed` steps; any other step is left
/// untouched. The prevote is due once the prevote timer has fired or the
/// round is already completable.
///
/// # Errors
///
/// Propagates errors from the prevote timer, from `construct_prevote` and
/// from `Environment::prevoted`. The step was taken out of `self.state`
/// beforehand and is not restored on these paths.
fn prevote(&mut self, cx: &mut Context, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
let state = self.state.take();
// `proposed` records which step the timers came from, so that they can be
// put back into that same step when the prevote is not due yet.
let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| {
let should_prevote = match prevote_timer.poll_unpin(cx) {
Poll::Ready(Err(e)) => return Err(e),
Poll::Ready(Ok(())) => true,
// Timer still running: prevote early only if the round is completable.
Poll::Pending => self.votes.completable(),
};
if should_prevote {
if self.voting.is_active() {
if let Some(prevote) = self.construct_prevote(last_round_state)? {
debug!(target: "afg", "Casting prevote for round {}", self.votes.number());
self.env.prevoted(self.round_number(), prevote.clone())?;
self.votes.set_prevoted_index();
self.outgoing.push(Message::Prevote(prevote));
self.state = Some(State::Prevoted(precommit_timer));
} else {
// No prevote could be constructed: stop voting in this round. With
// no step left, `precommit` will not act either.
self.state = None;
self.voting = Voting::No;
}
} else {
// Not a voter: advance the step without casting anything.
self.state = Some(State::Prevoted(precommit_timer));
}
} else if proposed {
self.state = Some(State::Proposed(prevote_timer, precommit_timer));
} else {
self.state = Some(State::Start(prevote_timer, precommit_timer));
}
Ok(())
};
match state {
Some(State::Start(prevote_timer, precommit_timer)) => {
handle_prevote(prevote_timer, precommit_timer, false)?;
},
Some(State::Proposed(prevote_timer, precommit_timer)) => {
handle_prevote(prevote_timer, precommit_timer, true)?;
},
x => { self.state = x; }
}
Ok(())
}
/// Cast the precommit when it is due.
///
/// Only acts in the `Prevoted` step; any other step is left untouched.
///
/// # Panics
///
/// Panics in the `Prevoted` step if `last_round_state.estimate` is `None`.
///
/// # Errors
///
/// Propagates errors from the precommit timer and from
/// `Environment::precommitted`. The step was taken out of `self.state`
/// beforehand and is not restored on these paths.
fn precommit(&mut self, cx: &mut Context, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
match self.state.take() {
Some(State::Prevoted(mut precommit_timer)) => {
let last_round_estimate = last_round_state.estimate.clone()
.expect("Rounds only started when prior round completable; qed");
// Two conditions, evaluated left to right with short-circuiting, so the
// timer is only polled once the first one holds:
// 1. this round has a prevote-GHOST that equals the previous round's
//    estimate or passes `is_equal_or_descendent_of` against it;
// 2. the precommit timer has fired, or the round is already completable.
let should_precommit = {
self.votes.state().prevote_ghost.as_ref().map_or(false, |p_g| {
p_g == &last_round_estimate ||
self.env.is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone())
})
} && match precommit_timer.poll_unpin(cx) {
Poll::Ready(Err(e)) => return Err(e),
Poll::Ready(Ok(())) => true,
Poll::Pending => self.votes.completable(),
};
if should_precommit {
if self.voting.is_active() {
debug!(target: "afg", "Casting precommit for round {}", self.votes.number());
let precommit = self.construct_precommit();
self.env.precommitted(self.round_number(), precommit.clone())?;
self.votes.set_precommitted_index();
self.outgoing.push(Message::Precommit(precommit));
}
// The step advances whether or not a vote was actually cast.
self.state = Some(State::Precommitted);
} else {
self.state = Some(State::Prevoted(precommit_timer));
}
}
x => { self.state = x; }
}
Ok(())
}
/// Build the prevote for this round.
///
/// Picks a block the vote must build on (the primary's hint when it is
/// acceptable, otherwise the previous round's estimate) and votes for the
/// best chain containing it, as reported by the environment. Returns
/// `Ok(None)` when the environment knows no such chain.
///
/// # Panics
///
/// Panics if `last_round_state.estimate` is `None`, or if a primary hint was
/// received and `last_round_state.prevote_ghost` is `None`.
fn construct_prevote(&self, last_round_state: &RoundState<H, N>) -> Result<Option<Prevote<H, N>>, E::Error> {
let last_round_estimate = last_round_state.estimate.clone()
.expect("Rounds only started when prior round completable; qed");
let find_descendent_of = match self.primary_block {
None => {
// No hint from the primary: build on the previous round's estimate.
last_round_estimate.0
}
Some(ref primary_block) => {
let last_prevote_g = last_round_state.prevote_ghost.clone()
.expect("Rounds only started when prior round completable; qed");
if primary_block == &last_prevote_g {
// The hint is exactly the previous prevote-GHOST: no ancestry lookup needed.
primary_block.0.clone()
} else if primary_block.1 >= last_prevote_g.1 {
// A different block at or above the GHOST's number is not used.
last_round_estimate.0
} else {
// The hint is numbered below the previous prevote-GHOST: use it only
// if it shows up in the ancestry the environment reports between the
// estimate and the GHOST.
let &(ref p_hash, p_num) = primary_block;
match self.env.ancestry(last_round_estimate.0.clone(), last_prevote_g.0.clone()) {
Ok(ancestry) => {
let to_sub = p_num + N::one();
// Index into `ancestry` where the hinted block would sit; clamped to 0
// to avoid underflow.
// NOTE(review): assumes `ancestry` lists blocks from the GHOST's
// parent downwards, one per block number — confirm.
let offset: usize = if last_prevote_g.1 < to_sub {
0
} else {
(last_prevote_g.1 - to_sub).as_()
};
if ancestry.get(offset).map_or(false, |b| b == p_hash) {
p_hash.clone()
} else {
last_round_estimate.0
}
}
Err(crate::Error::NotDescendent) => {
warn!(target: "afg",
"Possible case of massive equivocation: \
last round prevote GHOST: {:?} is not a descendant of last round estimate: {:?}",
last_prevote_g,
last_round_estimate,
);
last_round_estimate.0
}
}
}
}
};
let best_chain = self.env.best_chain_containing(find_descendent_of.clone());
debug_assert!(best_chain.is_some(), "Previously known block {:?} has disappeared from chain", find_descendent_of);
// In builds without debug assertions the missing block is only logged and
// no prevote is produced.
let t = if let Some(target) = best_chain {
target
} else {
warn!(target: "afg", "Could not cast prevote: previously known block {:?} has disappeared", find_descendent_of);
return Ok(None);
};
Ok(Some(Prevote {
target_hash: t.0,
target_number: t.1,
}))
}
/// Build the precommit for this round: it targets the round's current
/// prevote-GHOST, falling back to the round base when there is none.
fn construct_precommit(&self) -> Precommit<H, N> {
    let (target_hash, target_number) = self.votes.state().prevote_ghost
        .unwrap_or_else(|| self.votes.base());

    Precommit { target_hash, target_number }
}
/// Publish the effects of a round-state change.
///
/// `last_state` and `new_state` are snapshots of the round state taken
/// before and after a poll.
///
/// * If the state changed and this round's state has been bridged, the
///   bridge is updated with `new_state`.
/// * A finality notification (with the finalizing commit) is sent when the
///   round is completable, something is finalized, and either the finalized
///   block changed or no notification has been sent yet. This only happens
///   once the round is in the `Precommitted` step, or when the round has no
///   previous round state and therefore never votes.
fn notify(&mut self, last_state: RoundState<H, N>, new_state: RoundState<H, N>) {
    if last_state != new_state {
        if let Some(ref b) = self.bridged_round_state {
            b.update(new_state.clone());
        }
    }

    let state_changed = last_state.finalized != new_state.finalized;
    let sent_finality_notifications = self.best_finalized.is_some();

    if new_state.completable && (state_changed || !sent_finality_notifications) {
        let precommitted = matches!(self.state, Some(State::Precommitted));
        // Without a previous round state `poll` never casts votes, so there
        // is no precommit step to wait for.
        let cant_vote = self.last_round_state.is_none();

        if precommitted || cant_vote {
            if let Some((f_hash, f_number)) = new_state.finalized {
                let commit = Commit {
                    target_hash: f_hash.clone(),
                    target_number: f_number,
                    precommits: self.votes.finalizing_precommits(&*self.env)
                        .expect("only returns none if nothing was finalized; this is checked above; qed")
                        .collect(),
                };
                // `f_hash` is not needed afterwards, so it is moved rather
                // than cloned a second time.
                let finalized = (
                    f_hash,
                    f_number,
                    self.votes.number(),
                    commit.clone(),
                );
                // Best effort: a failed send (e.g. receiver gone) is ignored.
                let _ = self.finalized_sender.unbounded_send(finalized);
                self.best_finalized = Some(commit);
            }
        }
    }
}
}