mod event;
pub mod peer;
pub use crate::connection::{ConnectionLimits, ConnectionCounters};
pub use event::{NetworkEvent, IncomingConnection};
pub use peer::Peer;
use crate::{
    ConnectedPoint,
    Executor,
    Multiaddr,
    PeerId,
    connection::{
        ConnectionId,
        ConnectionLimit,
        ConnectionHandler,
        IntoConnectionHandler,
        IncomingInfo,
        OutgoingInfo,
        ListenersEvent,
        ListenerId,
        ListenersStream,
        PendingConnectionError,
        Substream,
        manager::ManagerConfig,
        pool::{Pool, PoolEvent},
    },
    muxing::StreamMuxer,
    transport::{Transport, TransportError},
};
use fnv::{FnvHashMap};
use futures::{prelude::*, future};
use smallvec::SmallVec;
use std::{
    collections::hash_map,
    convert::TryFrom as _,
    error,
    fmt,
    num::NonZeroUsize,
    pin::Pin,
    task::{Context, Poll},
};
pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
    
    local_peer_id: PeerId,
    
    listeners: ListenersStream<TTrans>,
    
    pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
        <THandler::Handler as ConnectionHandler>::Error>,
    
    
    
    
    
    
    
    
    
    
    
    
    dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
}
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
    Network<TTrans, TInEvent, TOutEvent, THandler>
where
    TTrans: fmt::Debug + Transport,
    THandler: fmt::Debug + ConnectionHandler,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("ReachAttempts")
            .field("local_peer_id", &self.local_peer_id)
            .field("listeners", &self.listeners)
            .field("peers", &self.pool)
            .field("dialing", &self.dialing)
            .finish()
    }
}
impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
    Network<TTrans, TInEvent, TOutEvent, THandler>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
}
impl<TTrans, TInEvent, TOutEvent, THandler>
    Network<TTrans, TInEvent, TOutEvent, THandler>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
    fn disconnect(&mut self, peer: &PeerId) {
        self.pool.disconnect(peer);
        self.dialing.remove(peer);
    }
}
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
    Network<TTrans, TInEvent, TOutEvent, THandler>
