use crate::time_cache::TimeCache;
use crate::{MessageId, TopicHash};
use libp2p_core::PeerId;
use log::{debug, trace, warn};
use std::collections::{hash_map, HashMap, HashSet};
use std::net::IpAddr;
use std::time::{Duration, Instant};
mod params;
use crate::error::ValidationError;
pub use params::{
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
TopicScoreParams,
};
#[cfg(test)]
mod tests;
/// Duration, in seconds, passed to the `deliveries` time cache when a `PeerScore` is created.
const TIME_CACHE_DURATION: u64 = 2 * 60;
/// Scoring state for all tracked peers: the configured parameters, per-peer statistics,
/// an IP-to-peers index and a time-bounded cache of message delivery records.
pub(crate) struct PeerScore {
/// Scoring parameters; `params.topics` holds the per-topic parameters.
params: PeerScoreParams,
/// Statistics of every known peer, keyed by peer id.
peer_stats: HashMap<PeerId, PeerStats>,
/// For each IP address, the peers that were registered with it through `add_ip`.
peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
/// Delivery records keyed by message id, stored in a [`TimeCache`] that is created with
/// `TIME_CACHE_DURATION` seconds.
deliveries: TimeCache<MessageId, DeliveryRecord>,
/// Optional callback called with `(peer, topic, seconds)` when a peer that is in the mesh
/// for the topic sends a message; `seconds` is `0.0` unless the message was already validated.
message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
/// Statistics kept for a single peer.
struct PeerStats {
/// Whether the peer is connected or disconnected (with an expiry for its retained stats).
status: ConnectionStatus,
/// Statistics per topic.
topics: HashMap<TopicHash, TopicStats>,
/// IP addresses registered for this peer through `PeerScore::add_ip`.
known_ips: HashSet<IpAddr>,
/// Penalty counter increased by `PeerScore::add_penalty` and decayed by
/// `PeerScore::refresh_scores`.
behaviour_penalty: f64,
/// Application-provided score, set through `PeerScore::set_application_score`.
application_score: f64,
}
/// Connection state of a peer as tracked by the scoring logic.
enum ConnectionStatus {
/// The peer is connected.
Connected,
/// The peer is disconnected; its statistics are kept until `expire`.
Disconnected {
/// Instant after which `PeerScore::refresh_scores` drops the peer's statistics.
expire: Instant,
},
}
impl Default for PeerStats {
    /// Builds the statistics of a newly seen peer: connected, without topic statistics or
    /// known IPs, and with a zero behaviour penalty and application score.
    fn default() -> Self {
        Self {
            status: ConnectionStatus::Connected,
            topics: HashMap::default(),
            known_ips: HashSet::default(),
            behaviour_penalty: 0.0,
            application_score: 0.0,
        }
    }
}
impl PeerStats {
    /// Returns the mutable statistics of `topic_hash` for this peer.
    ///
    /// When `params` contains parameters for the topic, missing statistics are created with
    /// their default value. When it does not, only already existing statistics are returned,
    /// and `None` if there are none.
    pub fn stats_or_default_mut(
        &mut self,
        topic_hash: TopicHash,
        params: &PeerScoreParams,
    ) -> Option<&mut TopicStats> {
        let topic_is_scored = params.topics.contains_key(&topic_hash);
        if !topic_is_scored {
            return self.topics.get_mut(&topic_hash);
        }
        Some(self.topics.entry(topic_hash).or_default())
    }
}
/// Statistics of a single peer in a single topic.
struct TopicStats {
/// Whether the peer is currently in the mesh for the topic, and since when.
mesh_status: MeshStatus,
/// Counter of first message deliveries; capped on increment and decayed on refresh.
first_message_deliveries: f64,
/// Set by `PeerScore::refresh_scores` once the time in mesh exceeds the topic's
/// `mesh_message_deliveries_activation`; cleared on graft, prune and peer removal.
mesh_message_deliveries_active: bool,
/// Counter of message deliveries while in the mesh; capped on increment and decayed on refresh.
mesh_message_deliveries: f64,
/// Accumulated squared delivery deficit, added when the peer leaves the mesh below the
/// delivery threshold.
mesh_failure_penalty: f64,
/// Counter of invalid message deliveries; decayed on refresh.
invalid_message_deliveries: f64,
}
impl TopicStats {
    /// Returns `true` when the mesh status is `MeshStatus::Active`.
    pub fn in_mesh(&self) -> bool {
        match self.mesh_status {
            MeshStatus::Active { .. } => true,
            MeshStatus::InActive => false,
        }
    }
}
/// Mesh membership of a peer in a single topic.
enum MeshStatus {
    /// The peer is in the mesh for the topic.
    Active {
        /// Instant at which this status was created.
        graft_time: Instant,
        /// Time spent in the mesh; starts at zero.
        mesh_time: Duration,
    },
    /// The peer is not in the mesh for the topic.
    InActive,
}

impl MeshStatus {
    /// Returns an `Active` status whose graft time is the current instant and whose time in
    /// mesh is zero.
    pub fn new_active() -> Self {
        let graft_time = Instant::now();
        let mesh_time = Duration::new(0, 0);
        Self::Active {
            graft_time,
            mesh_time,
        }
    }
}
impl Default for TopicStats {
    /// Builds empty topic statistics: not in the mesh, every counter at zero and mesh
    /// delivery tracking inactive.
    fn default() -> Self {
        Self {
            mesh_status: MeshStatus::InActive,
            first_message_deliveries: 0.0,
            mesh_message_deliveries_active: false,
            mesh_message_deliveries: 0.0,
            mesh_failure_penalty: 0.0,
            invalid_message_deliveries: 0.0,
        }
    }
}
/// Delivery state of a single message, stored in `PeerScore::deliveries`.
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
/// Validation outcome of the message so far.
status: DeliveryStatus,
/// Instant at which the record was created.
first_seen: Instant,
/// Peers recorded by `PeerScore::duplicated_message` as having sent a duplicate of the message.
peers: HashSet<PeerId>,
}
/// Validation outcome of a tracked message.
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
/// No outcome has been recorded yet; this is the status of a new record.
Unknown,
/// The message was delivered as valid at the contained instant.
Valid(Instant),
/// The message was rejected as invalid.
Invalid,
/// The message was rejected with `RejectReason::ValidationIgnored`; nobody is penalized.
Ignored,
}
impl Default for DeliveryRecord {
    /// Builds a record with an unknown status, first seen at the current instant and without
    /// any recorded peers.
    fn default() -> Self {
        let first_seen = Instant::now();
        Self {
            status: DeliveryStatus::Unknown,
            first_seen,
            peers: HashSet::default(),
        }
    }
}
impl PeerScore {
/// Creates a `PeerScore` with the given parameters and no message delivery time callback.
// NOTE(review): no caller is visible in this file, hence the `dead_code` allowance;
// presumably only tests or some build configurations use it — confirm.
#[allow(dead_code)]
pub fn new(params: PeerScoreParams) -> Self {
Self::new_with_message_delivery_time_callback(params, None)
}
pub fn new_with_message_delivery_time_callback(
params: PeerScoreParams,
callback: Option<fn(&PeerId, &TopicHash, f64)>,
) -> Self {
PeerScore {
params,
peer_stats: HashMap::new(),
peer_ips: HashMap::new(),
deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
message_delivery_time_callback: callback,
}
}
/// Computes the current score of `peer_id`.
///
/// Returns `0.0` when no statistics are recorded for the peer. Otherwise the result is the
/// sum of:
/// - the per-topic scores of all topics that have parameters, each multiplied by its topic
///   weight; this sum is clamped to `topic_score_cap` when that cap is positive,
/// - the application score multiplied by `app_specific_weight`,
/// - for each known, non-whitelisted IP shared by more peers than
///   `ip_colocation_factor_threshold`, the squared surplus times `ip_colocation_factor_weight`,
/// - the squared excess of the behaviour penalty over `behaviour_penalty_threshold` times
///   `behaviour_penalty_weight`.
pub fn score(&self, peer_id: &PeerId) -> f64 {
    let peer_stats = match self.peer_stats.get(peer_id) {
        Some(v) => v,
        None => return 0.0,
    };

    let mut score = 0.0;

    for (topic, topic_stats) in peer_stats.topics.iter() {
        // Topics without score parameters do not contribute.
        if let Some(topic_params) = self.params.topics.get(topic) {
            let mut topic_score = 0.0;

            // P1: time in mesh, expressed in quanta and capped.
            if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
                let p1 = {
                    let v = mesh_time.as_secs_f64()
                        / topic_params.time_in_mesh_quantum.as_secs_f64();
                    if v < topic_params.time_in_mesh_cap {
                        v
                    } else {
                        topic_params.time_in_mesh_cap
                    }
                };
                topic_score += p1 * topic_params.time_in_mesh_weight;
            }

            // P2: first message deliveries, capped.
            let p2 = {
                let v = topic_stats.first_message_deliveries;
                if v < topic_params.first_message_deliveries_cap {
                    v
                } else {
                    topic_params.first_message_deliveries_cap
                }
            };
            topic_score += p2 * topic_params.first_message_deliveries_weight;

            // P3: squared mesh delivery deficit, only once delivery tracking is active.
            if topic_stats.mesh_message_deliveries_active
                && topic_stats.mesh_message_deliveries
                    < topic_params.mesh_message_deliveries_threshold
            {
                let deficit = topic_params.mesh_message_deliveries_threshold
                    - topic_stats.mesh_message_deliveries;
                let p3 = deficit * deficit;
                topic_score += p3 * topic_params.mesh_message_deliveries_weight;
                // The space before the line continuation matters: `\` drops the newline and
                // the leading whitespace of the next line, so without it the topic hash
                // would be glued to the word "topic".
                debug!(
                    "The peer {} has a mesh message delivieries deficit of {} in topic \
                     {} and will get penalized by {}",
                    peer_id,
                    deficit,
                    topic,
                    p3 * topic_params.mesh_message_deliveries_weight
                );
            }

            // P3b: accumulated mesh failure penalty.
            let p3b = topic_stats.mesh_failure_penalty;
            topic_score += p3b * topic_params.mesh_failure_penalty_weight;

            // P4: squared invalid message deliveries.
            let p4 =
                topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
            topic_score += p4 * topic_params.invalid_message_deliveries_weight;

            score += topic_score * topic_params.topic_weight;
        }
    }

