use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::iter;
use std::time;
use log::{debug, error, trace};
use lru::LruCache;
use libp2p::PeerId;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sc_network::ObservedRole;
use wasm_timer::Instant;
/// Capacity given to the LRU cache that remembers hashes of known messages.
const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192;
/// Delay between two periodic rebroadcasts performed from `ConsensusGossip::tick`.
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
/// Interval for periodic maintenance, exposed to the rest of the crate.
// NOTE(review): not used in this file; presumably the cadence at which the owner calls
// `ConsensusGossip::tick` — confirm against the caller.
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
/// Reputation changes reported to the network for gossiping peers.
mod rep {
use sc_network::ReputationChange as Rep;
/// Reported when a registered peer delivers a previously unknown message that the validator
/// did not discard.
pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip");
/// Reported when a peer delivers a message whose hash is already in the known-messages cache.
pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
}
/// Gossip bookkeeping kept for a single registered peer.
struct PeerConsensus<H> {
/// Hashes of messages the peer is considered to know: messages it sent to us and messages
/// we wrote to it.
known_messages: HashSet<H>,
}
/// A gossip message together with its origin, as handed out to consumers of a topic.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicNotification {
/// Raw message payload.
pub message: Vec<u8>,
/// Peer the message was received from; `None` for messages registered locally.
pub sender: Option<PeerId>,
}
/// A message retained by the gossip state machine until the validator reports it as expired.
struct MessageEntry<B: BlockT> {
/// Hash of `message`; the key used for de-duplication.
message_hash: B::Hash,
/// Topic the message was registered under.
topic: B::Hash,
/// Raw message payload.
message: Vec<u8>,
/// Peer the message was received from, or `None` when it was registered locally.
sender: Option<PeerId>,
}
/// Pairs the gossip state machine with a network handle so that a validator can trigger
/// sends while it is being called; this is the type handed to validators as their
/// `ValidatorContext`.
struct NetworkContext<'g, 'p, B: BlockT> {
/// Gossip state the validator callbacks operate on.
gossip: &'g mut ConsensusGossip<B>,
/// Network handle used to write notifications.
network: &'p mut dyn Network<B>,
}
/// Lets a validator act on the network by delegating to the wrapped gossip state machine
/// and network handle.
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
    /// Sends every stored message of `topic` to all peers via
    /// `ConsensusGossip::broadcast_topic`.
    fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
        self.gossip.broadcast_topic(self.network, topic, force);
    }

    /// Registers `message` under `topic` and sends it to all peers via
    /// `ConsensusGossip::multicast`.
    fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
        self.gossip.multicast(self.network, topic, message, force);
    }

    /// Writes `message` directly to `who` on the gossip protocol, without touching any
    /// gossip bookkeeping.
    fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
        let protocol = self.gossip.protocol.clone();
        self.network.write_notification(who.clone(), protocol, message);
    }

    /// Sends every stored message of `topic` to the single peer `who` via
    /// `ConsensusGossip::send_topic`.
    fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
        self.gossip.send_topic(self.network, who, topic, force);
    }
}
/// Offers each `(message_hash, topic, message)` triple in `messages` to every peer in `peers`.
///
/// The requested `intent` is adjusted per peer before the validator is asked:
/// * `Broadcast`: peers that already know the message are skipped.
/// * `PeriodicRebroadcast`: peers that do not know the message yet are treated as a plain
///   `Broadcast`.
/// * any other intent is passed through unchanged.
///
/// A message is written to the peer (and recorded in its `known_messages`) only if the
/// validator's `message_allowed` predicate accepts it.
fn propagate<'a, B: BlockT, I>(
    network: &mut dyn Network<B>,
    protocol: Cow<'static, str>,
    messages: I,
    intent: MessageIntent,
    peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
    validator: &Arc<dyn Validator<B>>,
)
where
    I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
{
    let mut is_allowed = validator.message_allowed();

    for (peer_id, peer_state) in peers.iter_mut() {
        // The message set is re-walked for every peer, hence the `Clone` bound on `I`.
        for (hash, topic, payload) in messages.clone() {
            let already_known = peer_state.known_messages.contains(hash);
            let effective_intent = match intent {
                MessageIntent::Broadcast if already_known => continue,
                MessageIntent::PeriodicRebroadcast if !already_known => MessageIntent::Broadcast,
                unchanged => unchanged,
            };

            if !is_allowed(peer_id, effective_intent, topic, payload) {
                continue;
            }

            peer_state.known_messages.insert(hash.clone());
            trace!(target: "gossip", "Propagating to {}: {:?}", peer_id, payload);
            network.write_notification(peer_id.clone(), protocol.clone(), payload.clone());
        }
    }
}
/// Gossip state machine for a single protocol: tracks registered peers, retained messages
/// and the hashes of messages that are already known.
pub struct ConsensusGossip<B: BlockT> {
/// Registered (non-light) peers and the message hashes each of them is considered to know.
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
/// Messages kept for (re)broadcast until the validator reports them as expired.
messages: Vec<MessageEntry<B>>,
/// LRU cache of hashes of registered messages, used to drop duplicates.
known_messages: LruCache<B::Hash, ()>,
/// Protocol name passed along with every notification written to the network.
protocol: Cow<'static, str>,
/// Validator consulted for incoming-message validation, send permission and expiry.
validator: Arc<dyn Validator<B>>,
/// Earliest instant at which `tick` performs the next periodic rebroadcast.
next_broadcast: Instant,
/// Prometheus counters; `None` when no registry was supplied or registration failed.
metrics: Option<Metrics>,
}
impl<B: BlockT> ConsensusGossip<B> {
pub fn new(
validator: Arc<dyn Validator<B>>,
protocol: Cow<'static, str>,
metrics_registry: Option<&Registry>,
) -> Self {
let metrics = match metrics_registry.map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: "gossip", "Failed to register metrics: {:?}", e);
None
}
None => None,
};
ConsensusGossip {
peers: HashMap::new(),
messages: Default::default(),
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
protocol,
validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
metrics,
}
}
pub fn new_peer(&mut self, network: &mut dyn Network<B>, who: PeerId, role: ObservedRole) {
if role.is_light() {
return;
}
trace!(target:"gossip", "Registering {:?} {}", role, who);
self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(),
});
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, network };
validator.new_peer(&mut context, &who, role);
}
/// Stores `message` under `topic` unless its hash is already in the known-messages cache.
///
/// The hash is put into the cache in either case; the message entry and the
/// `registered_messages` counter are only touched for hashes not seen before.
fn register_message_hashed(
    &mut self,
    message_hash: B::Hash,
    topic: B::Hash,
    message: Vec<u8>,
    sender: Option<PeerId>,
) {
    let already_known = self.known_messages.put(message_hash.clone(), ()).is_some();
    if already_known {
        return;
    }

    self.messages.push(MessageEntry { message_hash, topic, message, sender });

    if let Some(metrics) = self.metrics.as_ref() {
        metrics.registered_messages.inc();
    }
}
/// Registers a locally produced `message` under `topic` without sending it to anyone.
///
/// The message is hashed with the block's hasher and stored with no sender.
pub fn register_message(
    &mut self,
    topic: B::Hash,
    message: Vec<u8>,
) {
    let hash = HashFor::<B>::hash(message.as_slice());
    self.register_message_hashed(hash, topic, message, None);
}
pub fn peer_disconnected(&mut self, network: &mut dyn Network<B>, who: PeerId) {
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, network };
validator.peer_disconnected(&mut context, &who);
self.peers.remove(&who);
}
/// Performs periodic maintenance: always collects garbage, and rebroadcasts all stored
/// messages once the rebroadcast deadline has been reached.
pub fn tick(&mut self, network: &mut dyn Network<B>) {
    self.collect_garbage();

    if Instant::now() < self.next_broadcast {
        return;
    }

    self.rebroadcast(network);
    // The next deadline is measured from the end of the rebroadcast.
    self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
}
/// Offers every stored message to every peer with the `PeriodicRebroadcast` intent.
fn rebroadcast(&mut self, network: &mut dyn Network<B>) {
    let protocol = self.protocol.clone();
    let all_messages = self.messages
        .iter()
        .map(|stored| (&stored.message_hash, &stored.topic, &stored.message));

    propagate(
        network,
        protocol,
        all_messages,
        MessageIntent::PeriodicRebroadcast,
        &mut self.peers,
        &self.validator,
    );
}
/// Offers all stored messages of `topic` to every peer.
///
/// With `force` the `ForcedBroadcast` intent is used, otherwise `Broadcast` (which skips
/// peers that already know a message).
pub fn broadcast_topic(&mut self, network: &mut dyn Network<B>, topic: B::Hash, force: bool) {
    let intent = match force {
        true => MessageIntent::ForcedBroadcast,
        false => MessageIntent::Broadcast,
    };

    let on_topic = self.messages
        .iter()
        .filter(|entry| entry.topic == topic)
        .map(|entry| (&entry.message_hash, &entry.topic, &entry.message));

    propagate(network, self.protocol.clone(), on_topic, intent, &mut self.peers, &self.validator);
}
/// Drops stored messages the validator reports as expired and prunes per-peer state.
///
/// The number of dropped messages is added to the `expired_messages` counter. Afterwards
/// each peer's `known_messages` set is reduced to hashes still present in the
/// known-messages LRU cache.
pub fn collect_garbage(&mut self) {
    let known = &mut self.known_messages;
    let count_before = self.messages.len();

    let mut is_expired = self.validator.message_expired();
    self.messages.retain(|entry| {
        let expired = is_expired(entry.topic, &entry.message);
        !expired
    });

    let count_after = self.messages.len();
    let expired_messages = count_before - count_after;

    if let Some(metrics) = self.metrics.as_ref() {
        metrics.expired_messages.inc_by(expired_messages as u64);
    }

    trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
        expired_messages,
        count_after,
        known.len(),
    );

    for peer in self.peers.values_mut() {
        peer.known_messages.retain(|hash| known.contains(hash));
    }
}
/// Returns a lazy iterator over clones of all stored messages registered under `topic`,
/// each paired with its original sender.
pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
    self.messages.iter().filter_map(move |entry| {
        if entry.topic == topic {
            Some(TopicNotification {
                message: entry.message.clone(),
                sender: entry.sender.clone(),
            })
        } else {
            None
        }
    })
}
pub fn on_incoming(
&mut self,
network: &mut dyn Network<B>,
who: PeerId,
messages: Vec<Vec<u8>>,
) -> Vec<(B::Hash, TopicNotification)> {
let mut to_forward = vec![];
if !messages.is_empty() {
trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who);
}
for message in messages {
let message_hash = HashFor::<B>::hash(&message[..]);
if self.known_messages.contains(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
network.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
continue;
}
let validation = {
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, network };
validator.validate(&mut context, &who, &message)
};
let (topic, keep) = match validation {
ValidationResult::ProcessAndKeep(topic) => (topic, true),
ValidationResult::ProcessAndDiscard(topic) => (topic, false),
ValidationResult::Discard => {
trace!(target:"gossip", "Discard message from peer {}", who);
continue;
},
};
let peer = match self.peers.get_mut(&who) {
Some(peer) => peer,
None => {
error!(target:"gossip", "Got message from unregistered peer {}", who);
continue;
}
};
network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
peer.known_messages.insert(message_hash);
to_forward.push((topic, TopicNotification {
message: message.clone(),
sender: Some(who.clone())
}));
if keep {
self.register_message_hashed(
message_hash,
topic,
message,
Some(who.clone()),
);
}
}
to_forward
}
/// Sends all stored messages of `topic` to the single peer `who`.
///
/// Nothing is sent when `who` is not a registered peer. Without `force`, messages the
/// peer already knows are skipped; with `force` they are offered again using the
/// `ForcedBroadcast` intent. Every message still has to be accepted by the validator's
/// `message_allowed` predicate.
pub fn send_topic(
    &mut self,
    network: &mut dyn Network<B>,
    who: &PeerId,
    topic: B::Hash,
    force: bool
) {
    let mut is_allowed = self.validator.message_allowed();

    let peer = match self.peers.get_mut(who) {
        Some(peer) => peer,
        None => return,
    };

    let intent = if force {
        MessageIntent::ForcedBroadcast
    } else {
        MessageIntent::Broadcast
    };

    let on_topic = self.messages.iter().filter(|entry| entry.topic == topic);
    for entry in on_topic {
        let skip_known = !force && peer.known_messages.contains(&entry.message_hash);
        if skip_known || !is_allowed(who, intent, &entry.topic, &entry.message) {
            continue;
        }

        peer.known_messages.insert(entry.message_hash.clone());
        trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
        network.write_notification(who.clone(), self.protocol.clone(), entry.message.clone());
    }
}
/// Registers `message` under `topic` as a local message and offers it to every peer.
///
/// With `force` the `ForcedBroadcast` intent is used, otherwise `Broadcast`.
pub fn multicast(
    &mut self,
    network: &mut dyn Network<B>,
    topic: B::Hash,
    message: Vec<u8>,
    force: bool,
) {
    let hash = HashFor::<B>::hash(message.as_slice());
    self.register_message_hashed(hash, topic, message.clone(), None);

    let intent = match force {
        true => MessageIntent::ForcedBroadcast,
        false => MessageIntent::Broadcast,
    };
    let single = iter::once((&hash, &topic, &message));

    propagate(network, self.protocol.clone(), single, intent, &mut self.peers, &self.validator);
}
/// Sends `message` directly to `who`, bypassing the validator's `message_allowed` check.
///
/// Does nothing when `who` is not a registered peer. The message is not stored; only its
/// hash is recorded as known by that peer.
pub fn send_message(
    &mut self,
    network: &mut dyn Network<B>,
    who: &PeerId,
    message: Vec<u8>,
) {
    if let Some(peer) = self.peers.get_mut(who) {
        let hash = HashFor::<B>::hash(message.as_slice());

        trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);

        peer.known_messages.insert(hash);
        network.write_notification(who.clone(), self.protocol.clone(), message);
    }
}
}
/// Prometheus counters maintained by the gossip state machine.
struct Metrics {
/// Incremented once for every message newly stored by `register_message_hashed`.
registered_messages: Counter<U64>,
/// Increased by the number of messages dropped in each `collect_garbage` run.
expired_messages: Counter<U64>,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
registered_messages: register(
Counter::new(
"network_gossip_registered_messages_total",
"Number of registered messages by the gossip service.",
)?,
registry,
)?,
expired_messages: register(
Counter::new(
"network_gossip_expired_messages_total",
"Number of expired messages by the gossip service.",
)?,
registry,
)?,
})
}
}
#[cfg(test)]
mod tests {
use futures::prelude::*;
use sc_network::{Event, ReputationChange};
use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use std::{borrow::Cow, pin::Pin, sync::{Arc, Mutex}};
use super::*;
/// Concrete block type the tests in this module instantiate `ConsensusGossip` with.
type Block = RawBlock<ExtrinsicWrapper<u64>>;
// Test helper: stores message `$m` under `$topic` with an explicitly chosen `$hash`
// (rather than the real hash of the payload), unless that hash is already in the
// known-messages cache. The stored entry has no sender.
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.put($hash, ()).is_none() {
$consensus.messages.push(MessageEntry {
message_hash: $hash,
topic: $topic,
message: $m,
sender: None,
});
}
}
}
/// Test validator that accepts every message and asks for it to be kept, always under
/// the default (all-zero) topic.
struct AllowAll;

