use crate::{
    ConnectedPoint,
    PeerId,
    connection::{
        self,
        Connected,
        Connection,
        ConnectionId,
        ConnectionLimit,
        ConnectionError,
        ConnectionHandler,
        IncomingInfo,
        IntoConnectionHandler,
        OutgoingInfo,
        Substream,
        PendingConnectionError,
        manager::{self, Manager, ManagerConfig},
    },
    muxing::StreamMuxer,
};
use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
/// A pool of connections that tracks pending and established connections
/// per peer, together with the counters and limits that apply to them.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
    /// The ID of the local peer. A pending connection whose remote reports
    /// this same ID is rejected (see `Pool::add_pending`).
    local_id: PeerId,

    /// The counters of pending and established connections, including the
    /// configured limits they are checked against.
    counters: ConnectionCounters,

    /// The connection manager that holds an entry for every pending and
    /// established connection and is polled for events in `Pool::poll`.
    manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,

    /// The established connections, grouped by remote peer and keyed by
    /// connection ID, each with its current `ConnectedPoint`.
    established: FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,

    /// The pending connections, each with its endpoint and, if one was
    /// given when the connection was added, the expected remote peer.
    pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,

    /// Established connections that were removed by `Pool::disconnect` and
    /// for which a `ConnectionClosed` event still has to be emitted.
    /// `Pool::poll` pops one entry per call.
    disconnected: Vec<Disconnected>,
}
// Debug output of a `Pool` is limited to its connection counters; no
// `Debug` bound is required on any of the type parameters.
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Pool");
        out.field("counters", &self.counters);
        out.finish()
    }
}
// `Pool` is declared `Unpin` unconditionally, i.e. without requiring
// `Unpin` of any of its type parameters.
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
/// An event produced by [`Pool::poll`].
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
    /// A pending connection has been established and added to the pool.
    ConnectionEstablished {
        /// The newly established connection.
        connection: EstablishedConnection<'a, TInEvent>,
        /// The number of established connections to the same peer,
        /// including the new one.
        num_established: NonZeroU32,
    },

    /// An established connection was closed.
    ///
    /// This event is emitted in two situations:
    ///
    ///   * The connection manager reported the connection as closed;
    ///     `error` is then whatever the manager reported.
    ///   * The connection was removed by [`Pool::disconnect`]; `error`
    ///     is then always `None`.
    ///
    /// NOTE(review): the manager presumably reports `None` for an orderly
    /// close via `EstablishedConnection::start_close` - confirm against
    /// the `manager` module.
    ConnectionClosed {
        /// The ID of the closed connection.
        id: ConnectionId,
        /// Information about the closed connection (peer and endpoint).
        connected: Connected,
        /// The error that caused the connection to close, if any.
        /// Always `None` for connections closed by [`Pool::disconnect`].
        error: Option<ConnectionError<THandlerErr>>,
        /// A reference to the pool that used to manage the connection.
        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
        /// The remaining number of established connections to the same peer.
        num_established: u32,
    },

    /// A connection attempt failed, or a freshly established connection
    /// was rejected because an established-connection limit was reached.
    PendingConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// The endpoint of the failed connection.
        endpoint: ConnectedPoint,
        /// The error that occurred.
        error: PendingConnectionError<TTransErr>,
        /// The handler that was supposed to handle the connection.
        /// `None` when the connection was rejected due to a connection limit.
        handler: Option<THandler>,
        /// The expected peer of the failed connection, if one was known.
        peer: Option<PeerId>,
        /// A reference to the pool that managed the connection.
        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
    },

    /// An established connection has produced an event.
    ConnectionEvent {
        /// The connection that produced the event.
        connection: EstablishedConnection<'a, TInEvent>,
        /// The produced event.
        event: TOutEvent,
    },

    /// The endpoint of an established connection has changed.
    AddressChange {
        /// The connection whose endpoint changed.
        connection: EstablishedConnection<'a, TInEvent>,
        /// The new endpoint, as now recorded by the pool.
        new_endpoint: ConnectedPoint,
        /// The previous endpoint.
        old_endpoint: ConnectedPoint,
    },
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
    TOutEvent: fmt::Debug,
    TTransErr: fmt::Debug,
    THandlerErr: fmt::Debug,
    TInEvent: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        match *self {
            PoolEvent::ConnectionEstablished { ref connection, .. } => {
                f.debug_tuple("PoolEvent::ConnectionEstablished")
                    .field(connection)
                    .finish()
            },
            PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => {
                f.debug_struct("PoolEvent::ConnectionClosed")
                    .field("id", id)
                    .field("connected", connected)
                    .field("error", error)
                    .finish()
            },
            PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
                f.debug_struct("PoolEvent::PendingConnectionError")
                    .field("id", id)
                    .field("error", error)
                    .finish()
            },
            PoolEvent::ConnectionEvent { ref connection, ref event } => {
                f.debug_struct("PoolEvent::ConnectionEvent")
                    .field("peer", connection.peer_id())
                    .field("event", event)
                    .finish()
            },
            PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
                f.debug_struct("PoolEvent::AddressChange")
                    .field("peer", connection.peer_id())
                    .field("new_endpoint", new_endpoint)
                    .field("old_endpoint", old_endpoint)
                    .finish()
            },
        }
    }
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
    Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
    
    pub fn new(
        local_id: PeerId,
        manager_config: ManagerConfig,
        limits: ConnectionLimits
    ) -> Self {
        Pool {
            local_id,
            counters: ConnectionCounters::new(limits),
            manager: Manager::new(manager_config),
            established: Default::default(),
            pending: Default::default(),
            disconnected: Vec::new(),
        }
    }
    
    /// Gets the dedicated connection counters of this pool.
    pub fn counters(&self) -> &ConnectionCounters {
        &self.counters
    }
    
    
    
    
    
    /// Adds a pending incoming connection to the pool, given as a future
    /// that resolves to the remote's peer ID and the stream muxer.
    ///
    /// No expected peer is recorded for incoming connections.
    ///
    /// Returns the ID of the new pending connection, or an error if the
    /// limit of pending incoming connections has been reached.
    pub fn add_incoming<TFut, TMuxer>(
        &mut self,
        future: TFut,
        handler: THandler,
        info: IncomingInfo<'_>,
    ) -> Result<ConnectionId, ConnectionLimit>
    where
        TFut: Future<
            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
        > + Send + 'static,
        THandler: IntoConnectionHandler + Send + 'static,
        THandler::Handler: ConnectionHandler<
            Substream = Substream<TMuxer>,
            InEvent = TInEvent,
            OutEvent = TOutEvent,
            Error = THandlerErr
        > + Send + 'static,
        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
        TTransErr: error::Error + Send + 'static,
        THandlerErr: error::Error + Send + 'static,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send + 'static,
    {
        // Refuse early if another pending incoming connection would exceed the limit.
        self.counters.check_max_pending_incoming()?;
        let id = self.add_pending(future, handler, info.to_connected_point(), None);
        Ok(id)
    }
    
    
    
    
    
    /// Adds a pending outgoing connection to the pool, given as a future
    /// that resolves to the remote's peer ID and the stream muxer.
    ///
    /// If `info` names a peer, it is recorded as the expected peer of the
    /// connection and verified once the future resolves.
    ///
    /// Returns the ID of the new pending connection, or an error if the
    /// limit of pending outgoing connections has been reached.
    pub fn add_outgoing<TFut, TMuxer>(
        &mut self,
        future: TFut,
        handler: THandler,
        info: OutgoingInfo<'_>,
    ) -> Result<ConnectionId, ConnectionLimit>
    where
        TFut: Future<
            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
        > + Send + 'static,
        THandler: IntoConnectionHandler + Send + 'static,
        THandler::Handler: ConnectionHandler<
            Substream = Substream<TMuxer>,
            InEvent = TInEvent,
            OutEvent = TOutEvent,
            Error = THandlerErr
        > + Send + 'static,
        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
        TTransErr: error::Error + Send + 'static,
        THandlerErr: error::Error + Send + 'static,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send + 'static,
    {
        // Refuse early if another pending outgoing connection would exceed the limit.
        self.counters.check_max_pending_outgoing()?;
        let expected_peer = info.peer_id.cloned();
        let endpoint = info.to_connected_point();
        let id = self.add_pending(future, handler, endpoint, expected_peer);
        Ok(id)
    }
    
    
    /// Adds a pending connection to the pool without checking any limit.
    ///
    /// The given future is wrapped so that the peer ID it resolves to is
    /// validated before the connection counts as established. The wrapped
    /// future is handed to the manager, the matching pending counter is
    /// incremented and the connection is recorded in `pending`.
    fn add_pending<TFut, TMuxer>(
        &mut self,
        future: TFut,
        handler: THandler,
        endpoint: ConnectedPoint,
        peer: Option<PeerId>,
    ) -> ConnectionId
    where
        TFut: Future<
            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
        > + Send + 'static,
        THandler: IntoConnectionHandler + Send + 'static,
        THandler::Handler: ConnectionHandler<
            Substream = Substream<TMuxer>,
            InEvent = TInEvent,
            OutEvent = TOutEvent,
            Error = THandlerErr
        > + Send + 'static,
        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
        TTransErr: error::Error + Send + 'static,
        THandlerErr: error::Error + Send + 'static,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send + 'static,
    {
        // State moved into the validating wrapper below.
        let connected_endpoint = endpoint.clone();
        let expected_peer = peer.clone();
        let local_id = self.local_id.clone();

        // Validate the peer identity obtained during connection setup:
        // it must match the expected peer (if any) and must not be the
        // local peer's own ID.
        let future = future.and_then(move |(peer_id, muxer)| {
            let unexpected = expected_peer.map_or(false, |expected| expected != peer_id);
            if unexpected || local_id == peer_id {
                return future::err(PendingConnectionError::InvalidPeerId)
            }
            let connected = Connected { peer_id, endpoint: connected_endpoint };
            future::ready(Ok((connected, muxer)))
        });

        let id = self.manager.add_pending(future, handler);
        self.counters.inc_pending(&endpoint);
        self.pending.insert(id, (endpoint, peer));
        id
    }
    
    
    
    
    
    pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected)
        -> Result<ConnectionId, ConnectionLimit>
    where
        THandler: IntoConnectionHandler + Send + 'static,
        THandler::Handler: ConnectionHandler<
            Substream = connection::Substream<TMuxer>,
            InEvent = TInEvent,
            OutEvent = TOutEvent,
            Error = THandlerErr
        > + Send + 'static,
        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
        TTransErr: error::Error + Send + 'static,
        THandlerErr: error::Error + Send + 'static,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send + 'static,
    {
        self.counters.check_max_established(&i.endpoint)?;
        self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
        let id = self.manager.add(c, i.clone());
        self.counters.inc_established(&i.endpoint);
        self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
        Ok(id)
    }
    
    
    
    /// Gets a handle to the connection with the given ID, pending or
    /// established.
    ///
    /// Returns `None` if the manager has no entry for `id`.
    pub fn get(&mut self, id: ConnectionId)
        -> Option<PoolConnection<'_, TInEvent>>
    {
        let connection = match self.manager.entry(id)? {
            manager::Entry::Established(entry) =>
                PoolConnection::Established(EstablishedConnection { entry }),
            manager::Entry::Pending(entry) =>
                PoolConnection::Pending(PendingConnection {
                    entry,
                    pending: &mut self.pending,
                    counters: &mut self.counters,
                }),
        };
        Some(connection)
    }
    
    /// Gets a handle to the established connection with the given ID.
    ///
    /// Returns `None` if there is no such connection or it is still pending.
    pub fn get_established(&mut self, id: ConnectionId)
        -> Option<EstablishedConnection<'_, TInEvent>>
    {
        self.get(id).and_then(|connection| match connection {
            PoolConnection::Established(established) => Some(established),
            PoolConnection::Pending(_) => None,
        })
    }
    
    /// Gets a handle to the pending outgoing connection with the given ID.
    ///
    /// Returns `None` if `id` is not a pending connection or if the pending
    /// connection is an incoming one.
    pub fn get_outgoing(&mut self, id: ConnectionId)
        -> Option<PendingConnection<'_, TInEvent>>
    {
        // Only connections recorded with a `Dialer` endpoint are outgoing.
        let is_outgoing = match self.pending.get(&id) {
            Some((ConnectedPoint::Dialer { .. }, _)) => true,
            _ => false,
        };
        if !is_outgoing {
            return None
        }
        match self.manager.entry(id) {
            Some(manager::Entry::Pending(entry)) => {
                let pending = &mut self.pending;
                let counters = &mut self.counters;
                Some(PendingConnection { entry, pending, counters })
            }
            _ => unreachable!("by consistency of `self.pending` with `self.manager`")
        }
    }
    
    
    
    /// Returns true if the pool has an entry in `established` for the given
    /// peer, i.e. at least one connection to it is considered established.
    pub fn is_connected(&self, id: &PeerId) -> bool {
        self.established.contains_key(id)
    }
    
    
    /// Returns the number of distinct peers with at least one established
    /// connection, i.e. the number of entries in `established`.
    pub fn num_peers(&self) -> usize {
        self.established.len()
    }
    
    
    
    
    
    
    
    
    
    pub fn disconnect(&mut self, peer: &PeerId) {
        if let Some(conns) = self.established.get(peer) {
            
            let mut num_established = 0;
            for (&id, endpoint) in conns.iter() {
                if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
                    let connected = e.remove();
                    self.disconnected.push(Disconnected {
                        id, connected, num_established
                    });
                    num_established += 1;
                }
                self.counters.dec_established(endpoint);
            }
        }
        self.established.remove(peer);
        let mut aborted = Vec::new();
        for (&id, (_endpoint, peer2)) in &self.pending {
            if Some(peer) == peer2.as_ref() {
                if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) {
                    e.abort();
                    aborted.push(id);
                }
            }
        }
        for id in aborted {
            if let Some((endpoint, _)) = self.pending.remove(&id) {
                self.counters.dec_pending(&endpoint);
            }
        }
    }
    
    /// Counts the established connections to the given peer.
    ///
    /// Returns 0 if the peer has no entry in `established`.
    pub fn num_peer_established(&self, peer: &PeerId) -> u32 {
        num_peer_established(&self.established, peer)
    }
    
    pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
        -> EstablishedConnectionIter<'a,
            impl Iterator<Item = ConnectionId>,
            TInEvent,
            TOutEvent,
            THandler,
            TTransErr,
            THandlerErr>
    {
        let ids = self.iter_peer_established_info(peer)
            .map(|(id, _endpoint)| *id)
            .collect::<SmallVec<[ConnectionId; 10]>>()
            .into_iter();
        EstablishedConnectionIter { pool: self, ids }
    }
    
    /// Returns an iterator over the pending incoming connections, i.e. the
    /// pending connections recorded with a `Listener` endpoint.
    pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
        self.iter_pending_info().filter_map(|(_, endpoint, _)| {
            if let ConnectedPoint::Listener { local_addr, send_back_addr } = endpoint {
                Some(IncomingInfo { local_addr, send_back_addr })
            } else {
                None
            }
        })
    }
    
    pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_>> {
        self.iter_pending_info()
            .filter_map(|(_, ref endpoint, ref peer_id)| {
                match endpoint {
                    ConnectedPoint::Listener { .. } => None,
                    ConnectedPoint::Dialer { address } =>
                        Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
                }
            })
    }
    
    
    /// Returns an iterator over the IDs and endpoints of the established
    /// connections of the given peer.
    ///
    /// The iterator is empty if the peer has no established connection.
    pub fn iter_peer_established_info(&self, peer: &PeerId)
        -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
    {
        if let Some(conns) = self.established.get(peer) {
            Either::Left(conns.iter())
        } else {
            Either::Right(std::iter::empty())
        }
    }
    
    
    /// Returns an iterator over all pending connections, yielding for each
    /// its ID, its endpoint and the expected peer, if any.
    pub fn iter_pending_info(&self)
        -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<PeerId>)> + '_
    {
        self.pending
            .iter()
            .map(|(id, entry)| (id, &entry.0, &entry.1))
    }
    
    
    /// Returns an iterator over the IDs of all peers that have an entry in
    /// `established`, i.e. at least one established connection.
    pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
        self.established.keys()
    }
    
    
    
    
    /// Polls the connection pool for the next event.
    ///
    /// Events queued by `Pool::disconnect` are emitted first, one per call.
    /// Afterwards the connection `Manager` is polled until it yields an
    /// event that maps to a `PoolEvent`, or until it returns `Poll::Pending`.
    ///
    /// The returned event may keep the pool mutably borrowed for `'a`.
    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
        PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
    >     {
        // Emit the `ConnectionClosed` events owed for forced disconnections.
        //
        // `Pool::disconnect` pushes the `Disconnected` entries with an
        // ascending `num_established`. Popping from the end therefore
        // reports the number of remaining connections in descending order,
        // ending at 0.
        if let Some(Disconnected {
            id, connected, num_established
        }) = self.disconnected.pop() {
            return Poll::Ready(PoolEvent::ConnectionClosed {
                id,
                connected,
                num_established,
                // Forced disconnections never carry an error.
                error: None,
                pool: self,
            })
        }

        // Poll the connection `Manager` until an event can be reported.
        loop {
            let item = match self.manager.poll(cx) {
                Poll::Ready(item) => item,
                Poll::Pending => return Poll::Pending,
            };
            match item {
                manager::Event::PendingConnectionError { id, error, handler } => {
                    // If the connection is no longer tracked as pending, the
                    // event is dropped and polling continues.
                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
                        self.counters.dec_pending(&endpoint);
                        return Poll::Ready(PoolEvent::PendingConnectionError {
                            id,
                            endpoint,
                            error,
                            handler: Some(handler),
                            peer,
                            pool: self
                        })
                    }
                },
                manager::Event::ConnectionClosed { id, connected, error } => {
                    // Forget the connection and determine how many
                    // connections to the same peer remain.
                    let num_established =
                        if let Some(conns) = self.established.get_mut(&connected.peer_id) {
                            if let Some(endpoint) = conns.remove(&id) {
                                self.counters.dec_established(&endpoint);
                            }
                            // Panics if the count does not fit into a `u32`.
                            u32::try_from(conns.len()).unwrap()
                        } else {
                            0
                        };
                    // Drop the peer's (now empty) entry so that the peer no
                    // longer counts as connected.
                    if num_established == 0 {
                        self.established.remove(&connected.peer_id);
                    }
                    return Poll::Ready(PoolEvent::ConnectionClosed {
                        id, connected, error, num_established, pool: self
                    })
                }
                manager::Event::ConnectionEstablished { entry } => {
                    let id = entry.id();
                    // If the connection is not tracked as pending, the event
                    // is dropped and polling continues.
                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
                        self.counters.dec_pending(&endpoint);

                        // Check the general limit of established connections.
                        // On violation the connection is removed from the
                        // manager and reported as a failed pending connection.
                        if let Err(e) = self.counters.check_max_established(&endpoint) {
                            let connected = entry.remove();
                            return Poll::Ready(PoolEvent::PendingConnectionError {
                                id,
                                endpoint: connected.endpoint,
                                error: PendingConnectionError::ConnectionLimit(e),
                                handler: None,
                                peer,
                                pool: self
                            })
                        }

                        // Check the per-peer limit of established connections.
                        let current = num_peer_established(&self.established, &entry.connected().peer_id);
                        if let Err(e) = self.counters.check_max_established_per_peer(current) {
                            let connected = entry.remove();
                            return Poll::Ready(PoolEvent::PendingConnectionError {
                                id,
                                endpoint: connected.endpoint,
                                error: PendingConnectionError::ConnectionLimit(e),
                                handler: None,
                                peer,
                                pool: self
                            })
                        }

                        // The peer ID was already validated by the wrapper
                        // future built in `add_pending`; re-check it in debug
                        // builds only.
                        if cfg!(debug_assertions) {
                            if self.local_id == entry.connected().peer_id {
                                panic!("Unexpected local peer ID for remote.");
                            }
                            if let Some(peer) = peer {
                                if peer != entry.connected().peer_id {
                                    panic!("Unexpected peer ID mismatch.");
                                }
                            }
                        }

                        // Record the connection as established.
                        let peer = entry.connected().peer_id.clone();
                        let conns = self.established.entry(peer).or_default();
                        let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
                            .expect("n + 1 is always non-zero; qed");
                        self.counters.inc_established(&endpoint);
                        conns.insert(id, endpoint);
                        // Look the connection up again via `get` to obtain the
                        // handle that is returned in the event.
                        match self.get(id) {
                            Some(PoolConnection::Established(connection)) =>
                                return Poll::Ready(PoolEvent::ConnectionEstablished {
                                    connection, num_established
                                }),
                            _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                        }
                    }
                },
                manager::Event::ConnectionEvent { entry, event } => {
                    let id = entry.id();
                    match self.get(id) {
                        Some(PoolConnection::Established(connection)) =>
                            return Poll::Ready(PoolEvent::ConnectionEvent {
                                connection,
                                event,
                            }),
                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                    }
                },
                manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
                    let id = entry.id();
                    // Keep the endpoint recorded in `established` in sync with
                    // the new endpoint.
                    match self.established.get_mut(&entry.connected().peer_id) {
                        Some(list) => *list.get_mut(&id)
                            .expect("state inconsistency: entry is `EstablishedEntry` but absent \
                                from `established`") = new_endpoint.clone(),
                        None => unreachable!("since `entry` is an `EstablishedEntry`.")
                    };
                    match self.get(id) {
                        Some(PoolConnection::Established(connection)) =>
                            return Poll::Ready(PoolEvent::AddressChange {
                                connection,
                                new_endpoint,
                                old_endpoint,
                            }),
                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                    }
                },
            }
        }
    }
}
/// A handle to a connection in a `Pool`, as returned by `Pool::get`.
pub enum PoolConnection<'a, TInEvent> {
    /// The connection is still pending.
    Pending(PendingConnection<'a, TInEvent>),
    /// The connection is established.
    Established(EstablishedConnection<'a, TInEvent>),
}
/// A handle to a pending connection in a `Pool`.
pub struct PendingConnection<'a, TInEvent> {
    /// The manager's entry for the pending connection.
    entry: manager::PendingEntry<'a, TInEvent>,
    /// The pool's map of pending connections, needed to look up the
    /// endpoint and expected peer and to forget the connection on abort.
    pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
    /// The pool's connection counters, updated when the connection is aborted.
    counters: &'a mut ConnectionCounters,
}
impl<TInEvent>
    PendingConnection<'_, TInEvent>
{
    
    pub fn id(&self) -> ConnectionId {
        self.entry.id()
    }
    
    pub fn peer_id(&self) -> &Option<PeerId> {
        &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
    }
    
    pub fn endpoint(&self) -> &ConnectedPoint {
        &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").0
    }
    
    pub fn abort(self) {
        let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0;
        self.counters.dec_pending(&endpoint);
        self.entry.abort();
    }
}
/// A handle to an established connection in a `Pool`.
pub struct EstablishedConnection<'a, TInEvent> {
    /// The manager's entry for the established connection; all methods of
    /// the handle forward to it.
    entry: manager::EstablishedEntry<'a, TInEvent>,
}
// Debug output of an established connection shows the underlying manager entry.
impl<TInEvent> fmt::Debug
for EstablishedConnection<'_, TInEvent>
where
    TInEvent: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("EstablishedConnection");
        out.field("entry", &self.entry);
        out.finish()
    }
}
impl<TInEvent> EstablishedConnection<'_, TInEvent> {
    /// Returns information about the connected peer and endpoint.
    pub fn connected(&self) -> &Connected {
        self.entry.connected()
    }

    /// Returns the endpoint of the connection.
    pub fn endpoint(&self) -> &ConnectedPoint {
        &self.entry.connected().endpoint
    }

    /// Returns the identity of the connected peer.
    pub fn peer_id(&self) -> &PeerId {
        &self.entry.connected().peer_id
    }

    /// Returns the local connection ID.
    pub fn id(&self) -> ConnectionId {
        self.entry.id()
    }

    /// Sends an event to the connection's handler by forwarding to the
    /// manager entry's `notify_handler`.
    ///
    /// On failure the event is handed back to the caller in `Err`.
    ///
    /// NOTE(review): the exact failure conditions are defined by
    /// `manager::EstablishedEntry::notify_handler` - presumably the handler
    /// being busy or the connection closing; confirm there. Calling
    /// `poll_ready_notify_handler` first looks like the intended way to
    /// avoid a busy handler.
    pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
        self.entry.notify_handler(event)
    }

    /// Checks whether `notify_handler` is ready to accept an event by
    /// forwarding to the manager entry's `poll_ready_notify_handler`.
    ///
    /// NOTE(review): `Err(())` presumably signals that the connection is
    /// about to close - confirm against `manager::EstablishedEntry`.
    pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
        self.entry.poll_ready_notify_handler(cx)
    }

    /// Starts closing the connection by forwarding to the manager entry's
    /// `start_close`, consuming this handle.
    pub fn start_close(self) {
        self.entry.start_close()
    }
}
/// An iterator over handles to established connections in a `Pool`,
/// driven by an iterator of connection IDs.
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
    /// The pool in which the connection IDs are looked up.
    pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
    /// The IDs of the connections that have not been visited yet.
    ids: I
}
// Note: this is deliberately not an `Iterator` implementation; `next`
// returns handles that borrow from the iterator itself.
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
    EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
    I: Iterator<Item = ConnectionId>
{
    /// Advances the iterator and returns a handle to the next connection
    /// that the manager still reports as established.
    ///
    /// IDs that are no longer established are skipped. Returns `None` once
    /// all IDs are exhausted.
    pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
    {
        while let Some(id) = self.ids.next() {
            // (*) Check first, without keeping a borrow, whether the entry
            // is established; only then fetch the entry that is returned.
            if self.pool.manager.is_established(&id) { 
                match self.pool.manager.entry(id) {
                    Some(manager::Entry::Established(entry)) => {
                        return Some(EstablishedConnection { entry })
                    }
                    // Cannot happen, given the check at (*).
                    _ => panic!("Established entry not found in manager.") 
                }
            }
        }
        None
    }

    /// Turns the iterator into an iterator over the remaining connection IDs.
    pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
        self.ids
    }

    /// Consumes the iterator and returns a handle to the first remaining
    /// connection that the manager still reports as established, if any.
    ///
    /// Since the iterator is consumed, the handle can borrow the pool for
    /// up to the full lifetime `'a`.
    pub fn into_first<'b>(mut self)
        -> Option<EstablishedConnection<'b, TInEvent>>
    where 'a: 'b
    {
        while let Some(id) = self.ids.next() {
            // (*) Same check-then-fetch sequence as in `next`.
            if self.pool.manager.is_established(&id) { 
                match self.pool.manager.entry(id) {
                    Some(manager::Entry::Established(entry)) => {
                        return Some(EstablishedConnection { entry })
                    }
                    // Cannot happen, given the check at (*).
                    _ => panic!("Established entry not found in manager.") 
                }
            }
        }
        None
    }
}
/// Counters of the pending and established connections of a pool, split
/// by direction, together with the limits they are checked against.
#[derive(Debug, Clone)]
pub struct ConnectionCounters {
    /// The configured connection limits.
    limits: ConnectionLimits,
    /// The current number of pending incoming connections.
    pending_incoming: u32,
    /// The current number of pending outgoing connections.
    pending_outgoing: u32,
    /// The current number of established incoming connections.
    established_incoming: u32,
    /// The current number of established outgoing connections.
    established_outgoing: u32,
}

