use codec::{self, Encode, Decode};
use crate::{
	config::ProtocolId,
	protocol::message::{BlockAttributes},
	schema,
	PeerId,
};
use crate::request_responses::{RequestFailure, OutboundFailure};
use futures::{channel::{oneshot}, future::BoxFuture, prelude::*, stream::FuturesUnordered};
use prost::Message;
use sc_client_api::{
	light::{
		self, RemoteBodyRequest,
	}
};
use sc_peerset::ReputationChange;
use sp_blockchain::{Error as ClientError};
use sp_runtime::{
	traits::{Block, Header, NumberFor},
};
use std::{
	collections::{BTreeMap, VecDeque, HashMap},
	pin::Pin,
	sync::Arc,
	task::{Context, Poll},
};
mod rep {
	use super::*;
	
	pub const TIMEOUT: ReputationChange = ReputationChange::new(-(1 << 8), "light client request timeout");
	
	pub const REFUSED: ReputationChange = ReputationChange::new(-(1 << 8), "light client request refused");
}
#[derive(Debug, Clone)]
struct Config {
	max_pending_requests: usize,
	light_protocol: String,
	block_protocol: String,
}
impl Config {
	
	pub fn new(id: &ProtocolId) -> Self {
		Config {
			max_pending_requests: 128,
			light_protocol: super::generate_protocol_name(id),
			block_protocol: crate::block_request_handler::generate_protocol_name(id),
		}
	}
}
pub struct LightClientRequestSender<B: Block> {
	
	config: Config,
	
	checker: Arc<dyn light::FetchChecker<B>>,
	
	peers: HashMap<PeerId, PeerInfo<B>>,
	
	pending_requests: VecDeque<PendingRequest<B>>,
	
	sent_requests: FuturesUnordered<BoxFuture<
			'static, (SentRequest<B>, Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>),
		>>,
	
	peerset: sc_peerset::PeersetHandle,
}
#[derive(Debug)]
struct PendingRequest<B: Block> {
	
	attempts_left: usize,
	
	request: Request<B>,
}
impl<B: Block> PendingRequest<B> {
	fn new(req: Request<B>) -> Self {
		PendingRequest {
			
			attempts_left: req.retries() + 1,
			request: req,
		}
	}
	fn into_sent(self, peer_id: PeerId) -> SentRequest<B> {
		SentRequest {
			attempts_left: self.attempts_left,
			request: self.request,
			peer: peer_id,
		}
	}
}
#[derive(Debug)]
struct SentRequest<B: Block> {
	
	attempts_left: usize,
	
	request: Request<B>,
	