impl Validator<Block> for AllowAll {
    fn validate(
        &self,
        _context: &mut dyn ValidatorContext<Block>,
        _sender: &PeerId,
        _data: &[u8],
    ) -> ValidationResult<H256> {
        let topic = H256::default();
        ValidationResult::ProcessAndKeep(topic)
    }
}
/// Test validator that rejects every incoming message.
struct DiscardAll;

impl Validator<Block> for DiscardAll {
    fn validate(
        &self,
        _context: &mut dyn ValidatorContext<Block>,
        _sender: &PeerId,
        _data: &[u8],
    ) -> ValidationResult<H256> {
        // Nothing is ever accepted, regardless of sender or payload.
        ValidationResult::Discard
    }
}
/// Test double for the network; clones share the same recorded state.
#[derive(Clone, Default)]
struct NoOpNetwork {
/// Shared, lock-protected record of what was reported through this network.
inner: Arc<Mutex<NoOpNetworkInner>>,
}
/// State recorded by `NoOpNetwork`.
#[derive(Clone, Default)]
struct NoOpNetworkInner {
/// Every `(peer, reputation change)` pair passed to `report_peer`, in call order.
peer_reports: Vec<(PeerId, ReputationChange)>,
}
/// Minimal `Network` implementation for tests: reputation reports are recorded, reserved
/// set changes are ignored, and every other operation panics with `unimplemented!`.
impl<B: BlockT> Network<B> for NoOpNetwork {
    fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
        unimplemented!()
    }

    /// Appends the report to the shared `peer_reports` list.
    fn report_peer(&self, peer_id: PeerId, reputation_change: ReputationChange) {
        let mut inner = self.inner.lock().unwrap();
        inner.peer_reports.push((peer_id, reputation_change));
    }

    fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) {
        unimplemented!()
    }

    /// Intentionally a no-op.
    fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}

    /// Intentionally a no-op.
    fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}

    fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
        unimplemented!()
    }

    fn announce(&self, _: B::Hash, _: Option<Vec<u8>>) {
        unimplemented!()
    }
}
/// Garbage collection keeps everything while the validator reports nothing as expired,
/// and drops only the expired message (but not its known hash) once it does.
#[test]
fn collects_garbage() {
    /// Validator for which only messages starting with byte `1` are valid and unexpired.
    struct AllowOne;
    impl Validator<Block> for AllowOne {
        fn validate(
            &self,
            _context: &mut dyn ValidatorContext<Block>,
            _sender: &PeerId,
            data: &[u8],
        ) -> ValidationResult<H256> {
            match data[0] {
                1 => ValidationResult::ProcessAndKeep(H256::default()),
                _ => ValidationResult::Discard,
            }
        }

        fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
            Box::new(move |_topic, data| data[0] != 1)
        }
    }

    let prev_hash = H256::random();
    let best_hash = H256::random();
    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);

    let m1_hash = H256::random();
    let m2_hash = H256::random();
    let m1 = vec![1, 2, 3];
    let m2 = vec![4, 5, 6];

    push_msg!(consensus, prev_hash, m1_hash, m1);
    push_msg!(consensus, best_hash, m2_hash, m2);
    consensus.known_messages.put(m1_hash, ());
    consensus.known_messages.put(m2_hash, ());

    // With `AllowAll` nothing is reported as expired, so nothing is collected.
    consensus.collect_garbage();
    assert_eq!(consensus.messages.len(), 2);
    assert_eq!(consensus.known_messages.len(), 2);

    // `AllowOne` reports `m2` as expired: the message goes away, its known hash stays.
    consensus.validator = Arc::new(AllowOne);
    consensus.collect_garbage();
    assert_eq!(consensus.messages.len(), 1);
    assert_eq!(consensus.known_messages.len(), 2);
    assert!(consensus.known_messages.contains(&m2_hash));
}
/// A message registered before `messages_for` is called is still returned for its topic.
#[test]
fn message_stream_include_those_sent_before_asking() {
    let topic = HashFor::<Block>::hash(&[1, 2, 3]);
    let message = vec![4, 5, 6];

    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
    consensus.register_message(topic, message.clone());

    let first = consensus.messages_for(topic).next();
    assert_eq!(first, Some(TopicNotification { message, sender: None }));
}
/// Two distinct messages registered under the same topic are both retained.
#[test]
fn can_keep_multiple_messages_per_topic() {
    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
    let topic: H256 = [1; 32].into();

    for payload in vec![vec![1, 2, 3], vec![4, 5, 6]] {
        consensus.register_message(topic, payload);
    }

    assert_eq!(consensus.messages.len(), 2);
}
/// A full peer is tracked after `new_peer` and forgotten after `peer_disconnected`.
#[test]
fn peer_is_removed_on_disconnect() {
    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
    let mut network = NoOpNetwork::default();
    let peer_id = PeerId::random();

    consensus.new_peer(&mut network, peer_id.clone(), ObservedRole::Full);
    let tracked_after_connect = consensus.peers.contains_key(&peer_id);
    assert!(tracked_after_connect);

    consensus.peer_disconnected(&mut network, peer_id.clone());
    let tracked_after_disconnect = consensus.peers.contains_key(&peer_id);
    assert!(!tracked_after_disconnect);
}
/// Messages the validator discards are not returned from `on_incoming`.
#[test]
fn on_incoming_ignores_discarded_messages() {
    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None);
    let mut network = NoOpNetwork::default();

    let to_forward = consensus.on_incoming(
        &mut network,
        PeerId::random(),
        vec![vec![1, 2, 3]],
    );

    assert!(
        to_forward.is_empty(),
        "Expected `on_incoming` to ignore discarded message but got {:?}", to_forward,
    );
}
/// A valid message from a peer that was never registered via `new_peer` is not returned
/// from `on_incoming`.
#[test]
fn on_incoming_ignores_unregistered_peer() {
    let mut network = NoOpNetwork::default();
    let remote = PeerId::random();
    let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);

    let incoming = vec![vec![1, 2, 3]];
    let to_forward = consensus.on_incoming(&mut network, remote.clone(), incoming);

    assert!(
        to_forward.is_empty(),
        "Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
        to_forward,
    );
}
}