impl ConnectionCounters {
    /// Creates counters that all start at zero and are checked against `limits`.
    fn new(limits: ConnectionLimits) -> Self {
        ConnectionCounters {
            limits,
            pending_incoming: 0,
            pending_outgoing: 0,
            established_incoming: 0,
            established_outgoing: 0,
        }
    }

    /// The configured connection limits.
    pub fn limits(&self) -> &ConnectionLimits {
        &self.limits
    }

    /// The total number of connections, both pending and established.
    pub fn num_connections(&self) -> u32 {
        let pending = self.num_pending();
        let established = self.num_established();
        pending + established
    }

    /// The total number of pending connections, both incoming and outgoing.
    pub fn num_pending(&self) -> u32 {
        self.pending_incoming + self.pending_outgoing
    }

    /// The number of pending incoming connections.
    pub fn num_pending_incoming(&self) -> u32 {
        self.pending_incoming
    }

    /// The number of pending outgoing connections.
    pub fn num_pending_outgoing(&self) -> u32 {
        self.pending_outgoing
    }

    /// The number of established incoming connections.
    pub fn num_established_incoming(&self) -> u32 {
        self.established_incoming
    }

    /// The number of established outgoing connections.
    pub fn num_established_outgoing(&self) -> u32 {
        self.established_outgoing
    }

