use libp2p::PeerId;
use log::error;
use std::{
borrow::Cow,
collections::{HashMap, HashSet, hash_map::{Entry, OccupiedEntry}},
};
use wasm_timer::Instant;
/// Tracks every known peer together with the slot usage of each set.
#[derive(Debug, Clone)]
pub struct PeersState {
/// One record per known peer, keyed by the peer's identity.
nodes: HashMap<PeerId, Node>,
/// Slot limits and usage counters of each set; a set is addressed by its index in this list.
sets: Vec<SetInfo>,
}
/// Slot limits of one set, as passed to `PeersState::new`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SetConfig {
/// Limit on incoming connections that occupy a slot; `try_accept_incoming` refuses a
/// slot-occupying peer once this many are in use.
pub in_peers: u32,
/// Limit on outgoing connections that occupy a slot; `try_outgoing` refuses a slot-occupying
/// peer once this many are in use.
pub out_peers: u32,
}
/// Slot limits and current slot usage of a single set.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SetInfo {
/// Number of incoming connections currently counted against `max_in`.
/// Peers listed in `no_slot_nodes` are not counted.
num_in: u32,
/// Number of outgoing connections currently counted against `max_out`.
/// Peers listed in `no_slot_nodes` are not counted.
num_out: u32,
/// Limit checked against `num_in` before an incoming connection is accepted.
max_in: u32,
/// Limit checked against `num_out` before an outgoing connection is accepted.
max_out: u32,
/// Peers whose connections are accepted regardless of the limits and never change
/// `num_in` / `num_out`.
no_slot_nodes: HashSet<PeerId>,
}
/// Everything recorded about a single peer.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Node {
/// Membership state of the peer in each set, indexed by set number.
/// `Node::new` creates one entry per set.
sets: Vec<MembershipState>,
/// Reputation of the peer; a new record starts at 0.
reputation: i32,
}
impl Node {
    /// Builds the record of a peer that belongs to none of the `num_sets` sets and whose
    /// reputation is 0.
    fn new(num_sets: usize) -> Node {
        Node {
            sets: vec![MembershipState::NotMember; num_sets],
            reputation: 0,
        }
    }
}
/// State of a peer with respect to one particular set.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MembershipState {
/// The peer does not belong to the set; `PeersState::peer` reports it as `Peer::Unknown`.
NotMember,
/// Connected; set by `NotConnectedPeer::try_accept_incoming`.
In,
/// Connected; set by `NotConnectedPeer::try_outgoing`.
Out,
/// The peer belongs to the set but is not connected.
NotConnected {
/// Refreshed with `Instant::now()` when the peer is discovered, disconnected, or bumped
/// through `bump_last_connected_or_discovered`.
last_connected: Instant,
},
}
impl MembershipState {
    /// Returns `true` for the two connected states (`In` and `Out`) and `false` for
    /// `NotMember` and `NotConnected`.
    fn is_connected(self) -> bool {
        matches!(self, MembershipState::In | MembershipState::Out)
    }
}
impl PeersState {
pub fn new(sets: impl IntoIterator<Item = SetConfig>) -> Self {
PeersState {
nodes: HashMap::new(),
sets: sets
.into_iter()
.map(|config| SetInfo {
num_in: 0,
num_out: 0,
max_in: config.in_peers,
max_out: config.out_peers,
no_slot_nodes: HashSet::new(),
})
.collect(),
}
}
pub fn num_sets(&self) -> usize {
self.sets.len()
}
pub fn peer_reputation(&mut self, peer_id: PeerId) -> Reputation {
if !self.nodes.contains_key(&peer_id) {
self.nodes.insert(peer_id.clone(), Node::new(self.sets.len()));
}
let entry = match self.nodes.entry(peer_id) {
Entry::Vacant(_) => unreachable!("guaranteed to be inserted above; qed"),
Entry::Occupied(e) => e,
};
Reputation { node: Some(entry) }
}
pub fn peer<'a>(&'a mut self, set: usize, peer_id: &'a PeerId) -> Peer<'a> {
assert!(set < self.sets.len());
match self.nodes.get_mut(peer_id).map(|p| &p.sets[set]) {
None | Some(MembershipState::NotMember) => Peer::Unknown(UnknownPeer {
parent: self,
set,
peer_id: Cow::Borrowed(peer_id),
}),
Some(MembershipState::In) | Some(MembershipState::Out) => {
Peer::Connected(ConnectedPeer {
state: self,
set,
peer_id: Cow::Borrowed(peer_id),
})
}
Some(MembershipState::NotConnected { .. }) => Peer::NotConnected(NotConnectedPeer {
state: self,
set,
peer_id: Cow::Borrowed(peer_id),
}),
}
}
pub fn peers(&self) -> impl ExactSizeIterator<Item = &PeerId> {
self.nodes.keys()
}
pub fn connected_peers(&self, set: usize) -> impl Iterator<Item = &PeerId> {
assert!(set < self.sets.len());
self.nodes
.iter()
.filter(move |(_, p)| p.sets[set].is_connected())
.map(|(p, _)| p)
}
pub fn highest_not_connected_peer(&mut self, set: usize) -> Option<NotConnectedPeer> {
assert!(set < self.sets.len());
let outcome = self
.nodes
.iter_mut()
.filter(|(_, Node { sets, .. })| {
match sets[set] {
MembershipState::NotMember => false,
MembershipState::In => false,
MembershipState::Out => false,
MembershipState::NotConnected { .. } => true,
}
})
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
if let Some(cur_node) = cur_node.take() {
if cur_node.1.reputation >= to_try.1.reputation {
return Some(cur_node);
}
}
Some(to_try)
})
.map(|(peer_id, _)| peer_id.clone());
if let Some(peer_id) = outcome {
Some(NotConnectedPeer {
state: self,
set,
peer_id: Cow::Owned(peer_id),
})
} else {
None
}
}
pub fn add_no_slot_node(&mut self, set: usize, peer_id: PeerId) {
if !self.sets[set].no_slot_nodes.insert(peer_id.clone()) {
return;
}
if let Some(peer) = self.nodes.get_mut(&peer_id) {
match peer.sets[set] {
MembershipState::In => self.sets[set].num_in -= 1,
MembershipState::Out => self.sets[set].num_out -= 1,
MembershipState::NotConnected { .. } | MembershipState::NotMember => {}
}
}
}
pub fn remove_no_slot_node(&mut self, set: usize, peer_id: &PeerId) {
if !self.sets[set].no_slot_nodes.remove(peer_id) {
return;
}
if let Some(peer) = self.nodes.get_mut(peer_id) {
match peer.sets[set] {
MembershipState::In => self.sets[set].num_in += 1,
MembershipState::Out => self.sets[set].num_out += 1,
MembershipState::NotConnected { .. } | MembershipState::NotMember => {}
}
}
}
}
/// Handle on a peer within one set, as returned by `PeersState::peer`.
pub enum Peer<'a> {
/// The peer is connected in the set (recorded as `In` or `Out`).
Connected(ConnectedPeer<'a>),
/// The peer is a member of the set but not connected.
NotConnected(NotConnectedPeer<'a>),
/// The peer has no record or is not a member of the set.
Unknown(UnknownPeer<'a>),
}
impl<'a> Peer<'a> {
    /// Unwraps the `Connected` variant; any other variant yields `None`.
    pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
        if let Peer::Connected(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// Unwraps the `NotConnected` variant; any other variant yields `None`.
    #[cfg(test)]
    pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
        if let Peer::NotConnected(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// Unwraps the `Unknown` variant; any other variant yields `None`.
    #[cfg(test)]
    pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
        if let Peer::Unknown(peer) = self {
            Some(peer)
        } else {
            None
        }
    }
}
/// Handle on a peer that is connected in one set.
pub struct ConnectedPeer<'a> {
/// The state this handle reads from and writes to.
state: &'a mut PeersState,
/// Index of the set the handle refers to.
set: usize,
/// Identity of the peer, borrowed or owned depending on how the handle was created.
peer_id: Cow<'a, PeerId>,
}
impl<'a> ConnectedPeer<'a> {
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn into_peer_id(self) -> PeerId {
self.peer_id.into_owned()
}
pub fn disconnect(self) -> NotConnectedPeer<'a> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
if let Some(node) = self.state.nodes.get_mut(&*self.peer_id) {
if !is_no_slot_occupy {
match node.sets[self.set] {
MembershipState::In => self.state.sets[self.set].num_in -= 1,
MembershipState::Out => self.state.sets[self.set].num_out -= 1,
MembershipState::NotMember | MembershipState::NotConnected { .. } => {
debug_assert!(
false,
"State inconsistency: disconnecting a disconnected node"
)
}
}
}
node.sets[self.set] = MembershipState::NotConnected {
last_connected: Instant::now(),
};
} else {
debug_assert!(
false,
"State inconsistency: disconnecting a disconnected node"
);
}
NotConnectedPeer {
state: self.state,
set: self.set,
peer_id: self.peer_id,
}
}
pub fn add_reputation(&mut self, modifier: i32) {
if let Some(node) = self.state.nodes.get_mut(&*self.peer_id) {
node.reputation = node.reputation.saturating_add(modifier);
} else {
debug_assert!(
false,
"State inconsistency: add_reputation on an unknown node"
);
}
}
pub fn reputation(&self) -> i32 {
self.state
.nodes
.get(&*self.peer_id)
.map_or(0, |p| p.reputation)
}
}
/// Handle on a peer that is a member of one set but not connected in it.
#[derive(Debug)]
pub struct NotConnectedPeer<'a> {
/// The state this handle reads from and writes to.
state: &'a mut PeersState,
/// Index of the set the handle refers to.
set: usize,
/// Identity of the peer, borrowed or owned depending on how the handle was created.
peer_id: Cow<'a, PeerId>,
}
impl<'a> NotConnectedPeer<'a> {
pub fn into_peer_id(self) -> PeerId {
self.peer_id.into_owned()
}
pub fn bump_last_connected_or_discovered(&mut self) {
let state = match self.state.nodes.get_mut(&*self.peer_id) {
Some(s) => s,
None => return,
};
if let MembershipState::NotConnected { last_connected } = &mut state.sets[self.set] {
*last_connected = Instant::now();
}
}
pub fn last_connected_or_discovered(&self) -> Instant {
let state = match self.state.nodes.get(&*self.peer_id) {
Some(s) => s,
None => {
error!(
target: "peerset",
"State inconsistency with {}; not connected after borrow",
self.peer_id
);
return Instant::now();
}
};
match state.sets[self.set] {
MembershipState::NotConnected { last_connected } => last_connected,
_ => {
error!(target: "peerset", "State inconsistency with {}", self.peer_id);
Instant::now()
}
}
}
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
if self.state.sets[self.set].num_out >= self.state.sets[self.set].max_out
&& !is_no_slot_occupy
{
return Err(self);
}
if let Some(peer) = self.state.nodes.get_mut(&*self.peer_id) {
peer.sets[self.set] = MembershipState::Out;
if !is_no_slot_occupy {
self.state.sets[self.set].num_out += 1;
}
} else {
debug_assert!(
false,
"State inconsistency: try_outgoing on an unknown node"
);
}
Ok(ConnectedPeer {
state: self.state,
set: self.set,
peer_id: self.peer_id,
})
}
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
if self.state.sets[self.set].num_in >= self.state.sets[self.set].max_in
&& !is_no_slot_occupy
{
return Err(self);
}
if let Some(peer) = self.state.nodes.get_mut(&*self.peer_id) {
peer.sets[self.set] = MembershipState::In;
if !is_no_slot_occupy {
self.state.sets[self.set].num_in += 1;
}
} else {
debug_assert!(
false,
"State inconsistency: try_accept_incoming on an unknown node"
);
}
Ok(ConnectedPeer {
state: self.state,
set: self.set,
peer_id: self.peer_id,
})
}
pub fn reputation(&self) -> i32 {
self.state
.nodes
.get(&*self.peer_id)
.map_or(0, |p| p.reputation)
}
#[cfg(test)]
pub fn set_reputation(&mut self, value: i32) {
if let Some(node) = self.state.nodes.get_mut(&*self.peer_id) {
node.reputation = value;
} else {
debug_assert!(
false,
"State inconsistency: set_reputation on an unknown node"
);
}
}
pub fn forget_peer(self) -> UnknownPeer<'a> {
if let Some(peer) = self.state.nodes.get_mut(&*self.peer_id) {
debug_assert!(!matches!(peer.sets[self.set], MembershipState::NotMember));
peer.sets[self.set] = MembershipState::NotMember;
if peer.reputation == 0 && peer
.sets
.iter()
.all(|set| matches!(set, MembershipState::NotMember))
{
self.state.nodes.remove(&*self.peer_id);
}
} else {
debug_assert!(false, "State inconsistency: forget_peer on an unknown node");
error!(
target: "peerset",
"State inconsistency with {} when forgetting peer",
self.peer_id
);
};
UnknownPeer {
parent: self.state,
set: self.set,
peer_id: self.peer_id,
}
}
}
/// Handle on a peer that has no record or is not a member of the given set.
pub struct UnknownPeer<'a> {
/// The state this handle writes to when the peer is discovered.
parent: &'a mut PeersState,
/// Index of the set the handle refers to.
set: usize,
/// Identity of the peer, borrowed or owned depending on how the handle was created.
peer_id: Cow<'a, PeerId>,
}
impl<'a> UnknownPeer<'a> {
pub fn discover(self) -> NotConnectedPeer<'a> {
let num_sets = self.parent.sets.len();
self.parent
.nodes
.entry(self.peer_id.clone().into_owned())
.or_insert_with(|| Node::new(num_sets))
.sets[self.set] = MembershipState::NotConnected {
last_connected: Instant::now(),
};
NotConnectedPeer {
state: self.parent,
set: self.set,
peer_id: self.peer_id,
}
}
}
/// Handle on the reputation of one peer, as returned by `PeersState::peer_reputation`.
pub struct Reputation<'a> {
/// Map entry of the peer. It is `Some` from construction until `drop` takes it out, which
/// is why the accessors unwrap it.
node: Option<OccupiedEntry<'a, PeerId, Node>>,
}
impl<'a> Reputation<'a> {
    /// Current reputation of the peer.
    pub fn reputation(&self) -> i32 {
        let entry = self.node.as_ref().unwrap();
        entry.get().reputation
    }