where
    TTrans: Transport + Clone,
    TMuxer: StreamMuxer,
    THandler: IntoConnectionHandler + Send + 'static,
    THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
    <THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
{
    
    pub fn new(
        transport: TTrans,
        local_peer_id: PeerId,
        config: NetworkConfig,
    ) -> Self {
        let pool_local_id = local_peer_id.clone();
        Network {
            local_peer_id,
            listeners: ListenersStream::new(transport),
            pool: Pool::new(pool_local_id, config.manager_config, config.limits),
            dialing: Default::default(),
        }
    }
    
    pub fn transport(&self) -> &TTrans {
        self.listeners.transport()
    }
    
    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>> {
        self.listeners.listen_on(addr)
    }
    
    
    
    pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
        self.listeners.remove_listener(id)
    }
    
    pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listeners.listen_addrs()
    }
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
        -> impl Iterator<Item = Multiaddr> + 'a
    where
        TMuxer: 'a,
        THandler: 'a,
    {
        let transport = self.listeners.transport();
        let mut addrs: Vec<_> = self.listen_addrs()
            .filter_map(move |server| transport.address_translation(server, observed_addr))
            .collect();
        
        addrs.sort_unstable();
        addrs.dedup();
        addrs.into_iter()
    }
    
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
    
    
    
    
    
    pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
        -> Result<ConnectionId, ConnectionLimit>
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Error: Send + 'static,
        TTrans::Dial: Send + 'static,
        TMuxer: Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
    {
        let info = OutgoingInfo { address, peer_id: None };
        match self.transport().clone().dial(address.clone()) {
            Ok(f) => {
                let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
                self.pool.add_outgoing(f, handler, info)
            }
            Err(err) => {
                let f = future::err(PendingConnectionError::Transport(err));
                self.pool.add_outgoing(f, handler, info)
            }
        }
    }
    
    pub fn info(&self) -> NetworkInfo {
        let num_peers = self.pool.num_peers();
        let connection_counters = self.pool.counters().clone();
        NetworkInfo {
            num_peers,
            connection_counters,
        }
    }
    
    pub fn incoming_info(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
        self.pool.iter_pending_incoming()
    }
    
    pub fn unknown_dials(&self) -> impl Iterator<Item = &Multiaddr> {
        self.pool.iter_pending_outgoing()
            .filter_map(|info| {
                if info.peer_id.is_none() {
                    Some(info.address)
                } else {
                    None
                }
            })
    }
    
    
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
    
    pub fn is_connected(&self, peer: &PeerId) -> bool {
        self.pool.is_connected(peer)
    }
    
    pub fn is_dialing(&self, peer: &PeerId) -> bool {
        self.dialing.contains_key(peer)
    }
    
    
    pub fn is_disconnected(&self, peer: &PeerId) -> bool {
        !self.is_connected(peer) && !self.is_dialing(peer)
    }
    
    
    pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.dialing.keys()
    }
    
    pub fn peer(&mut self, peer_id: PeerId)
        -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
    {
        Peer::new(self, peer_id)
    }
    
    
    
    
    
    pub fn accept(
        &mut self,
        connection: IncomingConnection<TTrans::ListenerUpgrade>,
        handler: THandler,
    ) -> Result<ConnectionId, ConnectionLimit>
    where
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Error: Send + 'static,
        TTrans::ListenerUpgrade: Send + 'static,
    {
        let upgrade = connection.upgrade.map_err(|err|
            PendingConnectionError::Transport(TransportError::Other(err)));
        let info = IncomingInfo {
            local_addr: &connection.local_addr,
            send_back_addr: &connection.send_back_addr,
        };
        self.pool.add_incoming(upgrade, handler, info)
    }
    
    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Error: Send + 'static,
        TTrans::Dial: Send + 'static,
        TTrans::ListenerUpgrade: Send + 'static,
        TMuxer: Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        THandler: IntoConnectionHandler + Send + 'static,
        THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
        <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
    {
        
        match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
            Poll::Pending => (),
            Poll::Ready(ListenersEvent::Incoming {
                listener_id,
                upgrade,
                local_addr,
                send_back_addr
            }) => {
                return Poll::Ready(NetworkEvent::IncomingConnection {
                    listener_id,
                    connection: IncomingConnection {
                        upgrade,
                        local_addr,
                        send_back_addr,
                    }
                })
            }
            Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
                return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })
            }
            Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
                return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
            }
            Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => {
                return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason })
            }
            Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
                return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
            }
        }
        
        let event = match self.pool.poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
                if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id().clone()) {
                    e.get_mut().retain(|s| s.current.0 != connection.id());
                    if e.get().is_empty() {
                        e.remove();
                    }
                }
                NetworkEvent::ConnectionEstablished {
                    connection,
                    num_established,
                }
            }
            Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
                let dialing = &mut self.dialing;
                let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
                if let Some(dial) = next {
                    let transport = self.listeners.transport().clone();
                    if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
                        log::warn!("Dialing aborted: {:?}", e);
                    }
                }
                event
            }
            Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, .. }) => {
                NetworkEvent::ConnectionClosed {
                    id,
                    connected,
                    num_established,
                    error,
                }
            }
            Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
                NetworkEvent::ConnectionEvent {
                    connection,
                    event,
                }
            }
            Poll::Ready(PoolEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
                NetworkEvent::AddressChange {
                    connection,
                    new_endpoint,
                    old_endpoint,
                }
            }
        };
        Poll::Ready(event)
    }
    
    fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
        -> Result<ConnectionId, ConnectionLimit>
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Dial: Send + 'static,
        TTrans::Error: Send + 'static,
        TMuxer: Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
    {
        dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
    }
}
struct DialingOpts<PeerId, THandler> {
    peer: PeerId,
    handler: THandler,
    address: Multiaddr,
    remaining: Vec<Multiaddr>,
}
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
    transport: TTrans,
    pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
        <THandler::Handler as ConnectionHandler>::Error>,
    dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
    opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
    THandler: IntoConnectionHandler + Send + 'static,
    <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
    THandler::Handler: ConnectionHandler<
        Substream = Substream<TMuxer>,
        InEvent = TInEvent,
        OutEvent = TOutEvent,
    > + Send + 'static,
    TTrans: Transport<Output = (PeerId, TMuxer)>,
    TTrans::Dial: Send + 'static,
    TTrans::Error: error::Error + Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send + 'static,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
{
    let result = match transport.dial(opts.address.clone()) {
        Ok(fut) => {
            let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
            let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
            pool.add_outgoing(fut, opts.handler, info)
        },
        Err(err) => {
            let fut = future::err(PendingConnectionError::Transport(err));
            let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
            pool.add_outgoing(fut, opts.handler, info)
        },
    };
    if let Ok(id) = &result {
        dialing.entry(opts.peer).or_default().push(
            peer::DialingState {
                current: (*id, opts.address),
                remaining: opts.remaining,
            },
        );
    }
    result
}
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
    dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
    id: ConnectionId,
    endpoint: ConnectedPoint,
    error: PendingConnectionError<TTrans::Error>,
    handler: Option<THandler>,
) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
    
    let dialing_failed = dialing.iter_mut()
        .find_map(|(peer, attempts)| {
            if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
                let attempt = attempts.remove(pos);
                let last = attempts.is_empty();
                Some((peer.clone(), attempt, last))
            } else {
                None
            }
        });
    if let Some((peer_id, mut attempt, last)) = dialing_failed {
        if last {
            dialing.remove(&peer_id);
        }
        let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
        let failed_addr = attempt.current.1.clone();
        let (opts, attempts_remaining) =
            if num_remain > 0 {
                if let Some(handler) = handler {
                    let next_attempt = attempt.remaining.remove(0);
                    let opts = DialingOpts {
                        peer: peer_id.clone(),
                        handler,
                        address: next_attempt,
                        remaining: attempt.remaining
                    };
                    (Some(opts), num_remain)
                } else {
                    
                    
                    
                    (None, 0)
                }
            } else {
                (None, 0)
            };
        (opts, NetworkEvent::DialError {
            attempts_remaining,
            peer_id,
            multiaddr: failed_addr,
            error,
        })
    } else {
        
        match endpoint {
            ConnectedPoint::Dialer { address } =>
                (None, NetworkEvent::UnknownPeerDialError {
                    multiaddr: address,
                    error,
                }),
            ConnectedPoint::Listener { local_addr, send_back_addr } =>
                (None, NetworkEvent::IncomingConnectionError {
                    local_addr,
                    send_back_addr,
                    error
                })
        }
    }
}
#[derive(Clone, Debug)]
pub struct NetworkInfo {
    