    // The cap applies to the topic contribution only; the terms below are added afterwards.
    if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap {
        score = self.params.topic_score_cap;
    }

    // P5: application-specific score.
    let p5 = peer_stats.application_score;
    score += p5 * self.params.app_specific_weight;

    // P6: IP colocation, quadratic in the number of peers above the threshold.
    for ip in peer_stats.known_ips.iter() {
        if self.params.ip_colocation_factor_whitelist.get(ip).is_some() {
            continue;
        }
        if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
            if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold {
                let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
                let p6 = surplus * surplus;
                debug!(
                    "The peer {} gets penalized because of too many peers with the ip {}. \
                     The surplus is {}. ",
                    peer_id, ip, surplus
                );
                score += p6 * self.params.ip_colocation_factor_weight;
            }
        }
    }

    // P7: behaviour penalty, quadratic in the excess over the threshold.
    if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
        let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
        let p7 = excess * excess;
        score += p7 * self.params.behaviour_penalty_weight;
    }

    score
}
/// Adds `count` to the behaviour penalty of `peer_id`. Does nothing for an unknown peer.
pub fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
    let stats = match self.peer_stats.get_mut(peer_id) {
        Some(stats) => stats,
        None => return,
    };
    debug!(
        "Behavioral penalty for peer {}, count = {}.",
        peer_id, count
    );
    stats.behaviour_penalty += count as f64;
}
fn remove_ips_for_peer(
peer_stats: &PeerStats,
peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
peer_id: &PeerId,
) {
for ip in peer_stats.known_ips.iter() {
if let Some(peer_set) = peer_ips.get_mut(ip) {
peer_set.remove(peer_id);
}
}
}
/// Periodic maintenance of all peer statistics.
///
/// Disconnected peers are dropped once their retention deadline has passed (their IPs are
/// removed from the IP index first); until then their statistics are left untouched. For
/// connected peers every counter of a topic with parameters is multiplied by its decay
/// factor and reset to zero when it falls below `decay_to_zero`, the time in mesh is
/// recomputed, mesh delivery tracking is activated once that time exceeds the topic's
/// activation period, and the behaviour penalty is decayed the same way.
pub fn refresh_scores(&mut self) {
    /// Applies one decay step to `counter`, snapping it to zero below `floor`.
    fn decay(counter: &mut f64, factor: f64, floor: f64) {
        *counter *= factor;
        if *counter < floor {
            *counter = 0.0;
        }
    }

    let now = Instant::now();
    let params = &self.params;
    let ip_index = &mut self.peer_ips;
    let floor = params.decay_to_zero;

    self.peer_stats.retain(|peer_id, peer_stats| {
        if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
            let expired = now > expire;
            if expired {
                Self::remove_ips_for_peer(peer_stats, ip_index, peer_id);
            }
            // Retained statistics of disconnected peers are not decayed.
            return !expired;
        }

        for (topic, stats) in peer_stats.topics.iter_mut() {
            let topic_params = match params.topics.get(topic) {
                Some(topic_params) => topic_params,
                None => continue,
            };

            decay(
                &mut stats.first_message_deliveries,
                topic_params.first_message_deliveries_decay,
                floor,
            );
            decay(
                &mut stats.mesh_message_deliveries,
                topic_params.mesh_message_deliveries_decay,
                floor,
            );
            decay(
                &mut stats.mesh_failure_penalty,
                topic_params.mesh_failure_penalty_decay,
                floor,
            );
            decay(
                &mut stats.invalid_message_deliveries,
                topic_params.invalid_message_deliveries_decay,
                floor,
            );

            if let MeshStatus::Active {
                graft_time,
                mesh_time,
            } = &mut stats.mesh_status
            {
                let time_in_mesh = now.duration_since(*graft_time);
                *mesh_time = time_in_mesh;
                if time_in_mesh > topic_params.mesh_message_deliveries_activation {
                    stats.mesh_message_deliveries_active = true;
                }
            }
        }

        decay(
            &mut peer_stats.behaviour_penalty,
            params.behaviour_penalty_decay,
            floor,
        );
        true
    });
}
/// Starts tracking `peer_id` (creating default statistics if it is unknown) and marks it as
/// connected.
pub fn add_peer(&mut self, peer_id: PeerId) {
    self.peer_stats.entry(peer_id).or_default().status = ConnectionStatus::Connected;
}
/// Registers `ip` for `peer_id`.
///
/// The peer's statistics are created if missing and the peer is marked as connected. The IP
/// is added to the peer's known IPs and the peer is added to the set of peers using that IP.
pub fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
    trace!("Add ip for peer {}, ip: {}", peer_id, ip);

    let stats = self.peer_stats.entry(peer_id.clone()).or_default();
    stats.status = ConnectionStatus::Connected;
    stats.known_ips.insert(ip);

    let peers_on_ip = self.peer_ips.entry(ip).or_default();
    peers_on_ip.insert(peer_id.clone());
}
pub fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
peer_stats.known_ips.remove(ip);
if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
trace!("Remove ip for peer {}, ip: {}", peer_id, ip);
peer_ids.remove(peer_id);
} else {
trace!(
"No entry in peer_ips for ip {} which should get removed for peer {}",
ip,
peer_id
);
}
} else {
trace!(
"No peer_stats for peer {} which should remove the ip {}",
peer_id,
ip
);
}
}
/// Handles the disconnection of `peer_id`.
///
/// A peer with a positive score is forgotten immediately, including its IP index entries.
/// Any other known peer is retained: its first-delivery counters are reset, a mesh failure
/// penalty (squared delivery deficit) is added for every topic in which it was in the mesh
/// with active delivery tracking and below the threshold, it leaves all meshes, and it is
/// marked as disconnected with an expiry of `retain_score` from now.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
    let has_positive_score = self.score(peer_id) > 0f64;
    if has_positive_score {
        if let hash_map::Entry::Occupied(occupied) = self.peer_stats.entry(peer_id.clone()) {
            Self::remove_ips_for_peer(occupied.get(), &mut self.peer_ips, peer_id);
            occupied.remove();
        }
        return;
    }

    let peer_stats = match self.peer_stats.get_mut(peer_id) {
        Some(peer_stats) => peer_stats,
        None => return,
    };
    let all_topic_params = &self.params.topics;

    for (topic, topic_stats) in peer_stats.topics.iter_mut() {
        topic_stats.first_message_deliveries = 0f64;

        if let Some(topic_params) = all_topic_params.get(topic) {
            let threshold = topic_params.mesh_message_deliveries_threshold;
            let under_delivered = topic_stats.in_mesh()
                && topic_stats.mesh_message_deliveries_active
                && topic_stats.mesh_message_deliveries < threshold;
            if under_delivered {
                let deficit = threshold - topic_stats.mesh_message_deliveries;
                topic_stats.mesh_failure_penalty += deficit * deficit;
            }
        }

        topic_stats.mesh_status = MeshStatus::InActive;
        topic_stats.mesh_message_deliveries_active = false;
    }

    let expire = Instant::now() + self.params.retain_score;
    peer_stats.status = ConnectionStatus::Disconnected { expire };
}
/// Records that `peer_id` joined the mesh for `topic`.
///
/// The topic statistics get a fresh active mesh status and mesh delivery tracking is
/// switched off. Nothing happens for an unknown peer or when no topic statistics are
/// available (see `PeerStats::stats_or_default_mut`).
pub fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
    let topic_hash = topic.into();
    let peer_stats = match self.peer_stats.get_mut(peer_id) {
        Some(peer_stats) => peer_stats,
        None => return,
    };
    if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash, &self.params) {
        topic_stats.mesh_message_deliveries_active = false;
        topic_stats.mesh_status = MeshStatus::new_active();
    }
}
/// Records that `peer_id` left the mesh for `topic`.
///
/// When mesh delivery tracking is active and the deliveries are below the topic threshold,
/// the squared deficit is added to the mesh failure penalty. Afterwards tracking is switched
/// off and the mesh status becomes inactive. Nothing happens for an unknown peer or when no
/// topic statistics are available.
///
/// # Panics
///
/// Panics if topic statistics exist for a topic that has no parameters.
pub fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
    let peer_stats = match self.peer_stats.get_mut(peer_id) {
        Some(peer_stats) => peer_stats,
        None => return,
    };
    let topic_stats = match peer_stats.stats_or_default_mut(topic.clone(), &self.params) {
        Some(topic_stats) => topic_stats,
        None => return,
    };

    let threshold = self
        .params
        .topics
        .get(&topic)
        .expect("Topic must exist in order for there to be topic stats")
        .mesh_message_deliveries_threshold;

    let under_delivered = topic_stats.mesh_message_deliveries_active
        && topic_stats.mesh_message_deliveries < threshold;
    if under_delivered {
        let deficit = threshold - topic_stats.mesh_message_deliveries;
        topic_stats.mesh_failure_penalty += deficit * deficit;
    }

    topic_stats.mesh_status = MeshStatus::InActive;
    topic_stats.mesh_message_deliveries_active = false;
}
/// Registers `msg_id` in the delivery cache (creating a default record if none exists) and,
/// when a delivery time callback is configured and `from` is in the mesh for `topic_hash`,
/// reports a delivery time of `0.0` for that peer.
pub fn validate_message(&mut self, from: &PeerId, msg_id: &MessageId, topic_hash: &TopicHash) {
    // Make sure a delivery record exists for the message.
    self.deliveries
        .entry(msg_id.clone())
        .or_insert_with(DeliveryRecord::default);

    if let Some(callback) = self.message_delivery_time_callback {
        let sender_in_mesh = self
            .peer_stats
            .get(from)
            .and_then(|s| s.topics.get(topic_hash))
            .map(|ts| ts.in_mesh())
            .unwrap_or(false);
        if sender_in_mesh {
            callback(from, topic_hash, 0.0);
        }
    }
}
/// Records the first valid delivery of `msg_id` by `from`.
///
/// The sender is credited with a first message delivery. If the message's record still has
/// an unknown status it becomes valid as of now, and every other peer already recorded on it
/// is credited with a duplicate delivery. A record with any other status only produces a
/// warning.
pub fn deliver_message(&mut self, from: &PeerId, msg_id: &MessageId, topic_hash: &TopicHash) {
    self.mark_first_message_delivery(from, topic_hash);

    let record = self
        .deliveries
        .entry(msg_id.clone())
        .or_insert_with(DeliveryRecord::default);

    if record.status != DeliveryStatus::Unknown {
        warn!("Unexpected delivery trace: Message from {} was first seen {}s ago and has a delivery status {:?}", from, record.first_seen.elapsed().as_secs(), record.status);
        return;
    }
    record.status = DeliveryStatus::Valid(Instant::now());

    // Copy the peers out so the record borrow ends before the statistics are updated; the
    // sender itself is skipped so that it is not counted twice.
    let earlier_senders: Vec<PeerId> = record
        .peers
        .iter()
        .filter(|peer| *peer != from)
        .cloned()
        .collect();
    for peer in &earlier_senders {
        self.mark_duplicate_message_delivery(peer, topic_hash, None);
    }
}
/// Logs the rejection and counts one invalid message delivery for `from` in `topic_hash`.
/// The delivery cache is not touched.
pub fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
debug!(
"Message from {} rejected because of ValidationError or SelfOrigin",
from
);
self.mark_invalid_message_delivery(from, topic_hash);
}
/// Handles the rejection of message `msg_id` received from `from`.
///
/// - `ValidationError` / `SelfOrigin`: the sender is penalized; the message is not tracked.
/// - `BlackListedPeer` / `BlackListedSource`: nothing happens.
/// - `ValidationIgnored`: the record is marked as ignored and its peers are cleared; nobody
///   is penalized.
/// - `ValidationFailed`: the record is marked as invalid, and the sender as well as every
///   peer recorded on it is penalized.
///
/// For the last two reasons a record whose status is no longer unknown only produces a
/// warning.
pub fn reject_message(
    &mut self,
    from: &PeerId,
    msg_id: &MessageId,
    topic_hash: &TopicHash,
    reason: RejectReason,
) {
    // Listing every variant makes the compiler point here when a new reason is added.
    match reason {
        RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
            self.reject_invalid_message(from, topic_hash);
            return;
        }
        RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
            return;
        }
        // Handled below, once the delivery record is available.
        RejectReason::ValidationIgnored | RejectReason::ValidationFailed => {}
    }

    let peers: Vec<_> = {
        let record = self
            .deliveries
            .entry(msg_id.clone())
            .or_insert_with(DeliveryRecord::default);

        if record.status != DeliveryStatus::Unknown {
            warn!("Unexpected delivery trace: Message from {} was first seen {}s ago and has a delivery status {:?}", from, record.first_seen.elapsed().as_secs(), record.status);
            return;
        }

        if let RejectReason::ValidationIgnored = reason {
            record.status = DeliveryStatus::Ignored;
            record.peers.clear();
            return;
        }

        record.status = DeliveryStatus::Invalid;
        // Draining empties the peer set of the record while handing the peers to the loop below.
        record.peers.drain().collect()
    };

    self.mark_invalid_message_delivery(from, topic_hash);
    for peer_id in peers.iter() {
        self.mark_invalid_message_delivery(peer_id, topic_hash)
    }
}
/// Handles a duplicate of message `msg_id` received from `from`.
///
/// A peer already recorded on the message is ignored. Otherwise, if a callback is configured
/// and the peer is in the mesh for the topic, the time since validation (or `0.0` when the
/// message is not validated) is reported. Depending on the record's status the peer is then
/// recorded (unknown), recorded and credited with a duplicate delivery (valid), penalized
/// (invalid) or left alone (ignored).
pub fn duplicated_message(
    &mut self,
    from: &PeerId,
    msg_id: &MessageId,
    topic_hash: &TopicHash,
) {
    let record = self
        .deliveries
        .entry(msg_id.clone())
        .or_insert_with(DeliveryRecord::default);

    if record.peers.contains(from) {
        return;
    }

    if let Some(report_delivery_time) = self.message_delivery_time_callback {
        let since_validation = match record.status {
            DeliveryStatus::Valid(validated) => validated.elapsed().as_secs_f64(),
            _ => 0.0,
        };
        let sender_in_mesh = self
            .peer_stats
            .get(from)
            .and_then(|stats| stats.topics.get(topic_hash))
            .map(|topic_stats| topic_stats.in_mesh())
            .unwrap_or(false);
        if sender_in_mesh {
            report_delivery_time(from, topic_hash, since_validation);
        }
    }

    match record.status {
        DeliveryStatus::Unknown => {
            // Validation is still pending; remember the peer for the final verdict.
            record.peers.insert(from.clone());
        }
        DeliveryStatus::Valid(validated) => {
            // Recording the peer ensures a further duplicate from it returns early above.
            record.peers.insert(from.clone());
            self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
        }
        DeliveryStatus::Invalid => {
            self.mark_invalid_message_delivery(from, topic_hash);
        }
        DeliveryStatus::Ignored => {}
    }
}
/// Sets the application score of `peer_id` to `new_score`.
///
/// Returns `true` if the peer is known and was updated, `false` otherwise.
pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
    match self.peer_stats.get_mut(peer_id) {
        Some(stats) => {
            stats.application_score = new_score;
            true
        }
        None => false,
    }
}
/// Installs `params` as the score parameters of `topic_hash`.
///
/// When parameters already existed and a delivery cap was lowered, the matching counter of
/// every peer is clamped to the new cap. For a topic without previous parameters the new
/// ones are simply stored.
pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
    let new_first_cap = params.first_message_deliveries_cap;
    let new_mesh_cap = params.mesh_message_deliveries_cap;

    let previous = match self.params.topics.entry(topic_hash.clone()) {
        hash_map::Entry::Occupied(mut slot) => slot.insert(params),
        hash_map::Entry::Vacant(slot) => {
            slot.insert(params);
            return;
        }
    };

    let first_cap_lowered = previous.first_message_deliveries_cap > new_first_cap;
    let mesh_cap_lowered = previous.mesh_message_deliveries_cap > new_mesh_cap;
    if !first_cap_lowered && !mesh_cap_lowered {
        return;
    }

    for peer_stats in self.peer_stats.values_mut() {
        if let Some(topic_stats) = peer_stats.topics.get_mut(&topic_hash) {
            if first_cap_lowered && topic_stats.first_message_deliveries > new_first_cap {
                topic_stats.first_message_deliveries = new_first_cap;
            }
            if mesh_cap_lowered && topic_stats.mesh_message_deliveries > new_mesh_cap {
                topic_stats.mesh_message_deliveries = new_mesh_cap;
            }
        }
    }
}
/// Increments the invalid message delivery counter of `peer_id` in `topic_hash`.
/// Does nothing for an unknown peer or when no topic statistics are available.
fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
    let peer_stats = match self.peer_stats.get_mut(peer_id) {
        Some(peer_stats) => peer_stats,
        None => return,
    };
    let topic_stats = match peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) {
        Some(topic_stats) => topic_stats,
        None => return,
    };
    debug!(
        "Peer {} delivered an invalid message in topic {} and gets penalized \
         for it",
        peer_id, topic_hash
    );
    topic_stats.invalid_message_deliveries += 1f64;
}
fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
if let Some(topic_stats) =
peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
{
let cap = self
.params
.topics
.get(topic_hash)
.expect("Topic must exist if there are known topic_stats")
.first_message_deliveries_cap;
topic_stats.first_message_deliveries =
if topic_stats.first_message_deliveries + 1f64 > cap {
cap
} else {
topic_stats.first_message_deliveries + 1f64
};
if let MeshStatus::Active { .. } = topic_stats.mesh_status {
let cap = self
.params
.topics
.get(topic_hash)
.expect("Topic must exist if there are known topic_stats")
.mesh_message_deliveries_cap;
topic_stats.mesh_message_deliveries =
if topic_stats.mesh_message_deliveries + 1f64 > cap {
cap
} else {
topic_stats.mesh_message_deliveries + 1f64
};
}
}
}
}
fn mark_duplicate_message_delivery(
&mut self,
peer_id: &PeerId,
topic_hash: &TopicHash,
validated_time: Option<Instant>,
) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
let now = if validated_time.is_some() {
Some(Instant::now())
} else {
None
};
if let Some(topic_stats) =
peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
{
if let MeshStatus::Active { .. } = topic_stats.mesh_status {
let topic_params = self
.params
.topics
.get(topic_hash)
.expect("Topic must exist if there are known topic_stats");
let mut falls_in_mesh_deliver_window = true;
if let Some(validated_time) = validated_time {
if let Some(now) = &now {
let window_time = validated_time
.checked_add(topic_params.mesh_message_deliveries_window)
.unwrap_or_else(|| *now);
if now > &window_time {
falls_in_mesh_deliver_window = false;
}
}
}
if falls_in_mesh_deliver_window {
let cap = topic_params.mesh_message_deliveries_cap;
topic_stats.mesh_message_deliveries =
if topic_stats.mesh_message_deliveries + 1f64 > cap {
cap
} else {
topic_stats.mesh_message_deliveries + 1f64
};
}
}
}
}
}
/// Returns the mesh message delivery counter of `peer` in `topic`, or `None` when the peer
/// or its statistics for the topic are unknown.
pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
    let peer_stats = self.peer_stats.get(peer)?;
    let topic_stats = peer_stats.topics.get(topic)?;
    Some(topic_stats.mesh_message_deliveries)
}
}
/// Reason passed to `PeerScore::reject_message` for rejecting a message.
#[derive(Clone, Copy)]
pub(crate) enum RejectReason {
/// The message failed with the contained validation error; the sender is penalized and the
/// message is not tracked.
ValidationError(ValidationError),
/// Handled like `ValidationError`: the sender is penalized and the message is not tracked.
SelfOrigin,
/// No scoring action is taken.
BlackListedPeer,
/// No scoring action is taken.
BlackListedSource,
/// The message is marked as ignored; nobody is penalized.
ValidationIgnored,
/// The message is marked as invalid; the sender and all peers recorded for the message are
/// penalized.
ValidationFailed,
}