use crate::config::ProtocolId;
use crate::protocol::generic_proto::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn},
upgrade::RegisteredProtocol
};
use bytes::BytesMut;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p::swarm::{
DialPeerCondition,
NetworkBehaviour,
NetworkBehaviourAction,
NotifyHandler,
PollParameters
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}};
use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration};
use wasm_timer::Instant;
pub struct GenericProto {
legacy_protocol: RegisteredProtocol,
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,
peerset: sc_peerset::Peerset,
peers: FnvHashMap<(PeerId, sc_peerset::SetId), PeerState>,
delays: stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId, sc_peerset::SetId)> + Send>>>,
next_delay_id: DelayId,
incoming: SmallVec<[IncomingPeer; 6]>,
next_incoming_index: sc_peerset::IncomingIndex,
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
#[derive(Debug)]
enum PeerState {
Poisoned,
Backoff {
timer: DelayId,
timer_deadline: Instant,
},
PendingRequest {
timer: DelayId,
timer_deadline: Instant,
},
Requested,
Disabled {
backoff_until: Option<Instant>,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
DisabledPendingEnable {
timer: DelayId,
timer_deadline: Instant,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
Enabled {
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
Incoming {
backoff_until: Option<Instant>,
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
}
impl PeerState {
fn is_open(&self) -> bool {
self.get_open().is_some()
}
fn get_open(&self) -> Option<&NotificationsSink> {
match self {
PeerState::Enabled { connections, .. } => connections
.iter()
.filter_map(|(_, s)| match s {
ConnectionState::Open(s) => Some(s),
_ => None,
})
.next(),
PeerState::Poisoned => None,
PeerState::Backoff { .. } => None,
PeerState::PendingRequest { .. } => None,
PeerState::Requested => None,
PeerState::Disabled { .. } => None,
PeerState::DisabledPendingEnable { .. } => None,
PeerState::Incoming { .. } => None,
}
}
fn is_requested(&self) -> bool {
match self {
PeerState::Poisoned => false,
PeerState::Backoff { .. } => false,
PeerState::PendingRequest { .. } => true,
PeerState::Requested => true,
PeerState::Disabled { .. } => false,
PeerState::DisabledPendingEnable { .. } => true,
PeerState::Enabled { .. } => true,
PeerState::Incoming { .. } => false,
}
}
}
#[derive(Debug)]
enum ConnectionState {
Closed,
Closing,
Opening,
OpeningThenClosing,
OpenDesiredByRemote,
Open(NotificationsSink),
}
#[derive(Debug)]
struct IncomingPeer {
peer_id: PeerId,
set_id: sc_peerset::SetId,
alive: bool,
incoming_id: sc_peerset::IncomingIndex,
}
#[derive(Debug)]
pub enum GenericProtoOut {
CustomProtocolOpen {
peer_id: PeerId,
set_id: sc_peerset::SetId,
received_handshake: Vec<u8>,
notifications_sink: NotificationsSink,
},
CustomProtocolReplaced {
peer_id: PeerId,
set_id: sc_peerset::SetId,
notifications_sink: NotificationsSink,
},
CustomProtocolClosed {
peer_id: PeerId,
set_id: sc_peerset::SetId,
},
LegacyMessage {
peer_id: PeerId,
message: BytesMut,
},
Notification {
peer_id: PeerId,
set_id: sc_peerset::SetId,
message: BytesMut,
},
}
impl GenericProto {
pub fn new(
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.collect::<Vec<_>>();
assert!(!notif_protocols.is_empty());
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);
GenericProto {
legacy_protocol,
notif_protocols,
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
next_delay_id: DelayId(0),
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: VecDeque::new(),
}
}
pub fn set_notif_protocol_handshake(
&mut self,
set_id: sc_peerset::SetId,
handshake_message: impl Into<Vec<u8>>
) {
if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
*p.1.write() = handshake_message.into();
} else {
log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
debug_assert!(false);
}
}
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
}
pub fn open_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id)
}
pub fn is_open(&self, peer_id: &PeerId, set_id: sc_peerset::SetId) -> bool {
self.peers.get(&(peer_id.clone(), set_id)).map(|p| p.is_open()).unwrap_or(false)
}
pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: sc_peerset::SetId) {
debug!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id);
self.disconnect_peer_inner(peer_id, set_id, None);
}
fn disconnect_peer_inner(
&mut self,
peer_id: &PeerId,
set_id: sc_peerset::SetId,
ban: Option<Duration>
) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id.clone(), set_id)) {
entry
} else {
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
st @ PeerState::Requested => *entry.into_mut() = st,
st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
st @ PeerState::Backoff { .. } => *entry.into_mut() = st,
PeerState::DisabledPendingEnable {
connections,
timer_deadline,
timer: _
} => {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
let backoff_until = Some(if let Some(ban) = ban {
cmp::max(timer_deadline, Instant::now() + ban)
} else {
timer_deadline
});
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until
}
},
PeerState::Enabled { mut connections } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
debug!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(),
set_id,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::Opening))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::OpeningThenClosing;
}
debug_assert!(!connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))));
debug_assert!(!connections.iter().any(|(_, s)| matches!(s, ConnectionState::Opening)));
let backoff_until = ban.map(|dur| Instant::now() + dur);
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until
}
},
PeerState::Incoming { mut connections, backoff_until } => {
let inc = if let Some(inc) = self.incoming.iter_mut()
.find(|i| i.peer_id == entry.key().0 && i.set_id == set_id && i.alive) {
inc
} else {
error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
incoming for incoming peer");
return
};
inc.alive = false;
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
let backoff_until = match (backoff_until, ban) {
(Some(a), Some(b)) => Some(cmp::max(a, Instant::now() + b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(Instant::now() + b),
(None, None) => None,
};
debug_assert!(!connections.iter().any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until
}
},
PeerState::Poisoned =>
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id),
}
}
pub fn requested_peers<'a>(&'a self, set_id: sc_peerset::SetId) -> impl Iterator<Item = &'a PeerId> + 'a {
self.peers.iter()
.filter(move |((_, set), state)| *set == set_id && state.is_requested())
.map(|((id, _), _)| id)
}
pub fn write_notification(
&mut self,
target: &PeerId,
set_id: sc_peerset::SetId,
message: impl Into<Vec<u8>>,
) {
let notifs_sink = match self.peers.get(&(target.clone(), set_id)).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent notification to {:?} without an open channel.",
target);
return
},
Some(sink) => sink
};
let message = message.into();
trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
target,
set_id,
message.len(),
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
notifs_sink.send_sync_notification(message);
}
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
self.peerset.debug_info()
}
fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) {
let mut occ_entry = match self.peers.entry((peer_id.clone(), set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Starting to connect",
entry.key().0, set_id);
debug!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0);
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: entry.key().0.clone(),
condition: DialPeerCondition::Disconnected
});
entry.insert(PeerState::Requested);
return;
}
};
let now = Instant::now();
match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
PeerState::Backoff { ref timer, ref timer_deadline } if *timer_deadline > now => {
let peer_id = occ_entry.key().0.clone();
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Will start to connect at \
until {:?}", peer_id, set_id, timer_deadline);
*occ_entry.into_mut() = PeerState::PendingRequest {
timer: *timer,
timer_deadline: *timer_deadline,
};
},
PeerState::Backoff { .. } => {
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Starting to connect",
occ_entry.key().0, set_id);
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: occ_entry.key().0.clone(),
condition: DialPeerCondition::Disconnected
});
*occ_entry.into_mut() = PeerState::Requested;
},
PeerState::Disabled {
connections,
backoff_until: Some(ref backoff)
} if *backoff > now => {
let peer_id = occ_entry.key().0.clone();
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): But peer is backed-off until {:?}",
peer_id, set_id, backoff);
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*backoff - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
timer_deadline: *backoff,
};
},
PeerState::Disabled { mut connections, backoff_until } => {
debug_assert!(!connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::Open(_))
}));
if let Some((connec_id, connec_state)) = connections.iter_mut()
.find(|(_, s)| matches!(s, ConnectionState::Closed))
{
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
occ_entry.key().0, set_id);
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*occ_entry.into_mut() = PeerState::Enabled { connections };
} else {
debug_assert!(connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::OpeningThenClosing | ConnectionState::Closing)
}));
debug!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): No connection in proper state. Delaying.",
occ_entry.key().0, set_id
);
let timer_deadline = {
let base = now + Duration::from_secs(5);
if let Some(backoff_until) = backoff_until {
cmp::max(base, backoff_until)
} else {
base
}
};
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
debug_assert!(timer_deadline > now);
let delay = futures_timer::Delay::new(timer_deadline - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
timer_deadline,
};
}
},
PeerState::Incoming { mut connections, .. } => {
debug!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
occ_entry.key().0, set_id);
if let Some(inc) = self.incoming.iter_mut()
.find(|i| i.peer_id == occ_entry.key().0 && i.set_id == set_id && i.alive) {
inc.alive = false;
} else {
error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
incoming for incoming peer")
}
debug_assert!(connections.iter().any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
occ_entry.key(), *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: occ_entry.key().0.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
}
*occ_entry.into_mut() = PeerState::Enabled { connections };
},
st @ PeerState::Enabled { .. } => {
warn!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already connected.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
st @ PeerState::DisabledPendingEnable { .. } => {
warn!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already pending enabling.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
warn!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Duplicate request.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", occ_entry.key());
debug_assert!(false);
},
}
}
fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) {
let mut entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
debug!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
entry.key().0, set_id);
return
}
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Disabled { .. } | st @ PeerState::Backoff { .. } => {
debug!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Already disabled.",
entry.key().0, set_id);
*entry.into_mut() = st;
},
PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => {
debug_assert!(!connections.is_empty());
debug!(target: "sub-libp2p",
"PSM => Drop({}, {:?}): Interrupting pending enabling.",
entry.key().0, set_id);
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until: Some(timer_deadline),
};
},
PeerState::Enabled { mut connections } => {
debug!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Disabling connections.",
entry.key().0, set_id);
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
debug!(target: "sub-libp2p", "External API <= Closed({}, {:?})", entry.key().0, set_id);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: entry.key().0.clone(),
set_id,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::Opening))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
entry.key(), *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: entry.key().0.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::OpeningThenClosing;
}
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
entry.key(), *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: entry.key().0.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
},
PeerState::Requested => {
debug!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected.",
entry.key().0, set_id);
entry.remove();
},
PeerState::PendingRequest { timer, timer_deadline } => {
debug!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected",
entry.key().0, set_id);
*entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
},
st @ PeerState::Incoming { .. } => {
error!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not enabled (Incoming).",
entry.key().0, set_id);
*entry.into_mut() = st;
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
debug_assert!(false);
},
}
}
fn peerset_report_accept(&mut self, index: sc_peerset::IncomingIndex) {
let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
self.incoming.remove(pos)
} else {
error!(target: "sub-libp2p", "PSM => Accept({:?}): Invalid index", index);
return
};
if !incoming.alive {
debug!(target: "sub-libp2p", "PSM => Accept({:?}, {}, {:?}): Obsolete incoming",
index, incoming.peer_id, incoming.set_id);
match self.peers.get_mut(&(incoming.peer_id.clone(), incoming.set_id)) {
Some(PeerState::DisabledPendingEnable { .. }) |
Some(PeerState::Enabled { .. }) => {}
_ => {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})",
incoming.peer_id, incoming.set_id);
self.peerset.dropped(incoming.set_id, incoming.peer_id, sc_peerset::DropReason::Unknown);
},
}
return
}
let state = match self.peers.get_mut(&(incoming.peer_id.clone(), incoming.set_id)) {
Some(s) => s,
None => {
debug_assert!(false);
return;
}
};
match mem::replace(state, PeerState::Poisoned) {
PeerState::Incoming { mut connections, .. } => {
debug!(target: "sub-libp2p", "PSM => Accept({:?}, {}, {:?}): Enabling connections.",
index, incoming.peer_id, incoming.set_id);
debug_assert!(connections.iter().any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
incoming.peer_id, *connec_id, incoming.set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: incoming.peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
});
*connec_state = ConnectionState::Opening;
}
*state = PeerState::Enabled { connections };
}
peer => {
error!(target: "sub-libp2p",
"State mismatch in libp2p: Expected alive incoming. Got {:?}.",
peer);
debug_assert!(false);
}
}
}
fn peerset_report_reject(&mut self, index: sc_peerset::IncomingIndex) {
let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) {
self.incoming.remove(pos)
} else {
error!(target: "sub-libp2p", "PSM => Reject({:?}): Invalid index", index);
return
};
if !incoming.alive {
debug!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Obsolete incoming, \
ignoring", index, incoming.peer_id, incoming.set_id);
return
}
let state = match self.peers.get_mut(&(incoming.peer_id.clone(), incoming.set_id)) {
Some(s) => s,
None => {
debug_assert!(false);
return;
}
};
match mem::replace(state, PeerState::Poisoned) {
PeerState::Incoming { mut connections, backoff_until } => {
debug!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
index, incoming.peer_id, incoming.set_id);
debug_assert!(connections.iter().any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})",
incoming.peer_id, connec_id, incoming.set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: incoming.peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() },
});
*connec_state = ConnectionState::Closing;
}
*state = PeerState::Disabled { connections, backoff_until };
}
peer => error!(target: "sub-libp2p",
"State mismatch in libp2p: Expected alive incoming. Got {:?}.",
peer)
}
}
}
impl NetworkBehaviour for GenericProto {
type ProtocolsHandler = NotifsHandlerProto;
type OutEvent = GenericProtoOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
)
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: &PeerId) {
}
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
match self.peers.entry((peer_id.clone(), set_id)).or_insert(PeerState::Poisoned) {
st @ &mut PeerState::Requested |
st @ &mut PeerState::PendingRequest { .. } => {
debug!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}): Connection was requested by PSM.",
peer_id, set_id, endpoint
);
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *conn, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*conn),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
let mut connections = SmallVec::new();
connections.push((*conn, ConnectionState::Opening));
*st = PeerState::Enabled { connections };
}
st @ &mut PeerState::Poisoned |
st @ &mut PeerState::Backoff { .. } => {
let backoff_until = if let PeerState::Backoff { timer_deadline, .. } = st {
Some(*timer_deadline)
} else {
None
};
debug!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}, {:?}): Not requested by PSM, disabling.",
peer_id, set_id, endpoint, *conn);
let mut connections = SmallVec::new();
connections.push((*conn, ConnectionState::Closed));
*st = PeerState::Disabled { connections, backoff_until };
}
PeerState::Incoming { connections, .. } |
PeerState::Disabled { connections, .. } |
PeerState::DisabledPendingEnable { connections, .. } |
PeerState::Enabled { connections, .. } => {
debug!(target: "sub-libp2p",
"Libp2p => Connected({}, {:?}, {:?}, {:?}): Secondary connection. Leaving closed.",
peer_id, set_id, endpoint, *conn);
connections.push((*conn, ConnectionState::Closed));
}
}
}
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _endpoint: &ConnectedPoint) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id.clone(), set_id)) {
entry
} else {
error!(target: "sub-libp2p", "inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Disabled { mut connections, backoff_until } => {
debug!(target: "sub-libp2p", "Libp2p => Disconnected({}, {:?}, {:?}): Disabled.",
peer_id, set_id, *conn);
if let Some(pos) = connections.iter().position(|(c, _)| *c == *conn) {
connections.remove(pos);
} else {
debug_assert!(false);
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
}
if connections.is_empty() {
if let Some(until) = backoff_until {
let now = Instant::now();
if until > now {
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(until - now);
let peer_id = peer_id.clone();
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: until,
};
} else {
entry.remove();
}
} else {
entry.remove();
}
} else {
*entry.get_mut() = PeerState::Disabled { connections, backoff_until };
}
},
PeerState::DisabledPendingEnable { mut connections, timer_deadline, timer } => {
debug!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): Disabled but pending enable.",
peer_id, set_id, *conn
);
if let Some(pos) = connections.iter().position(|(c, _)| *c == *conn) {
connections.remove(pos);
} else {
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
}
if connections.is_empty() {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
} else {
*entry.get_mut() = PeerState::DisabledPendingEnable {
connections, timer_deadline, timer
};
}
},
PeerState::Incoming { mut connections, backoff_until } => {
debug!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
peer_id, set_id, *conn
);
debug_assert!(connections.iter().any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
if let Some(pos) = connections.iter().position(|(c, _)| *c == *conn) {
connections.remove(pos);
} else {
debug_assert!(false);
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
}
let no_desired_left = !connections.iter().any(|(_, s)| {
matches!(s, ConnectionState::OpenDesiredByRemote)
});
if no_desired_left {
if let Some(state) = self.incoming.iter_mut()
.find(|i| i.alive && i.set_id == set_id && i.peer_id == *peer_id)
{
state.alive = false;
} else {
error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in \
incoming corresponding to an incoming state in peers");
debug_assert!(false);
}
}
if connections.is_empty() {
if let Some(until) = backoff_until {
let now = Instant::now();
if until > now {
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(until - now);
let peer_id = peer_id.clone();
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: until,
};
} else {
entry.remove();
}
} else {
entry.remove();
}
} else if no_desired_left {
*entry.get_mut() = PeerState::Disabled { connections, backoff_until };
} else {
*entry.get_mut() = PeerState::Incoming { connections, backoff_until };
}
}
PeerState::Enabled { mut connections } => {
debug!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): Enabled.",
peer_id, set_id, *conn
);
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
if let Some(pos) = connections.iter().position(|(c, _)| *c == *conn) {
let (_, state) = connections.remove(pos);
if let ConnectionState::Open(_) = state {
if let Some((replacement_pos, replacement_sink)) = connections
.iter()
.enumerate()
.filter_map(|(num, (_, s))| {
match s {
ConnectionState::Open(s) => Some((num, s.clone())),
_ => None
}
})
.next()
{
if pos <= replacement_pos {
debug!(
target: "sub-libp2p",
"External API <= Sink replaced({}, {:?})",
peer_id, set_id
);
let event = GenericProtoOut::CustomProtocolReplaced {
peer_id: peer_id.clone(),
set_id,
notifications_sink: replacement_sink,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
} else {
debug!(
target: "sub-libp2p", "External API <= Closed({}, {:?})",
peer_id, set_id
);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(),
set_id,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
} else {
error!(target: "sub-libp2p",
"inject_connection_closed: State mismatch in the custom protos handler");
debug_assert!(false);
}
if connections.is_empty() {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(Duration::from_secs(ban_dur));
let peer_id = peer_id.clone();
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
};
} else if !connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
*entry.get_mut() = PeerState::Disabled {
connections,
backoff_until: None
};
} else {
*entry.get_mut() = PeerState::Enabled { connections };
}
}
PeerState::Requested |
PeerState::PendingRequest { .. } |
PeerState::Backoff { .. } => {
error!(target: "sub-libp2p",
"`inject_connection_closed` called for unknown peer {}",
peer_id);
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id);
debug_assert!(false);
},
}
}
}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn error::Error) {
trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id.clone(), set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
st @ PeerState::Backoff { .. } => {
*entry.into_mut() = st;
},
st @ PeerState::Requested |
st @ PeerState::PendingRequest { .. } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id.clone(), sc_peerset::DropReason::Unknown);
let now = Instant::now();
let ban_duration = match st {
PeerState::PendingRequest { timer_deadline, .. } if timer_deadline > now =>
cmp::max(timer_deadline - now, Duration::from_secs(5)),
_ => Duration::from_secs(5)
};
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(ban_duration);
let peer_id = peer_id.clone();
self.delays.push(async move {
delay.await;
(delay_id, peer_id, set_id)
}.boxed());
*entry.into_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: now + ban_duration,
};
},
st @ PeerState::Disabled { .. } | st @ PeerState::Enabled { .. } |
st @ PeerState::DisabledPendingEnable { .. } | st @ PeerState::Incoming { .. } => {
*entry.into_mut() = st;
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
debug_assert!(false);
},
}
}
}
}
fn inject_event(
&mut self,
source: PeerId,
connection: ConnectionId,
event: NotifsHandlerOut,
) {
match event {
NotifsHandlerOut::OpenDesiredByRemote { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index);
debug!(target: "sub-libp2p",
"Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
source, connection, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((source.clone(), set_id)) {
entry
} else {
error!(target: "sub-libp2p", "OpenDesiredByRemote: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Incoming { mut connections, backoff_until } => {
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::OpenDesiredByRemote)));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, _)| *c == connection) {
if let ConnectionState::Closed = *connec_state {
*connec_state = ConnectionState::OpenDesiredByRemote;
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing
));
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
*entry.into_mut() = PeerState::Incoming { connections, backoff_until };
},
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, _)| *c == connection) {
if let ConnectionState::Closed = *connec_state {
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
source, connection, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpenDesiredByRemote | ConnectionState::Opening
));
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
*entry.into_mut() = PeerState::Enabled { connections };
},
PeerState::Disabled { mut connections, backoff_until } => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, _)| *c == connection) {
if let ConnectionState::Closed = *connec_state {
*connec_state = ConnectionState::OpenDesiredByRemote;
let incoming_id = self.next_incoming_index;
self.next_incoming_index.0 += 1;
debug!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}).",
source, incoming_id);
self.peerset.incoming(set_id, source.clone(), incoming_id);
self.incoming.push(IncomingPeer {
peer_id: source.clone(),
set_id,
alive: true,
incoming_id,
});
*entry.into_mut() = PeerState::Incoming { connections, backoff_until };
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing
));
*entry.into_mut() = PeerState::Disabled { connections, backoff_until };
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
}
PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, _)| *c == connection) {
if let ConnectionState::Closed = *connec_state {
debug!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
source, connection, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source.clone(),
handler: NotifyHandler::One(connection),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*entry.into_mut() = PeerState::Enabled { connections };
} else {
debug_assert!(matches!(
connec_state,
ConnectionState::OpeningThenClosing
));
*entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer,
timer_deadline,
};
}
} else {
error!(
target: "sub-libp2p",
"OpenDesiredByRemote: State mismatch in the custom protos handler"
);
debug_assert!(false);
}
}
state => {
error!(target: "sub-libp2p",
"OpenDesiredByRemote: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
return
}
};
}
NotifsHandlerOut::CloseDesired { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index);
debug!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseDesired({:?})",
source, connection, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((source.clone(), set_id)) {
entry
} else {
error!(target: "sub-libp2p", "CloseDesired: State mismatch in the custom protos handler");
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
let pos = if let Some(pos) = connections.iter().position(|(c, _)| *c == connection) {
pos
} else {
error!(target: "sub-libp2p",
"CloseDesired: State mismatch in the custom protos handler");
debug_assert!(false);
return;
};
if matches!(connections[pos].1, ConnectionState::Closing) {
*entry.into_mut() = PeerState::Enabled { connections };
return;
}
debug_assert!(matches!(connections[pos].1, ConnectionState::Open(_)));
connections[pos].1 = ConnectionState::Closing;
debug!(target: "sub-libp2p", "Handler({}, {:?}) <= Close({:?})", source, connection, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source.clone(),
handler: NotifyHandler::One(connection),
event: NotifsHandlerIn::Close { protocol_index: set_id.into() },
});
if let Some((replacement_pos, replacement_sink)) = connections
.iter()
.enumerate()
.filter_map(|(num, (_, s))| {
match s {
ConnectionState::Open(s) => Some((num, s.clone())),
_ => None
}
})
.next()
{
if pos <= replacement_pos {
debug!(target: "sub-libp2p", "External API <= Sink replaced({:?})", source);
let event = GenericProtoOut::CustomProtocolReplaced {
peer_id: source,
set_id,
notifications_sink: replacement_sink,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
*entry.into_mut() = PeerState::Enabled { connections };
} else {
if !connections.iter().any(|(_, s)| matches!(s, ConnectionState::Opening)) {
debug!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", source, set_id);
self.peerset.dropped(set_id, source.clone(), sc_peerset::DropReason::Refused);
*entry.into_mut() = PeerState::Disabled {
connections, backoff_until: None
};
} else {
*entry.into_mut() = PeerState::Enabled { connections };
}
debug!(target: "sub-libp2p", "External API <= Closed({}, {:?})", source, set_id);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: source,
set_id,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
},
state @ PeerState::Disabled { .. } |
state @ PeerState::DisabledPendingEnable { .. } => {
*entry.into_mut() = state;
return;
},
state => {
error!(target: "sub-libp2p",
"Unexpected state in the custom protos handler: {:?}",
state);
return
}
}
}
NotifsHandlerOut::CloseResult { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index);
debug!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseResult({:?})",
source, connection, set_id);
match self.peers.get_mut(&(source.clone(), set_id)) {
Some(PeerState::DisabledPendingEnable { connections, .. }) |
Some(PeerState::Disabled { connections, .. }) |
Some(PeerState::Enabled { connections, .. }) => {
if let Some((_, connec_state)) = connections
.iter_mut()
.find(|(c, s)| *c == connection && matches!(s, ConnectionState::Closing))
{
*connec_state = ConnectionState::Closed;
} else {
error!(target: "sub-libp2p",
"CloseResult: State mismatch in the custom protos handler");
debug_assert!(false);
}
},
state => {
error!(target: "sub-libp2p",
"CloseResult: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
}
}
}
NotifsHandlerOut::OpenResultOk { protocol_index, received_handshake, notifications_sink, .. } => {
let set_id = sc_peerset::SetId::from(protocol_index);
debug!(target: "sub-libp2p",
"Handler({}, {:?}) => OpenResultOk({:?})",
source, connection, set_id);
match self.peers.get_mut(&(source.clone(), set_id)) {
Some(PeerState::Enabled { connections, .. }) => {
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
let any_open = connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_)));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::Opening))
{
if !any_open {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen {
peer_id: source,
set_id,
received_handshake,
notifications_sink: notifications_sink.clone(),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
*connec_state = ConnectionState::Open(notifications_sink);
} else if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::OpeningThenClosing))
{
*connec_state = ConnectionState::Closing;
} else {
debug_assert!(false);
error!(target: "sub-libp2p",
"OpenResultOk State mismatch in the custom protos handler");
}
},
Some(PeerState::DisabledPendingEnable { connections, .. }) |
Some(PeerState::Disabled { connections, .. }) => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::OpeningThenClosing))
{
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultOk State mismatch in the custom protos handler");
debug_assert!(false);
}
}
state => {
error!(target: "sub-libp2p",
"OpenResultOk: Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
return
}
}
}
NotifsHandlerOut::OpenResultErr { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index);
debug!(target: "sub-libp2p",
"Handler({:?}, {:?}) => OpenResultErr({:?})",
source, connection, set_id);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((source.clone(), set_id)) {
entry
} else {
error!(target: "sub-libp2p", "OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
debug_assert!(false);
return
};
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut connections } => {
debug_assert!(connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_))));
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::Opening))
{
*connec_state = ConnectionState::Closed;
} else if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::OpeningThenClosing))
{
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
}
if !connections.iter().any(|(_, s)|
matches!(s, ConnectionState::Opening | ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source);
self.peerset.dropped(set_id, source.clone(), sc_peerset::DropReason::Refused);
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until: None
};
} else {
*entry.into_mut() = PeerState::Enabled { connections };
}
},
PeerState::Disabled { mut connections, backoff_until } => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::OpeningThenClosing))
{
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
}
*entry.into_mut() = PeerState::Disabled { connections, backoff_until };
},
PeerState::DisabledPendingEnable { mut connections, timer, timer_deadline } => {
if let Some((_, connec_state)) = connections.iter_mut().find(|(c, s)|
*c == connection && matches!(s, ConnectionState::OpeningThenClosing))
{
*connec_state = ConnectionState::Closing;
} else {
error!(target: "sub-libp2p",
"OpenResultErr: State mismatch in the custom protos handler");
debug_assert!(false);
}
*entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer,
timer_deadline,
};
},
state => {
error!(target: "sub-libp2p",
"Unexpected state in the custom protos handler: {:?}",
state);
debug_assert!(false);
}
};
}
NotifsHandlerOut::CustomMessage { message } => {
if self.is_open(&source, sc_peerset::SetId::from(0)) {
trace!(target: "sub-libp2p", "Handler({:?}) => Message", source);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close message. Dropping message.",
source,
);
}
}
NotifsHandlerOut::Notification { protocol_index, message } => {
let set_id = sc_peerset::SetId::from(protocol_index);
if self.is_open(&source, set_id) {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Notification({}, {:?}, {} bytes)",
connection,
source,
set_id,
message.len()
);
trace!(target: "sub-libp2p", "External API <= Message({}, {:?})",
source, set_id);
let event = GenericProtoOut::Notification {
peer_id: source,
set_id,
message,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close notification({}, {:?}, {} bytes)",
connection,
source,
set_id,
message.len()
);
}
}
}
}
fn poll(
&mut self,
cx: &mut Context,
_params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
NotifsHandlerIn,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
loop {
match futures::Stream::poll_next(Pin::new(&mut self.peerset), cx) {
Poll::Ready(Some(sc_peerset::Message::Accept(index))) => {
self.peerset_report_accept(index);
}
Poll::Ready(Some(sc_peerset::Message::Reject(index))) => {
self.peerset_report_reject(index);
}
Poll::Ready(Some(sc_peerset::Message::Connect { peer_id, set_id, .. })) => {
self.peerset_report_connect(peer_id, set_id);
}
Poll::Ready(Some(sc_peerset::Message::Drop { peer_id, set_id, .. })) => {
self.peerset_report_disconnect(peer_id, set_id);
}
Poll::Ready(None) => {
error!(target: "sub-libp2p", "Peerset receiver stream has returned None");
break;
}
Poll::Pending => break,
}
}
while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
Pin::new(&mut self.delays).poll_next(cx) {
let peer_state = match self.peers.get_mut(&(peer_id.clone(), set_id)) {
Some(s) => s,
None => continue,
};
match peer_state {
PeerState::Backoff { timer, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Libp2p <= Clean up ban of {:?} from the state", peer_id);
self.peers.remove(&(peer_id, set_id));
}
PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id);
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected
});
*peer_state = PeerState::Requested;
}
PeerState::DisabledPendingEnable { connections, timer, timer_deadline }
if *timer == delay_id =>
{
if let Some((connec_id, connec_state)) = connections.iter_mut()
.find(|(_, s)| matches!(s, ConnectionState::Closed))
{
debug!(target: "sub-libp2p", "Handler({}, {:?}) <= Open({:?}) (ban expired)",
peer_id, *connec_id, set_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
*peer_state = PeerState::Enabled {
connections: mem::replace(connections, Default::default()),
};
} else {
*timer_deadline = Instant::now() + Duration::from_secs(5);
let delay = futures_timer::Delay::new(Duration::from_secs(5));
let timer = *timer;
self.delays.push(async move {
delay.await;
(timer, peer_id, set_id)
}.boxed());
}
}
_ => {},
}
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}