use crate::config::ValidationMode;
use crate::error::{GossipsubHandlerError, ValidationError};
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage};
use futures::prelude::*;
use futures::StreamExt;
use asynchronous_codec::Framed;
use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError};
use libp2p_swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use log::{error, trace, warn};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use wasm_timer::Instant;
const INITIAL_KEEP_ALIVE: u64 = 30;
#[derive(Debug)]
pub enum HandlerEvent {
Message {
rpc: GossipsubRpc,
invalid_messages: Vec<(RawGossipsubMessage, ValidationError)>,
},
PeerKind(PeerKind),
}
const MAX_SUBSTREAM_CREATION: usize = 5;
pub struct GossipsubHandler {
listen_protocol: SubstreamProtocol<ProtocolConfig, ()>,
outbound_substream: Option<OutboundSubstreamState>,
inbound_substream: Option<InboundSubstreamState>,
send_queue: SmallVec<[crate::rpc_proto::Rpc; 16]>,
outbound_substream_establishing: bool,
outbound_substreams_created: usize,
inbound_substreams_created: usize,
peer_kind: Option<PeerKind>,
peer_kind_sent: bool,
protocol_unsupported: bool,
upgrade_errors: VecDeque<ProtocolsHandlerUpgrErr<GossipsubHandlerError>>,
keep_alive: KeepAlive,
}
enum InboundSubstreamState {
WaitingInput(Framed<NegotiatedSubstream, GossipsubCodec>),
Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
Poisoned,
}
enum OutboundSubstreamState {
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
PendingSend(
Framed<NegotiatedSubstream, GossipsubCodec>,
crate::rpc_proto::Rpc,
),
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
_Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
Poisoned,
}
impl GossipsubHandler {
pub fn new(
protocol_id_prefix: std::borrow::Cow<'static, str>,
max_transmit_size: usize,
validation_mode: ValidationMode,
support_floodsub: bool,
) -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(
ProtocolConfig::new(
protocol_id_prefix,
max_transmit_size,
validation_mode,
support_floodsub,
),
(),
),
inbound_substream: None,
outbound_substream: None,
outbound_substream_establishing: false,
outbound_substreams_created: 0,
inbound_substreams_created: 0,
send_queue: SmallVec::new(),
peer_kind: None,
peer_kind_sent: false,
protocol_unsupported: false,
upgrade_errors: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
}
}
}
impl ProtocolsHandler for GossipsubHandler {
type InEvent = crate::rpc_proto::Rpc;
type OutEvent = HandlerEvent;
type Error = GossipsubHandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
type OutboundOpenInfo = Self::InEvent;
type OutboundProtocol = ProtocolConfig;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.listen_protocol.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
(substream, peer_kind): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) {
if self.protocol_unsupported {
return;
}
self.inbound_substreams_created += 1;
if self.peer_kind.is_none() {
self.peer_kind = Some(peer_kind);
}
trace!("New inbound substream request");
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
}
fn inject_fully_negotiated_outbound(
&mut self,
(substream, peer_kind): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
message: Self::OutboundOpenInfo,
) {
if self.protocol_unsupported {
return;
}
self.outbound_substream_establishing = false;
self.outbound_substreams_created += 1;
if self.peer_kind.is_none() {
self.peer_kind = Some(peer_kind);
}
if self.outbound_substream.is_some() {
warn!("Established an outbound substream with one already available");
self.send_queue.push(message);
} else {
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message));
}
}
fn inject_event(&mut self, message: crate::rpc_proto::Rpc) {
if !self.protocol_unsupported {
self.send_queue.push(message);
}
}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
e: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
self.outbound_substream_establishing = false;
warn!("Dial upgrade error {:?}", e);
self.upgrade_errors.push_back(e);
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
Some(GossipsubHandlerError::NegotiationTimeout)
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
match negotiation_error {
NegotiationError::Failed => {
self.protocol_unsupported = true;
if !self.peer_kind_sent {
self.peer_kind_sent = true;
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
} else {
None
}
}
NegotiationError::ProtocolError(e) => {
Some(GossipsubHandlerError::NegotiationProtocolError(e))
}
}
}
};
if let Some(error) = reported_error {
return Poll::Ready(ProtocolsHandlerEvent::Close(error));
}
}
if !self.peer_kind_sent {
if let Some(peer_kind) = self.peer_kind.as_ref() {
self.peer_kind_sent = true;
return Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerEvent::PeerKind(
peer_kind.clone(),
)));
}
}
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
return Poll::Ready(ProtocolsHandlerEvent::Close(
GossipsubHandlerError::MaxInboundSubstreams,
));
}
if !self.send_queue.is_empty()
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ProtocolsHandlerEvent::Close(
GossipsubHandlerError::MaxOutboundSubstreams,
));
}
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
self.outbound_substream_establishing = true;
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: self.listen_protocol.clone().map_info(|()| message),
});
}
loop {
match std::mem::replace(
&mut self.inbound_substream,
Some(InboundSubstreamState::Poisoned),
) {
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(message))) => {
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ProtocolsHandlerEvent::Custom(message));
}
Poll::Ready(Some(Err(error))) => {
match error {
GossipsubHandlerError::MaxTransmissionSize => {
warn!("Message exceeded the maximum transmission size");
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
}
_ => {
warn!("Inbound stream error: {}", error);
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
}
}
Poll::Ready(None) => {
warn!("Peer closed their outbound stream");
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
break;
}
}
}
Some(InboundSubstreamState::Closing(mut substream)) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(res) => {
if let Err(e) = res {
warn!("Inbound substream error while closing: {:?}", e);
}
self.inbound_substream = None;
if self.outbound_substream.is_none() {
self.keep_alive = KeepAlive::No;
}
break;
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
break;
}
}
}
None => {
self.inbound_substream = None;
break;
}
Some(InboundSubstreamState::Poisoned) => {
unreachable!("Error occurred during inbound stream processing")
}
}
}
loop {
match std::mem::replace(
&mut self.outbound_substream,
Some(OutboundSubstreamState::Poisoned),
) {
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if !self.send_queue.is_empty() {
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
} else {
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
}
}
Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
match Sink::start_send(Pin::new(&mut substream), message) {
Ok(()) => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream))
}
Err(GossipsubHandlerError::MaxTransmissionSize) => {
error!("Message exceeded the maximum transmission size and was not sent.");
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
}
Err(e) => {
error!("Error sending message: {}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
}
}
Poll::Ready(Err(e)) => {
error!("Outbound substream error while sending output: {:?}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
break;
}
}
}
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => return Poll::Ready(ProtocolsHandlerEvent::Close(e)),
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream));
break;
}
}
}
Some(OutboundSubstreamState::_Closing(mut substream)) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
self.outbound_substream = None;
if self.inbound_substream.is_none() {
self.keep_alive = KeepAlive::No;
}
break;
}
Poll::Ready(Err(e)) => {
warn!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)
.into(),
));
}
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::_Closing(substream));
break;
}
}
}
None => {
self.outbound_substream = None;
break;
}
Some(OutboundSubstreamState::Poisoned) => {
unreachable!("Error occurred during outbound stream processing")
}
}
}
Poll::Pending
}
}