    /// The total number of established connections.
    pub fn num_established(&self) -> u32 {
        self.established_outgoing + self.established_incoming
    }

    /// Selects the pending counter that matches the direction of `endpoint`.
    fn pending_slot(&mut self, endpoint: &ConnectedPoint) -> &mut u32 {
        match endpoint {
            ConnectedPoint::Dialer { .. } => &mut self.pending_outgoing,
            ConnectedPoint::Listener { .. } => &mut self.pending_incoming,
        }
    }

    /// Selects the established counter that matches the direction of `endpoint`.
    fn established_slot(&mut self, endpoint: &ConnectedPoint) -> &mut u32 {
        match endpoint {
            ConnectedPoint::Dialer { .. } => &mut self.established_outgoing,
            ConnectedPoint::Listener { .. } => &mut self.established_incoming,
        }
    }

    /// Records a new pending connection with the given endpoint.
    fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
        *self.pending_slot(endpoint) += 1;
    }

    /// Records that a connection with the given endpoint is no longer pending.
    fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
        *self.pending_slot(endpoint) -= 1;
    }

    /// Records a new established connection with the given endpoint.
    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
        *self.established_slot(endpoint) += 1;
    }

    /// Records that a connection with the given endpoint is no longer established.
    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
        *self.established_slot(endpoint) -= 1;
    }

    /// Fails if no further pending outgoing connection is allowed.
    fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
        Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
    }

    /// Fails if no further pending incoming connection is allowed.
    fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
        Self::check(self.pending_incoming, self.limits.max_pending_incoming)
    }

    /// Fails if no further established connection in the direction of
    /// `endpoint` is allowed.
    fn check_max_established(&self, endpoint: &ConnectedPoint)
        -> Result<(), ConnectionLimit>
    {
        let (current, limit) = match endpoint {
            ConnectedPoint::Dialer { .. } =>
                (self.established_outgoing, self.limits.max_established_outgoing),
            ConnectedPoint::Listener { .. } =>
                (self.established_incoming, self.limits.max_established_incoming),
        };
        Self::check(current, limit)
    }

    /// Fails if a peer that already has `current` established connections
    /// is not allowed another one.
    fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
        Self::check(current, self.limits.max_established_per_peer)
    }

    /// Fails if `current` has reached `limit`. A limit of `None` never fails.
    fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
        match limit {
            Some(limit) if current >= limit => Err(ConnectionLimit { limit, current }),
            _ => Ok(()),
        }
    }
}
/// Counts the established connections to the given peer.
///
/// Returns 0 if the peer has no entry in `established`. Panics if the
/// number of connections does not fit into a `u32`.
fn num_peer_established(
    established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
    peer: &PeerId
) -> u32 {
    let count = match established.get(peer) {
        Some(conns) => conns.len(),
        None => return 0,
    };
    u32::try_from(count)
        .expect("Unexpectedly large number of connections for a peer.")
}
/// The configurable connection limits.
///
/// All limits default to `None`, in which case the corresponding kind of
/// connection is not limited.
#[derive(Debug, Clone, Default)]
pub struct ConnectionLimits {
    /// Maximum number of pending incoming connections.
    max_pending_incoming: Option<u32>,
    /// Maximum number of pending outgoing connections.
    max_pending_outgoing: Option<u32>,
    /// Maximum number of established incoming connections.
    max_established_incoming: Option<u32>,
    /// Maximum number of established outgoing connections.
    max_established_outgoing: Option<u32>,
    /// Maximum number of established connections per peer.
    max_established_per_peer: Option<u32>,
}