	peer: PeerId,
}
impl<B: Block> SentRequest<B> {
	fn into_pending(self) -> PendingRequest<B> {
		PendingRequest {
			attempts_left: self.attempts_left,
			request: self.request,
		}
	}
}
impl<B: Block> Unpin for LightClientRequestSender<B> {}
impl<B> LightClientRequestSender<B>
where
	B: Block,
{
	
	pub fn new(
		id: &ProtocolId,
		checker: Arc<dyn light::FetchChecker<B>>,
		peerset: sc_peerset::PeersetHandle,
	) -> Self {
		LightClientRequestSender {
			config: Config::new(id),
			checker,
			peers: Default::default(),
			pending_requests: Default::default(),
			sent_requests: Default::default(),
			peerset,
		}
	}
	
	
	pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor<B>) {
		if let Some(info) = self.peers.get_mut(peer) {
			log::trace!("new best block for {:?}: {:?}", peer, num);
			info.best_block = Some(num)
		}
	}
	
	pub fn request(&mut self, req: Request<B>) -> Result<(), SendRequestError> {
		if self.pending_requests.len() >= self.config.max_pending_requests {
			return Err(SendRequestError::TooManyRequests)
		}
		self.pending_requests.push_back(PendingRequest::new(req));
		Ok(())
	}
	
	
	
	
	fn remove_peer(&mut self, peer: PeerId) {
		self.peers.remove(&peer);
	}
	
	
	
	
	fn on_response(
		&mut self,
		peer: PeerId,
		request: &Request<B>,
		response: Response,
	) -> Result<Reply<B>, Error>	{
		log::trace!("response from {}", peer);
		match response {
			Response::Light(r) => self.on_response_light(request, r),
			Response::Block(r) => self.on_response_block(request, r),
		}
	}
	fn on_response_light(
		&mut self,
		request: &Request<B>,
		response: schema::v1::light::Response,
	) -> Result<Reply<B>, Error> {
		use schema::v1::light::response::Response;
		match response.response {
			Some(Response::RemoteCallResponse(response)) =>
				if let Request::Call { request , .. } = request {
					let proof = Decode::decode(&mut response.proof.as_ref())?;
					let reply = self.checker.check_execution_proof(request, proof)?;
					Ok(Reply::VecU8(reply))
				} else {
					Err(Error::UnexpectedResponse)
				}
			Some(Response::RemoteReadResponse(response)) =>
				match request {
					Request::Read { request, .. } => {
						let proof = Decode::decode(&mut response.proof.as_ref())?;
						let reply = self.checker.check_read_proof(&request, proof)?;
						Ok(Reply::MapVecU8OptVecU8(reply))
					}
					Request::ReadChild { request, .. } => {
						let proof = Decode::decode(&mut response.proof.as_ref())?;
						let reply = self.checker.check_read_child_proof(&request, proof)?;
						Ok(Reply::MapVecU8OptVecU8(reply))
					}
					_ => Err(Error::UnexpectedResponse)
				}
			Some(Response::RemoteChangesResponse(response)) =>
				if let Request::Changes { request, .. } = request {
					let max_block = Decode::decode(&mut response.max.as_ref())?;
					let roots_proof = Decode::decode(&mut response.roots_proof.as_ref())?;
					let roots = {
						let mut r = BTreeMap::new();
						for pair in response.roots {
							let k = Decode::decode(&mut pair.fst.as_ref())?;
							let v = Decode::decode(&mut pair.snd.as_ref())?;
							r.insert(k, v);
						}
						r
					};
					let reply = self.checker.check_changes_proof(&request, light::ChangesProof {
						max_block,
						proof: response.proof,
						roots,
						roots_proof,
					})?;
					Ok(Reply::VecNumberU32(reply))
				} else {
					Err(Error::UnexpectedResponse)
				}
			Some(Response::RemoteHeaderResponse(response)) =>
				if let Request::Header { request, .. } = request {
					let header =
						if response.header.is_empty() {
							None
						} else {
							Some(Decode::decode(&mut response.header.as_ref())?)
						};
					let proof = Decode::decode(&mut response.proof.as_ref())?;
					let reply = self.checker.check_header_proof(&request, header, proof)?;
					Ok(Reply::Header(reply))
				} else {
					Err(Error::UnexpectedResponse)
				}
			None => Err(Error::UnexpectedResponse)
		}
	}
	fn on_response_block(
		&mut self,
		request: &Request<B>,
		response: schema::v1::BlockResponse,
	) -> Result<Reply<B>, Error> {
		let request = if let Request::Body { request , .. } = &request {
			request
		} else {
			return Err(Error::UnexpectedResponse);
		};
		let body: Vec<_> = match response.blocks.into_iter().next() {
			Some(b) => b.body,
			None => return Err(Error::UnexpectedResponse),
		};
		let body = body.into_iter()
			.map(|extrinsic| B::Extrinsic::decode(&mut &extrinsic[..]))
			.collect::<Result<_, _>>()?;
		let body = self.checker.check_body_proof(&request, body)?;
		Ok(Reply::Extrinsics(body))
	}
	
	pub fn inject_connected(&mut self, peer: PeerId) {
		let prev_entry = self.peers.insert(peer, Default::default());
		debug_assert!(
			prev_entry.is_none(),
			"Expect `inject_connected` to be called for disconnected peer.",
		);
	}
	
	pub fn inject_disconnected(&mut self, peer: PeerId) {
		self.remove_peer(peer)
	}
}
impl<B: Block> Stream for LightClientRequestSender<B> {
	type Item = OutEvent;
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		
		while let Poll::Ready(Some((sent_request, request_result))) = self.sent_requests.poll_next_unpin(cx) {
			if let Some(info) = self.peers.get_mut(&sent_request.peer) {
				if info.status != PeerStatus::Busy {
					
					
					
					
					panic!("unexpected peer status {:?} for {}", info.status, sent_request.peer);
				}
				info.status = PeerStatus::Idle; 
			}
			let request_result = match request_result {
				Ok(r) => r,
				Err(oneshot::Canceled) => {
					log::debug!("Oneshot for request to peer {} was canceled.", sent_request.peer);
					self.remove_peer(sent_request.peer);
					self.peerset.report_peer(sent_request.peer, ReputationChange::new_fatal("no response from peer"));
					self.pending_requests.push_back(sent_request.into_pending());
					continue;
				}
			};
			let decoded_request_result = request_result.map(|response| {
				if sent_request.request.is_block_request() {
					schema::v1::BlockResponse::decode(&response[..])
						.map(|r| Response::Block(r))
				} else {
					schema::v1::light::Response::decode(&response[..])
						.map(|r| Response::Light(r))
				}
			});
			let response = match decoded_request_result {
				Ok(Ok(response)) => response,
				Ok(Err(e)) => {
					log::debug!("Failed to decode response from peer {}: {:?}.", sent_request.peer, e);
					self.remove_peer(sent_request.peer);
					self.peerset.report_peer(sent_request.peer, ReputationChange::new_fatal("invalid response from peer"));
					self.pending_requests.push_back(sent_request.into_pending());
					continue;
				},
				Err(e) => {
					log::debug!("Request to peer {} failed with {:?}.", sent_request.peer, e);
					match e {
						RequestFailure::NotConnected => {
							self.remove_peer(sent_request.peer);
							self.pending_requests.push_back(sent_request.into_pending());
						}
						RequestFailure::UnknownProtocol => {
							debug_assert!(
								false,
								"Light client and block request protocol should be known when \
								 sending requests.",
							);
						}
						RequestFailure::Refused => {
							self.remove_peer(sent_request.peer);
							self.peerset.report_peer(
								sent_request.peer,
								rep::REFUSED,
							);
							self.pending_requests.push_back(sent_request.into_pending());
						}
						RequestFailure::Obsolete => {
							debug_assert!(
								false,
								"Can not receive `RequestFailure::Obsolete` after dropping the \
								 response receiver.",
							);
							self.pending_requests.push_back(sent_request.into_pending());
						}
						RequestFailure::Network(OutboundFailure::Timeout) => {
							self.remove_peer(sent_request.peer);
							self.peerset.report_peer(
								sent_request.peer,
								rep::TIMEOUT,
							);
							self.pending_requests.push_back(sent_request.into_pending());
						},
						RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
							self.remove_peer(sent_request.peer);
							self.peerset.report_peer(
								sent_request.peer,
								ReputationChange::new_fatal(
									"peer does not support light client or block request protocol",
								),
							);
							self.pending_requests.push_back(sent_request.into_pending());
						}
						RequestFailure::Network(OutboundFailure::DialFailure) => {
							self.remove_peer(sent_request.peer);
							self.peerset.report_peer(
								sent_request.peer,
								ReputationChange::new_fatal(
									"failed to dial peer",
								),
							);
							self.pending_requests.push_back(sent_request.into_pending());
						}
						RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
							self.remove_peer(sent_request.peer);
							self.peerset.report_peer(
								sent_request.peer,
								ReputationChange::new_fatal(
									"connection to peer closed",
								),
							);
							self.pending_requests.push_back(sent_request.into_pending());
						}
					}
					continue;
				}
			};
			match self.on_response(sent_request.peer, &sent_request.request, response) {
				Ok(reply) => sent_request.request.return_reply(Ok(reply)),
				Err(Error::UnexpectedResponse) => {
					log::debug!("Unexpected response from peer {}.", sent_request.peer);
					self.remove_peer(sent_request.peer);
					self.peerset.report_peer(
						sent_request.peer,
						ReputationChange::new_fatal(
							"unexpected response from peer",
						),
					);
					self.pending_requests.push_back(sent_request.into_pending());
				}
				Err(other) => {
					log::debug!("error handling response from peer {}: {}", sent_request.peer, other);
					self.remove_peer(sent_request.peer);
					self.peerset.report_peer(
						sent_request.peer,
						ReputationChange::new_fatal(
							"invalid response from peer",
						),
					);
					self.pending_requests.push_back(sent_request.into_pending())
				}
			}
		}
		
		while let Some(mut pending_request) = self.pending_requests.pop_front() {
			if pending_request.attempts_left == 0 {
				pending_request.request.return_reply(Err(ClientError::RemoteFetchFailed));
				continue
			}
			let protocol = if pending_request.request.is_block_request() {
				self.config.block_protocol.clone()
			} else {
				self.config.light_protocol.clone()
			};
			
			
			let mut peer = None;
			for (peer_id, peer_info) in self.peers.iter_mut() {
				if peer_info.status == PeerStatus::Idle {
					match peer_info.best_block {
						Some(n) if n >= pending_request.request.required_block() => {
							peer = Some((*peer_id, peer_info));
							break
						},
						_ => peer = Some((*peer_id, peer_info))
					}
				}
			}
			
			let (peer_id, peer_info) = match peer {
				Some((peer_id, peer_info)) => (peer_id, peer_info),
				None => {
					self.pending_requests.push_front(pending_request);
					log::debug!("No peer available to send request to.");
					break;
				}
			};
			let request_bytes = match pending_request.request.serialize_request() {
				Ok(bytes) => bytes,
				Err(error) => {
					log::debug!("failed to serialize request: {}", error);
					pending_request.request.return_reply(Err(ClientError::RemoteFetchFailed));
					continue
				}
			};
			let (tx, rx) = oneshot::channel();
			peer_info.status = PeerStatus::Busy;
			pending_request.attempts_left -= 1;
			self.sent_requests.push(async move {
				(pending_request.into_sent(peer_id), rx.await)
			}.boxed());
			return Poll::Ready(Some(OutEvent::SendRequest {
				target: peer_id,
				request: request_bytes,
				pending_response: tx,
				protocol_name: protocol,
			}));
		}
		Poll::Pending
	}
}
#[derive(Debug)]
pub enum OutEvent {
	
