use crate::{
Multiaddr,
muxing::StreamMuxer,
connection::{
self,
Close,
Connected,
Connection,
ConnectionError,
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream,
},
};
use futures::{prelude::*, channel::mpsc, stream};
use std::{pin::Pin, task::Context, task::Poll};
use super::ConnectResult;
/// Identifier of a [`Task`].
///
/// Every [`Event`] carries the `TaskId` of the task that emitted it.
/// The wrapped `usize` is only accessible to the parent module (`pub(super)`).
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(pub(super) usize);
/// Commands that a [`Task`] receives on its command channel.
///
/// `T` is the payload type handed to the connection handler.
#[derive(Debug)]
pub enum Command<T> {
/// Deliver an event to the connection handler. A task with an established
/// connection passes the payload to `Connection::inject_event`.
NotifyHandler(T),
/// Close the connection. On receipt, an established task closes its command
/// channel and begins closing the connection.
Close,
}
/// Events that a [`Task`] emits on its event channel.
///
/// Type parameters:
/// - `T`: payload of [`Event::Notify`].
/// - `H`: the handler type handed back in [`Event::Failed`].
/// - `TE`: error parameter of the `PendingConnectionError` in [`Event::Failed`].
/// - `HE`: error parameter of the `ConnectionError` in [`Event::Closed`].
#[derive(Debug)]
pub enum Event<T, H, TE, HE> {
/// The pending connection future resolved successfully; `info` is the
/// `Connected` value it produced.
Established { id: TaskId, info: Connected },
/// The pending connection future resolved with `error`. The `handler` the task
/// was created with is handed back unconverted.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// The connection reported a `connection::Event::AddressChange`.
AddressChange { id: TaskId, new_address: Multiaddr },
/// The connection reported a `connection::Event::Handler` carrying `event`.
Notify { id: TaskId, event: T },
/// The connection is closed. `error` is `None` after a close that completed
/// successfully, and `Some` when polling or closing the connection failed.
Closed { id: TaskId, error: Option<ConnectionError<HE>> }
}
impl<T, H, TE, HE> Event<T, H, TE, HE> {
    /// Returns the identifier of the task that emitted this event.
    pub fn id(&self) -> &TaskId {
        // Every variant has an `id` field, so all of them share a single arm.
        match self {
            Event::Established { id, .. }
            | Event::Failed { id, .. }
            | Event::AddressChange { id, .. }
            | Event::Notify { id, .. }
            | Event::Closed { id, .. } => id,
        }
    }
}
/// A future that drives a single connection, from a pending connection attempt
/// through to its termination, exchanging [`Command`]s and [`Event`]s over
/// channels.
///
/// Type parameters (as constrained by the `Future` implementation):
/// - `F`: the future that resolves to the connection (`ConnectResult<M, E>`).
/// - `M`: the stream muxer of the connection.
/// - `H`: the value that is converted into the connection handler.
/// - `I`: events sent to the handler (`NotifyHandler` payload).
/// - `O`: events produced by the handler (`Notify` payload).
/// - `E`: error parameter of the pending connection error.
pub struct Task<F, M, H, I, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// Identifier of this task, attached to every emitted event.
id: TaskId,
/// Sending half of the channel on which events are emitted.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
/// Receiving half of the command channel, fused so that it can still be
/// polled after it has terminated.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
/// Current state of the task's state machine.
state: State<F, M, H, O, E>,
}
impl<F, M, H, I, O, E> Task<F, M, H, I, O, E>
where
    M: StreamMuxer,
    H: IntoConnectionHandler,
    H::Handler: ConnectionHandler<Substream = Substream<M>>
{
    /// Creates a task for a connection that has yet to be established.
    ///
    /// `future` is boxed and pinned, and `handler` is stored alongside it; the
    /// task starts out in the `Pending` state. The command receiver is fused.
    pub fn pending(
        id: TaskId,
        events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
        commands: mpsc::Receiver<Command<I>>,
        future: F,
        handler: H
    ) -> Self {
        let commands = commands.fuse();
        let future = Box::pin(future);
        let state = State::Pending { future, handler };
        Self { id, events, commands, state }
    }

    /// Creates a task for a connection that is already established.
    ///
    /// The task starts out in the `Established` state with no event queued for
    /// delivery. The command receiver is fused.
    pub fn established(
        id: TaskId,
        events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
        commands: mpsc::Receiver<Command<I>>,
        connection: Connection<M, H::Handler>
    ) -> Self {
        let commands = commands.fuse();
        let state = State::Established { connection, event: None };
        Self { id, events, commands, state }
    }
}
/// The states of a [`Task`]'s state machine.
enum State<F, M, H, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// The connection is still being established.
Pending {
/// The boxed, pinned future that resolves to the connection.
future: Pin<Box<F>>,
/// The handler-to-be, converted via `into_handler` once the future succeeds.
handler: H,
},
/// The connection is established.
Established {
connection: Connection<M, H::Handler>,
/// An event that has been produced but not yet sent on the event channel.
/// While this is `Some`, the connection itself is not polled.
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>
},
/// The connection is being closed following a `Command::Close`.
Closing(Close<M>),
/// The task is finishing; the contained event is the last one it tries to send.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error>),
/// Placeholder left in `Task::state` while `poll` works on the state it took
/// out. A task that returned `Poll::Ready` remains in this state, and polling
/// it again panics.
Done
}
/// Marks `Task` as `Unpin` for all type parameters.
///
/// This is what allows `poll` to obtain a plain `&mut Task` from
/// `Pin<&mut Self>` via `&mut *self`. The connection future `F` is stored behind
/// `Pin<Box<F>>` rather than inline.
impl<F, M, H, I, O, E> Unpin for Task<F, M, H, I, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
}
/// Drives the task's state machine.
///
/// The future resolves to `()` when the task has finished: after the final
/// event was handed to (or rejected by) the event channel, or as soon as the
/// command or event channel turns out to be closed.
///
/// # Panics
///
/// - If a command is received while the connection is still pending.
/// - If polled again after having returned `Poll::Ready`.
impl<F, M, H, I, O, E> Future for Task<F, M, H, I, O, E>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<M, E>>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// `Task` is `Unpin`, so a plain mutable reference can be taken.
let this = &mut *self;
let id = this.id;
// Each iteration takes the current state out of `this.state`, leaving
// `State::Done` behind. Every path that does not finish the task must store
// a state again before returning `Poll::Pending` or looping.
'poll: loop {
match std::mem::replace(&mut this.state, State::Done) {
State::Pending { mut future, handler } => {
// Poll the command channel first. No command is expected in this state.
match this.commands.poll_next_unpin(cx) {
Poll::Pending => {},
Poll::Ready(None) => {
// The command channel has terminated (presumably the owner of the
// sending half is gone); finish without emitting an event.
return Poll::Ready(())
}
Poll::Ready(Some(_)) => panic!(
"Task received command while the connection is pending."
)
}
// Then check on the connection attempt.
match future.poll_unpin(cx) {
Poll::Ready(Ok((info, muxer))) => {
// Build the connection and queue the `Established` event; the next
// loop iteration continues in the `Established` state.
this.state = State::Established {
connection: Connection::new(
muxer,
handler.into_handler(&info),
),
event: Some(Event::Established { id, info })
}
}
Poll::Pending => {
this.state = State::Pending { future, handler };
return Poll::Pending
}
Poll::Ready(Err(error)) => {
// Stop accepting commands and finish with a final `Failed` event
// that hands the handler back.
this.commands.get_mut().close();
let event = Event::Failed { id, handler, error };
this.state = State::Terminating(event)
}
}
}
State::Established { mut connection, event } => {
// Drain all commands that are currently available.
loop {
match this.commands.poll_next_unpin(cx) {
Poll::Pending => break,
// Note: this `event` is the command payload; it shadows the queued
// task event only within this arm.
Poll::Ready(Some(Command::NotifyHandler(event))) =>
connection.inject_event(event),
Poll::Ready(Some(Command::Close)) => {
// Stop accepting commands and start closing the connection.
// A queued event, if any, is dropped here.
this.commands.get_mut().close();
this.state = State::Closing(connection.close());
continue 'poll
}
Poll::Ready(None) => {
// The command channel has terminated; finish without emitting
// an event.
return Poll::Ready(())
}
}
}
if let Some(event) = event {
// A queued event must be sent before the connection is polled again.
match this.events.poll_ready(cx) {
Poll::Pending => {
this.state = State::Established { connection, event: Some(event) };
return Poll::Pending
}
Poll::Ready(result) => {
if result.is_ok() {
if let Ok(()) = this.events.start_send(event) {
this.state = State::Established { connection, event: None };
continue 'poll
}
}
// `poll_ready` or `start_send` failed: the event cannot be
// delivered, so the task finishes.
return Poll::Ready(())
}
}
} else {
// Nothing queued: poll the connection for its next event.
match Connection::poll(Pin::new(&mut connection), cx) {
Poll::Pending => {
this.state = State::Established { connection, event: None };
return Poll::Pending
}
Poll::Ready(Ok(connection::Event::Handler(event))) => {
this.state = State::Established {
connection,
event: Some(Event::Notify { id, event })
};
}
Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => {
this.state = State::Established {
connection,
event: Some(Event::AddressChange { id, new_address })
};
}
Poll::Ready(Err(error)) => {
// The connection failed: stop accepting commands and finish with
// a final `Closed` event carrying the error.
this.commands.get_mut().close();
let event = Event::Closed { id, error: Some(error) };
this.state = State::Terminating(event);
}
}
}
}
State::Closing(mut closing) => {
// Drive the close future; its outcome decides the `error` of the final
// `Closed` event. (`this.id` is the same value as the local `id`.)
match closing.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed { id: this.id, error: None };
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e))
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
return Poll::Pending
}
}
}
State::Terminating(event) => {
// Try to hand over the final event, then finish regardless of the result.
match this.events.poll_ready(cx) {
Poll::Pending => {
// `self.state` is the same field as `this.state`; `this` is not
// used again on this path.
self.state = State::Terminating(event);
return Poll::Pending
}
Poll::Ready(result) => {
if result.is_ok() {
// A send error is deliberately ignored: the task ends either way.
let _ = this.events.start_send(event);
}
return Poll::Ready(())
}
}
}
State::Done => panic!("`Task::poll()` called after completion.")
}
}
}
}