impl ConnectionLimits {
    /// Sets the maximum number of pending incoming connections.
    pub fn with_max_pending_incoming(self, limit: Option<u32>) -> Self {
        Self { max_pending_incoming: limit, ..self }
    }

    /// Sets the maximum number of pending outgoing connections.
    pub fn with_max_pending_outgoing(self, limit: Option<u32>) -> Self {
        Self { max_pending_outgoing: limit, ..self }
    }

    /// Sets the maximum number of established incoming connections.
    pub fn with_max_established_incoming(self, limit: Option<u32>) -> Self {
        Self { max_established_incoming: limit, ..self }
    }

    /// Sets the maximum number of established outgoing connections.
    pub fn with_max_established_outgoing(self, limit: Option<u32>) -> Self {
        Self { max_established_outgoing: limit, ..self }
    }

    /// Sets the maximum number of established connections per peer,
    /// regardless of their direction.
    pub fn with_max_established_per_peer(self, limit: Option<u32>) -> Self {
        Self { max_established_per_peer: limit, ..self }
    }
}
/// Information about a connection that was removed by `Pool::disconnect`
/// and for which `Pool::poll` still has to emit a `ConnectionClosed` event.
struct Disconnected {
    /// The ID of the removed connection.
    id: ConnectionId,
    /// Information about the removed connection (peer and endpoint).
    connected: Connected,
    /// The number of established connections to the same peer that are
    /// reported as remaining when the event for this entry is emitted.
    num_established: u32,
}