    /// Overwrites the reputation of the peer with `value`.
    pub fn set_reputation(&mut self, value: i32) {
        let node = self.node.as_mut().unwrap().get_mut();
        node.reputation = value;
    }

    /// Adds `modifier` to the reputation of the peer, saturating at the bounds of `i32`.
    pub fn add_reputation(&mut self, modifier: i32) {
        let node = self.node.as_mut().unwrap().get_mut();
        node.reputation = node.reputation.saturating_add(modifier);
    }
}
impl<'a> Drop for Reputation<'a> {
    /// Deletes the peer's record when it carries no information: reputation 0 and no
    /// membership in any set.
    fn drop(&mut self) {
        let entry = match self.node.take() {
            Some(entry) => entry,
            None => return,
        };

        let node = entry.get();
        let disposable = node.reputation == 0
            && node
                .sets
                .iter()
                .all(|membership| matches!(membership, MembershipState::NotMember));
        if disposable {
            entry.remove();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{Peer, PeersState, SetConfig};
    use libp2p::PeerId;
    use std::iter;

    /// With a single incoming slot, the first peer is accepted and the second is refused.
    #[test]
    fn full_slots_in() {
        let mut peers_state = PeersState::new(iter::once(SetConfig {
            in_peers: 1,
            out_peers: 1,
        }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        // Without the `else` branches this test would pass vacuously if `peer` ever stopped
        // reporting fresh peers as unknown.
        if let Peer::Unknown(e) = peers_state.peer(0, &id1) {
            assert!(e.discover().try_accept_incoming().is_ok());
        } else {
            panic!("a peer that was never discovered must be reported as unknown");
        }

        if let Peer::Unknown(e) = peers_state.peer(0, &id2) {
            assert!(e.discover().try_accept_incoming().is_err());
        } else {
            panic!("a peer that was never discovered must be reported as unknown");
        }
    }

    /// A no-slot node is accepted without using up the only incoming slot.
    #[test]
    fn no_slot_node_doesnt_use_slot() {
        let mut peers_state = PeersState::new(iter::once(SetConfig {
            in_peers: 1,
            out_peers: 1,
        }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        peers_state.add_no_slot_node(0, id1.clone());
        if let Peer::Unknown(p) = peers_state.peer(0, &id1) {
            assert!(p.discover().try_accept_incoming().is_ok());
        } else {
            panic!()
        }

        if let Peer::Unknown(e) = peers_state.peer(0, &id2) {
            assert!(e.discover().try_accept_incoming().is_ok());
        } else {
            panic!()
        }
    }

    /// Disconnecting the peer that holds the only slot lets a refused peer in.
    #[test]
    fn disconnecting_frees_slot() {
        let mut peers_state = PeersState::new(iter::once(SetConfig {
            in_peers: 1,
            out_peers: 1,
        }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        assert!(peers_state
            .peer(0, &id1)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_ok());
        assert!(peers_state
            .peer(0, &id2)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_err());

        peers_state
            .peer(0, &id1)
            .into_connected()
            .unwrap()
            .disconnect();

        assert!(peers_state
            .peer(0, &id2)
            .into_not_connected()
            .unwrap()
            .try_accept_incoming()
            .is_ok());
    }

    /// `highest_not_connected_peer` follows reputation changes and ignores connected peers.
    #[test]
    fn highest_not_connected_peer() {
        let mut peers_state = PeersState::new(iter::once(SetConfig {
            in_peers: 25,
            out_peers: 25,
        }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        assert!(peers_state.highest_not_connected_peer(0).is_none());

        peers_state
            .peer(0, &id1)
            .into_unknown()
            .unwrap()
            .discover()
            .set_reputation(50);
        peers_state
            .peer(0, &id2)
            .into_unknown()
            .unwrap()
            .discover()
            .set_reputation(25);
        assert_eq!(
            peers_state
                .highest_not_connected_peer(0)
                .map(|p| p.into_peer_id()),
            Some(id1.clone())
        );

        // Raising id2 above id1 makes it the best candidate.
        peers_state
            .peer(0, &id2)
            .into_not_connected()
            .unwrap()
            .set_reputation(75);
        assert_eq!(
            peers_state
                .highest_not_connected_peer(0)
                .map(|p| p.into_peer_id()),
            Some(id2.clone())
        );

        // Once id2 is connected it is no longer a candidate.
        peers_state
            .peer(0, &id2)
            .into_not_connected()
            .unwrap()
            .try_accept_incoming()
            .unwrap();
        assert_eq!(
            peers_state
                .highest_not_connected_peer(0)
                .map(|p| p.into_peer_id()),
            Some(id1.clone())
        );

        // id2 is a candidate again after disconnecting, but id1 now outranks it.
        peers_state
            .peer(0, &id1)
            .into_not_connected()
            .unwrap()
            .set_reputation(100);
        peers_state
            .peer(0, &id2)
            .into_connected()
            .unwrap()
            .disconnect();
        assert_eq!(
            peers_state
                .highest_not_connected_peer(0)
                .map(|p| p.into_peer_id()),
            Some(id1.clone())
        );

        // Dropping id1 below id2 hands the top spot back to id2.
        peers_state
            .peer(0, &id1)
            .into_not_connected()
            .unwrap()
            .set_reputation(-100);
        assert_eq!(
            peers_state
                .highest_not_connected_peer(0)
                .map(|p| p.into_peer_id()),
            Some(id2.clone())
        );
    }

    /// Disconnecting a no-slot node must not touch (and underflow) the slot counters.
    #[test]
    fn disconnect_no_slot_doesnt_panic() {
        let mut peers_state = PeersState::new(iter::once(SetConfig {
            in_peers: 1,
            out_peers: 1,
        }));
        let id = PeerId::random();

        peers_state.add_no_slot_node(0, id.clone());
        let peer = peers_state
            .peer(0, &id)
            .into_unknown()
            .unwrap()
            .discover()
            .try_outgoing()
            .unwrap();
        peer.disconnect();
    }
}