    num_peers: usize,
    
    connection_counters: ConnectionCounters,
}
impl NetworkInfo {
    
    
    pub fn num_peers(&self) -> usize {
        self.num_peers
    }
    
    pub fn connection_counters(&self) -> &ConnectionCounters {
        &self.connection_counters
    }
}
#[derive(Default)]
pub struct NetworkConfig {
    
    
    
    manager_config: ManagerConfig,
    
    limits: ConnectionLimits,
}
impl NetworkConfig {
    
    pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
        self.manager_config.executor = Some(e);
        self
    }
    
    
    pub fn or_else_with_executor<F>(mut self, f: F) -> Self
    where
        F: FnOnce() -> Option<Box<dyn Executor + Send>>
    {
        self.manager_config.executor = self.manager_config.executor.or_else(f);
        self
    }
    
    
    
    
    
    
    
    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
        self.manager_config.task_command_buffer_size = n.get() - 1;
        self
    }
    
    
    
    
    
    
    pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
        self.manager_config.task_event_buffer_size = n;
        self
    }
    
    pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
        self.limits = limits;
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    struct Dummy;
    impl Executor for Dummy {
        fn exec(&self, _: Pin<Box<dyn Future<Output=()> + Send>>) { }
    }
    #[test]
    fn set_executor() {
        NetworkConfig::default()
            .with_executor(Box::new(Dummy))
            .with_executor(Box::new(|f| {
                async_std::task::spawn(f);
            }));
    }
}