use crate::{Network, Validator};
use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL};
use sc_network::{Event, ReputationChange};
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender, Receiver};
use libp2p::PeerId;
use log::trace;
use prometheus_endpoint::Registry;
use sp_runtime::traits::Block as BlockT;
use std::{
borrow::Cow,
collections::{HashMap, VecDeque},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub struct GossipEngine<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
protocol: Cow<'static, str>,
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
forwarding_state: ForwardingState<B>,
}
enum ForwardingState<B: BlockT> {
Idle,
Busy(VecDeque<(B::Hash, TopicNotification)>),
}
impl<B: BlockT> Unpin for GossipEngine<B> {}
impl<B: BlockT> GossipEngine<B> {
pub fn new<N: Network<B> + Send + Clone + 'static>(
network: N,
protocol: impl Into<Cow<'static, str>>,
validator: Arc<dyn Validator<B>>,
metrics_registry: Option<&Registry>,
) -> Self where B: 'static {
let protocol = protocol.into();
let network_event_stream = network.event_stream();
GossipEngine {
state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
protocol,
network_event_stream,
message_sinks: HashMap::new(),
forwarding_state: ForwardingState::Idle,
}
}
pub fn report(&self, who: PeerId, reputation: ReputationChange) {
self.network.report_peer(who, reputation);
}
pub fn register_gossip_message(
&mut self,
topic: B::Hash,
message: Vec<u8>,
) {
self.state_machine.register_message(topic, message);
}
pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
self.state_machine.broadcast_topic(&mut *self.network, topic, force);
}
pub fn messages_for(&mut self, topic: B::Hash)
-> Receiver<TopicNotification>
{
let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
for notification in past_messages{
tx.try_send(notification)
.expect("receiver known to be live, and buffer size known to suffice; qed");
}
self.message_sinks.entry(topic).or_default().push(tx);
rx
}
pub fn send_topic(
&mut self,
who: &PeerId,
topic: B::Hash,
force: bool
) {
self.state_machine.send_topic(&mut *self.network, who, topic, force)
}
pub fn gossip_message(
&mut self,
topic: B::Hash,
message: Vec<u8>,
force: bool,
) {
self.state_machine.multicast(&mut *self.network, topic, message, force)
}
pub fn send_message(&mut self, who: Vec<sc_network::PeerId>, data: Vec<u8>) {
for who in &who {
self.state_machine.send_message(&mut *self.network, who, data.clone());
}
}
pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
self.network.announce(block, associated_data);
}
}
impl<B: BlockT> Future for GossipEngine<B> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;
'outer: loop {
match &mut this.forwarding_state {
ForwardingState::Idle => {
match this.network_event_stream.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
Event::SyncConnected { remote } => {
this.network.add_set_reserved(remote, this.protocol.clone());
}
Event::SyncDisconnected { remote } => {
this.network.remove_set_reserved(remote, this.protocol.clone());
}
Event::NotificationStreamOpened { remote, protocol, role } => {
if protocol != this.protocol {
continue;
}
this.state_machine.new_peer(&mut *this.network, remote, role);
}
Event::NotificationStreamClosed { remote, protocol } => {
if protocol != this.protocol {
continue;
}
this.state_machine.peer_disconnected(&mut *this.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let messages = messages.into_iter().filter_map(|(engine, data)| {
if engine == this.protocol {
Some(data.to_vec())
} else {
None
}
}).collect();
let to_forward = this.state_machine.on_incoming(
&mut *this.network,
remote,
messages,
);
this.forwarding_state = ForwardingState::Busy(to_forward.into());
},
Event::Dht(_) => {}
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
}
}
ForwardingState::Busy(to_forward) => {
let (topic, notification) = match to_forward.pop_front() {
Some(n) => n,
None => {
this.forwarding_state = ForwardingState::Idle;
continue;
}
};
let sinks = match this.message_sinks.get_mut(&topic) {
Some(sinks) => sinks,
None => {
continue;
},
};
for sink in sinks.iter_mut() {
match sink.poll_ready(cx) {
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {},
Poll::Pending => {
to_forward.push_front((topic, notification));
break 'outer;
}
}
}
sinks.retain(|sink| !sink.is_closed());
if sinks.is_empty() {
this.message_sinks.remove(&topic);
continue;
}
trace!(
target: "gossip",
"Pushing consensus message to sinks for {}.", topic,
);
for sink in sinks {
match sink.start_send(notification.clone()) {
Ok(()) => {},
Err(e) if e.is_full() => unreachable!(
"Previously ensured that all sinks are ready; qed.",
),
Err(_) => {},
}
}
}
}
}
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
this.state_machine.tick(&mut *this.network);
this.message_sinks.retain(|_, sinks| {
sinks.retain(|sink| !sink.is_closed());
!sinks.is_empty()
});
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use async_std::task::spawn;
use crate::{ValidationResult, ValidatorContext};
use futures::{channel::mpsc::{unbounded, UnboundedSender}, executor::{block_on, block_on_stream}, future::poll_fn};
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sc_network::ObservedRole;
use sp_runtime::{testing::H256, traits::{Block as BlockT}};
use std::borrow::Cow;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use substrate_test_runtime_client::runtime::Block;
use super::*;
#[derive(Clone, Default)]
struct TestNetwork {
inner: Arc<Mutex<TestNetworkInner>>,
}
#[derive(Clone, Default)]
struct TestNetworkInner {
event_senders: Vec<UnboundedSender<Event>>,
}
impl<B: BlockT> Network<B> for TestNetwork {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
let (tx, rx) = unbounded();
self.inner.lock().unwrap().event_senders.push(tx);
Box::pin(rx)
}
fn report_peer(&self, _: PeerId, _: ReputationChange) {
}
fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) {
unimplemented!();
}
fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {
}
fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {
}
fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
unimplemented!();
}
fn announce(&self, _: B::Hash, _: Option<Vec<u8>>) {
unimplemented!();
}
}
struct AllowAll;
impl Validator<Block> for AllowAll {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::default())
}
}
#[test]
fn returns_when_network_event_stream_closes() {
let network = TestNetwork::default();
let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
"/my_protocol",
Arc::new(AllowAll {}),
None,
);
drop(network.inner.lock().unwrap().event_senders.pop());
block_on(poll_fn(move |ctx| {
if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
panic!(
"Expected gossip engine to finish on first poll, given that \
`GossipEngine.network_event_stream` closes right away."
)
}
Poll::Ready(())
}))
}
#[test]
fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
let topic = H256::default();
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();
let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
protocol.clone(),
Arc::new(AllowAll {}),
None,
);
let mut event_sender = network.inner.lock()
.unwrap()
.event_senders
.pop()
.unwrap();
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
let messages = vec![vec![1], vec![2]];
let events = messages.iter().cloned().map(|m| {
Event::NotificationsReceived {
remote: remote_peer.clone(),
messages: vec![(protocol.clone(), m.into())]
}
}).collect::<Vec<_>>();
event_sender.start_send(events[0].clone()).expect("Event stream is unbounded; qed.");
let mut subscribers = vec![];
for _ in 0..2 {
subscribers.push(gossip_engine.messages_for(topic));
}
event_sender.start_send(events[1].clone()).expect("Event stream is unbounded; qed.");
spawn(gossip_engine);
let mut subscribers = subscribers.into_iter()
.map(|s| block_on_stream(s))
.collect::<Vec<_>>();
for message in messages {
for subscriber in subscribers.iter_mut() {
assert_eq!(
subscriber.next(),
Some(TopicNotification {
message: message.clone(),
sender: Some(remote_peer.clone()),
}),
);
}
}
}
#[test]
fn forwarding_to_different_size_and_topic_channels() {
#[derive(Clone, Debug)]
struct ChannelLengthAndTopic{
length: usize,
topic: H256,
}
impl Arbitrary for ChannelLengthAndTopic {
fn arbitrary(g: &mut Gen) -> Self {
let possible_length = (0..100).collect::<Vec<usize>>();
let possible_topics = (0..10).collect::<Vec<u64>>();
Self {
length: *g.choose(&possible_length).unwrap(),
topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
}
}
}
#[derive(Clone, Debug)]
struct Message {
topic: H256,
}
impl Arbitrary for Message{
fn arbitrary(g: &mut Gen) -> Self {
let possible_topics = (0..10).collect::<Vec<u64>>();
Self {
topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
}
}
}
struct TestValidator;
impl Validator<Block> for TestValidator {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
}
}
fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();
let num_channels_per_topic = channels.iter()
.fold(HashMap::new(), |mut acc, ChannelLengthAndTopic { topic, .. }| {
acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
acc
});
let expected_msgs_per_topic_all_chan = notifications.iter()
.fold(HashMap::new(), |mut acc, messages| {
for message in messages {
acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
}
acc
})
.into_iter()
.map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
.collect::<HashMap<H256, _>>();
let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
protocol.clone(),
Arc::new(TestValidator {}),
None,
);
let (txs, mut rxs) = channels.iter()
.map(|ChannelLengthAndTopic { length, topic }| {
(topic.clone(), channel(*length))
})
.fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
acc.0.push((topic, tx)); acc.1.push((topic, rx));
acc
});
for (topic, tx) in txs {
match gossip_engine.message_sinks.get_mut(&topic) {
Some(entry) => entry.push(tx),
None => {
gossip_engine.message_sinks.insert(topic, vec![tx]);
}
}
}
let mut event_sender = network.inner.lock()
.unwrap()
.event_senders
.pop()
.unwrap();
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
for (i_notification, messages) in notifications.iter().enumerate() {
let messages = messages.into_iter().enumerate()
.map(|(i_message, Message { topic })| {
let mut message = topic.as_bytes().to_vec();
message.push(i_notification.try_into().unwrap());
message.push(i_message.try_into().unwrap());
(protocol.clone(), message.into())
}).collect();
event_sender.start_send(Event::NotificationsReceived {
remote: remote_peer.clone(),
messages,
}).expect("Event stream is unbounded; qed.");
}
let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
block_on(poll_fn(|cx| {
loop {
if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
unreachable!(
"Event stream sender side is not dropped, thus gossip engine does not \
terminate",
);
}
let mut progress = false;
for (topic, rx) in rxs.iter_mut() {
match rx.poll_next_unpin(cx) {
Poll::Ready(Some(_)) => {
progress = true;
received_msgs_per_topic_all_chan.entry(*topic)
.and_modify(|e| *e += 1)
.or_insert(1);
},
Poll::Ready(None) => unreachable!(
"Sender side of channel is never dropped",
),
Poll::Pending => {},
}
}
if !progress {
break;
}
}
Poll::Ready(())
}));
for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
assert_eq!(
received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
expected_num,
);
}
for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() {
assert_eq!(
expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
received_num,
);
}
}
prop(vec![], vec![vec![Message{ topic: H256::default()}]]);
prop(
vec![ChannelLengthAndTopic {length: 71, topic: H256::default()}],
vec![vec![Message{ topic: H256::default()}]],
);
QuickCheck::new().quickcheck(prop as fn(_, _))
}
}