	SendRequest {
		
		target: PeerId,
		
		request: Vec<u8>,
		
		pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
		
		protocol_name: String,
	}
}
#[derive(Debug, Clone)]
pub enum Response {
	
	Light(schema::v1::light::Response),
	
	Block(schema::v1::BlockResponse),
}
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum SendRequestError {
	
	#[display(fmt = "too many pending requests")]
	TooManyRequests,
}
#[derive(Debug, derive_more::Display, derive_more::From)]
enum Error {
	
	#[display(fmt = "unexpected response")]
	UnexpectedResponse,
	
	#[display(fmt = "codec error: {}", _0)]
	Codec(codec::Error),
	
	#[display(fmt = "client error: {}", _0)]
	Client(ClientError),
}
#[derive(Debug)]
enum Reply<B: Block> {
	VecU8(Vec<u8>),
	VecNumberU32(Vec<(<B::Header as Header>::Number, u32)>),
	MapVecU8OptVecU8(HashMap<Vec<u8>, Option<Vec<u8>>>),
	Header(B::Header),
	Extrinsics(Vec<B::Extrinsic>),
}
#[derive(Debug)]
struct PeerInfo<B: Block> {
	best_block: Option<NumberFor<B>>,
	status: PeerStatus,
}
impl<B: Block> Default for PeerInfo<B> {
	fn default() -> Self {
		PeerInfo {
			best_block: None,
			status: PeerStatus::Idle,
		}
	}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum PeerStatus {
	
	Idle,
	
	Busy,
}
#[derive(Debug)]
pub enum Request<B: Block> {
	
	Body {
		
		request: RemoteBodyRequest<B::Header>,
		
		sender: oneshot::Sender<Result<Vec<B::Extrinsic>, ClientError>>
	},
	
	Header {
		
		request: light::RemoteHeaderRequest<B::Header>,
		
		sender: oneshot::Sender<Result<B::Header, ClientError>>
	},
	
	Read {
		
		request: light::RemoteReadRequest<B::Header>,
		
		sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>>
	},
	
	ReadChild {
		
		request: light::RemoteReadChildRequest<B::Header>,
		
		sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>>
	},
	
	Call {
		
		request: light::RemoteCallRequest<B::Header>,
		
		sender: oneshot::Sender<Result<Vec<u8>, ClientError>>
	},
	
	Changes {
		
		request: light::RemoteChangesRequest<B::Header>,
		
		sender: oneshot::Sender<Result<Vec<(NumberFor<B>, u32)>, ClientError>>
	}
}
impl<B: Block> Request<B> {
	fn is_block_request(&self) -> bool {
		matches!(self, Request::Body { .. })
	}
	fn required_block(&self) -> NumberFor<B> {
		match self {
			Request::Body { request, .. } => *request.header.number(),
			Request::Header { request, .. } => request.block,
			Request::Read { request, .. } => *request.header.number(),
			Request::ReadChild { request, .. } => *request.header.number(),
			Request::Call { request, .. } => *request.header.number(),
			Request::Changes { request, .. } => request.max_block.0,
		}
	}
	fn retries(&self) -> usize {
		let rc = match self {
			Request::Body { request, .. } => request.retry_count,
			Request::Header { request, .. } => request.retry_count,
			Request::Read { request, .. } => request.retry_count,
			Request::ReadChild { request, .. } => request.retry_count,
			Request::Call { request, .. } => request.retry_count,
			Request::Changes { request, .. } => request.retry_count,
		};
		rc.unwrap_or(0)
	}
	fn serialize_request(&self) -> Result<Vec<u8>, prost::EncodeError> {
		let request = match self {
			Request::Body { request, .. } => {
				let rq = schema::v1::BlockRequest {
					fields: BlockAttributes::BODY.to_be_u32(),
					from_block: Some(schema::v1::block_request::FromBlock::Hash(
						request.header.hash().encode(),
					)),
					to_block: Default::default(),
					direction: schema::v1::Direction::Ascending as i32,
					max_blocks: 1,
				};
				let mut buf = Vec::with_capacity(rq.encoded_len());
				rq.encode(&mut buf)?;
				return Ok(buf);
			}
			Request::Header { request, .. } => {
				let r = schema::v1::light::RemoteHeaderRequest { block: request.block.encode() };
				schema::v1::light::request::Request::RemoteHeaderRequest(r)
			}
			Request::Read { request, .. } => {
				let r = schema::v1::light::RemoteReadRequest {
					block: request.block.encode(),
					keys: request.keys.clone(),
				};
				schema::v1::light::request::Request::RemoteReadRequest(r)
			}
			Request::ReadChild { request, .. } => {
				let r = schema::v1::light::RemoteReadChildRequest {
					block: request.block.encode(),
					storage_key: request.storage_key.clone().into_inner(),
					keys: request.keys.clone(),
				};
				schema::v1::light::request::Request::RemoteReadChildRequest(r)
			}
			Request::Call { request, .. } => {
				let r = schema::v1::light::RemoteCallRequest {
					block: request.block.encode(),
					method: request.method.clone(),
					data: request.call_data.clone(),
				};
				schema::v1::light::request::Request::RemoteCallRequest(r)
			}
			Request::Changes { request, .. } => {
				let r = schema::v1::light::RemoteChangesRequest {
					first: request.first_block.1.encode(),
					last: request.last_block.1.encode(),
					min: request.tries_roots.1.encode(),
					max: request.max_block.1.encode(),
					storage_key: request.storage_key.clone().map(|s| s.into_inner())
						.unwrap_or_default(),
					key: request.key.clone(),
				};
				schema::v1::light::request::Request::RemoteChangesRequest(r)
			}
		};
		let rq = schema::v1::light::Request { request: Some(request) };
		let mut buf = Vec::with_capacity(rq.encoded_len());
		rq.encode(&mut buf)?;
		Ok(buf)
	}
	fn return_reply(self, result: Result<Reply<B>, ClientError>) {
		fn send<T>(item: T, sender: oneshot::Sender<T>) {
			let _ = sender.send(item); 
		}
		match self {
			Request::Body { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::Extrinsics(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for body request: {:?}, {:?}", reply, request),
			}
			Request::Header { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::Header(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for header request: {:?}, {:?}", reply, request),
			}
			Request::Read { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for read request: {:?}, {:?}", reply, request),
			}
			Request::ReadChild { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for read child request: {:?}, {:?}", reply, request),
			}
			Request::Call { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::VecU8(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for call request: {:?}, {:?}", reply, request),
			}
			Request::Changes { request, sender } => match result {
				Err(e) => send(Err(e), sender),
				Ok(Reply::VecNumberU32(x)) => send(Ok(x), sender),
				reply => log::error!("invalid reply for changes request: {:?}, {:?}", reply, request),
			}
		}
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use crate::light_client_requests::tests::{DummyFetchChecker, protocol_id, peerset, dummy_header};
	use crate::request_responses::OutboundFailure;
	use assert_matches::assert_matches;
	use futures::channel::oneshot;
	use futures::executor::block_on;
	use futures::poll;
	use sc_client_api::StorageProof;
	use sp_core::storage::ChildInfo;
	use sp_runtime::generic::Header;
	use sp_runtime::traits::BlakeTwo256;
	use std::collections::HashSet;
	use std::iter::FromIterator;
	fn empty_proof() -> Vec<u8> {
		StorageProof::empty().encode()
	}
	#[test]
	fn removes_peer_if_told() {
		let peer = PeerId::random();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(DummyFetchChecker {
				ok: true,
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		sender.inject_connected(peer);
		assert_eq!(1, sender.peers.len());
		sender.inject_disconnected(peer);
		assert_eq!(0, sender.peers.len());
	}
	type Block =
		sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>;
	#[test]
	fn body_request_fields_encoded_properly() {
		let (sender, _receiver) = oneshot::channel();
		let request = Request::<Block>::Body {
			request: RemoteBodyRequest {
				header: dummy_header(),
				retry_count: None,
			},
			sender,
		};
		let serialized_request = request.serialize_request().unwrap();
		let deserialized_request = schema::v1::BlockRequest::decode(&serialized_request[..]).unwrap();
		assert!(BlockAttributes::from_be_u32(deserialized_request.fields)
				.unwrap()
				.contains(BlockAttributes::BODY));
	}
	#[test]
	fn disconnects_from_peer_if_request_times_out() {
		let peer0 = PeerId::random();
		let peer1 = PeerId::random();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
				ok: true,
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		sender.inject_connected(peer0);
		sender.inject_connected(peer1);
		assert_eq!(
			HashSet::from_iter(&[peer0.clone(), peer1.clone()]),
			sender.peers.keys().collect::<HashSet<_>>(),
			"Expect knowledge of two peers."
		);
		assert!(sender.pending_requests.is_empty(), "Expect no pending request.");
		assert!(sender.sent_requests.is_empty(), "Expect no sent request.");
		
		let chan = oneshot::channel();
		let request = light::RemoteCallRequest {
			block: Default::default(),
			header: dummy_header(),
			method: "test".into(),
			call_data: vec![],
			retry_count: Some(1),
		};
		sender.request(Request::Call { request, sender: chan.0 }).unwrap();
		assert_eq!(1, sender.pending_requests.len(), "Expect one pending request.");
		let OutEvent::SendRequest { target, pending_response, .. } = block_on(sender.next()).unwrap();
		assert!(
			target == peer0 || target == peer1,
			"Expect request to originate from known peer.",
		);
		
		assert!({
			let (idle, busy): (Vec<_>, Vec<_>) = sender
				.peers
				.iter()
				.partition(|(_, info)| info.status == PeerStatus::Idle);
			idle.len() == 1
				&& busy.len() == 1
				&& (idle[0].0 == &peer0 || busy[0].0 == &peer0)
				&& (idle[0].0 == &peer1 || busy[0].0 == &peer1)
		});
		assert_eq!(0, sender.pending_requests.len(), "Expect no pending request.");
		assert_eq!(1, sender.sent_requests.len(), "Expect one request to be sent.");
		
		pending_response.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).unwrap();
		
		let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap();
		assert_eq!(1, sender.peers.len(), "Expect peer to be removed.");
		assert_eq!(0, sender.pending_requests.len(), "Expect no request to be pending.");
		assert_eq!(1, sender.sent_requests.len(), "Expect new request to be issued.");
		
		pending_response.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).unwrap();
		assert_matches!(
			block_on(async { poll!(sender.next()) }), Poll::Pending,
			"Expect sender to not issue another attempt.",
		);
		assert_matches!(
			block_on(chan.1).unwrap(), Err(ClientError::RemoteFetchFailed),
			"Expect request failure to be reported.",
		);
		assert_eq!(0, sender.peers.len(), "Expect no peer to be left");
		assert_eq!(0, sender.pending_requests.len(), "Expect no request to be pending.");
		assert_eq!(0, sender.sent_requests.len(), "Expect no other request to be in progress.");
	}
	#[test]
	fn disconnects_from_peer_on_incorrect_response() {
		let peer = PeerId::random();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
				ok: false,
				
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		sender.inject_connected(peer);
		assert_eq!(1, sender.peers.len(), "Expect one peer.");
		let chan = oneshot::channel();
		let request = light::RemoteCallRequest {
			block: Default::default(),
			header: dummy_header(),
			method: "test".into(),
			call_data: vec![],
			retry_count: Some(1),
		};
		sender
			.request(Request::Call {
				request,
				sender: chan.0,
			})
			.unwrap();
		assert_eq!(1, sender.pending_requests.len(), "Expect one pending request.");
		assert_eq!(0, sender.sent_requests.len(), "Expect zero sent requests.");
		let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap();
		assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests.");
		assert_eq!(1, sender.sent_requests.len(), "Expect one sent request.");
		let response = {
			let r = schema::v1::light::RemoteCallResponse {
				proof: empty_proof(),
			};
			let response = schema::v1::light::Response {
				response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
			};
			let mut data = Vec::new();
			response.encode(&mut data).unwrap();
			data
		};
		pending_response.send(Ok(response)).unwrap();
		assert_matches!(
			block_on(async { poll!(sender.next()) }), Poll::Pending,
			"Expect sender to not issue another attempt, given that there is no peer left.",
		);
		assert!(sender.peers.is_empty(), "Expect no peers to be left.");
		assert_eq!(1, sender.pending_requests.len(), "Expect request to be pending again.");
		assert_eq!(0, sender.sent_requests.len(), "Expect no request to be sent.");
	}
	#[test]
	fn disconnects_from_peer_on_wrong_response_type() {
		let peer = PeerId::random();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
				ok: true,
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		sender.inject_connected(peer);
		assert_eq!(1, sender.peers.len(), "Expect one peer.");
		let chan = oneshot::channel();
		let request = light::RemoteCallRequest {
			block: Default::default(),
			header: dummy_header(),
			method: "test".into(),
			call_data: vec![],
			retry_count: Some(1),
		};
		sender
			.request(Request::Call {
				request,
				sender: chan.0,
			})
			.unwrap();
		assert_eq!(1, sender.pending_requests.len());
		assert_eq!(0, sender.sent_requests.len());
		let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap();
		assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests.");
		assert_eq!(1, sender.sent_requests.len(), "Expect one sent request.");
		let response = {
			let r = schema::v1::light::RemoteReadResponse {
				proof: empty_proof(),
			}; 
			let response = schema::v1::light::Response {
				response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
			};
			let mut data = Vec::new();
			response.encode(&mut data).unwrap();
			data
		};
		pending_response.send(Ok(response)).unwrap();
		assert_matches!(
			block_on(async { poll!(sender.next()) }), Poll::Pending,
			"Expect sender to not issue another attempt, given that there is no peer left.",
		);
		assert!(sender.peers.is_empty(), "Expect no peers to be left.");
		assert_eq!(1, sender.pending_requests.len(), "Expect request to be pending again.");
		assert_eq!(0, sender.sent_requests.len(), "Expect no request to be sent.");
	}
	#[test]
	fn receives_remote_failure_after_retry_count_failures() {
		let peers = (0..4).map(|_| PeerId::random()).collect::<Vec<_>>();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
				ok: false,
				
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		for peer in &peers {
			sender.inject_connected(*peer);
		}
		assert_eq!(4, sender.peers.len(), "Expect four peers.");
		let mut chan = oneshot::channel();
		let request = light::RemoteCallRequest {
			block: Default::default(),
			header: dummy_header(),
			method: "test".into(),
			call_data: vec![],
			retry_count: Some(3), 
		};
		sender
			.request(Request::Call {
				request,
				sender: chan.0,
			})
			.unwrap();
		assert_eq!(1, sender.pending_requests.len());
		assert_eq!(0, sender.sent_requests.len());
		let mut pending_response = match block_on(sender.next()).unwrap() {
			OutEvent::SendRequest { pending_response, .. } => Some(pending_response),
		};
		assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests.");
		assert_eq!(1, sender.sent_requests.len(), "Expect one sent request.");
		for (i, _peer) in peers.iter().enumerate() {
			
			let response = {
				let r = schema::v1::light::RemoteCallResponse {
					proof: empty_proof(),
				};
				let response = schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
				};
				let mut data = Vec::new();
				response.encode(&mut data).unwrap();
				data
			};
			pending_response.take().unwrap().send(Ok(response)).unwrap();
			if i < 3 {
				pending_response = match block_on(sender.next()).unwrap() {
					OutEvent::SendRequest { pending_response, .. } => Some(pending_response),
				};
				assert_matches!(chan.1.try_recv(), Ok(None))
			} else {
				
				assert_matches!(
					block_on(async { poll!(sender.next()) }), Poll::Pending,
					"Expect sender to not issue another attempt, given that there is no peer left.",
				);
				assert_matches!(
					chan.1.try_recv(),
					Ok(Some(Err(ClientError::RemoteFetchFailed)))
				)
			}
		}
	}
	fn issue_request(request: Request<Block>) {
		let peer = PeerId::random();
		let (_peer_set, peer_set_handle) = peerset();
		let mut sender = LightClientRequestSender::<Block>::new(
			&protocol_id(),
			Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
				ok: true,
				_mark: std::marker::PhantomData,
			}),
			peer_set_handle,
		);
		sender.inject_connected(peer);
		assert_eq!(1, sender.peers.len(), "Expect one peer.");
		let response = match request {
			Request::Body { .. } => unimplemented!(),
			Request::Header { .. } => {
				let r = schema::v1::light::RemoteHeaderResponse {
					header: dummy_header().encode(),
					proof: empty_proof(),
				};
				schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteHeaderResponse(
						r,
					)),
				}
			}
			Request::Read { .. } => {
				let r = schema::v1::light::RemoteReadResponse {
					proof: empty_proof(),
				};
				schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
				}
			}
			Request::ReadChild { .. } => {
				let r = schema::v1::light::RemoteReadResponse {
					proof: empty_proof(),
				};
				schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
				}
			}
			Request::Call { .. } => {
				let r = schema::v1::light::RemoteCallResponse {
					proof: empty_proof(),
				};
				schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
				}
			}
			Request::Changes { .. } => {
				let r = schema::v1::light::RemoteChangesResponse {
					max: std::iter::repeat(1).take(32).collect(),
					proof: Vec::new(),
					roots: Vec::new(),
					roots_proof: empty_proof(),
				};
				schema::v1::light::Response {
					response: Some(schema::v1::light::response::Response::RemoteChangesResponse(r)),
				}
			}
		};
		let response = {
			let mut data = Vec::new();
			response.encode(&mut data).unwrap();
			data
		};
		sender.request(request).unwrap();
		assert_eq!(1, sender.pending_requests.len());
		assert_eq!(0, sender.sent_requests.len());
		let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap();
		assert_eq!(0, sender.pending_requests.len());
		assert_eq!(1, sender.sent_requests.len());
		pending_response.send(Ok(response)).unwrap();
		assert_matches!(
			block_on(async { poll!(sender.next()) }), Poll::Pending,
			"Expect sender to not issue another attempt, given that there is no peer left.",
		);
		assert_eq!(0, sender.pending_requests.len());
		assert_eq!(0, sender.sent_requests.len())
	}
	#[test]
	fn receives_remote_call_response() {
		let mut chan = oneshot::channel();
		let request = light::RemoteCallRequest {
			block: Default::default(),
			header: dummy_header(),
			method: "test".into(),
			call_data: vec![],
			retry_count: None,
		};
		issue_request(Request::Call {
			request,
			sender: chan.0,
		});
		assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
	}
	#[test]
	fn receives_remote_read_response() {
		let mut chan = oneshot::channel();
		let request = light::RemoteReadRequest {
			header: dummy_header(),
			block: Default::default(),
			keys: vec![b":key".to_vec()],
			retry_count: None,
		};
		issue_request(Request::Read {
			request,
			sender: chan.0,
		});
		assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
	}
	#[test]
	fn receives_remote_read_child_response() {
		let mut chan = oneshot::channel();
		let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
		let request = light::RemoteReadChildRequest {
			header: dummy_header(),
			block: Default::default(),
			storage_key: child_info.prefixed_storage_key(),
			keys: vec![b":key".to_vec()],
			retry_count: None,
		};
		issue_request(Request::ReadChild {
			request,
			sender: chan.0,
		});
		assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
	}
	#[test]
	fn receives_remote_header_response() {
		let mut chan = oneshot::channel();
		let request = light::RemoteHeaderRequest {
			cht_root: Default::default(),
			block: 1,
			retry_count: None,
		};
		issue_request(Request::Header {
			request,
			sender: chan.0,
		});
		assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
	}
	#[test]
	fn receives_remote_changes_response() {
		let mut chan = oneshot::channel();
		let request = light::RemoteChangesRequest {
			changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
				zero: (0, Default::default()),
				end: None,
				config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
			}],
			first_block: (1, Default::default()),
			last_block: (100, Default::default()),
			max_block: (100, Default::default()),
			tries_roots: (1, Default::default(), Vec::new()),
			key: Vec::new(),
			storage_key: None,
			retry_count: None,
		};
		issue_request(Request::Changes {
			request,
			sender: chan.0,
		});
		assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
	}
}