use std::{
	collections::HashMap,
	sync::Arc,
};
use crate::{base_pool as base, watcher::Watcher};
use futures::Future;
use sp_runtime::{
	generic::BlockId,
	traits::{self, SaturatedConversion, Block as BlockT},
	transaction_validity::{
		TransactionValidity, TransactionTag as Tag, TransactionValidityError, TransactionSource,
	},
};
use sp_transaction_pool::error;
use wasm_timer::Instant;
use futures::channel::mpsc::Receiver;
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::{IsValidator, ValidatedTransaction};
/// Stream of pool events; a plain alias for the `futures` mpsc [`Receiver`].
pub type EventStream<H> = Receiver<H>;
/// Hash type of the block used by the given [`ChainApi`].
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Hash type used to identify extrinsics in the pool.
///
/// Note that this resolves to the same associated type as [`BlockHash`].
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic type of the block used by the given [`ChainApi`].
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Block number type of the block used by the given [`ChainApi`].
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
/// Reference-counted pool transaction, keyed by [`ExtrinsicHash`] and carrying an [`ExtrinsicFor`].
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
/// [`ValidatedTransaction`] specialised to the hash, extrinsic and error types of a [`ChainApi`].
pub type ValidatedTransactionFor<A> = ValidatedTransaction<
	ExtrinsicHash<A>,
	ExtrinsicFor<A>,
	<A as ChainApi>::Error,
>;
/// Chain-specific validation and query logic that the pool delegates to.
///
/// Implementations have to be `Send + Sync`.
pub trait ChainApi: Send + Sync {
	/// Block type the pool operates on.
	type Block: BlockT;
	/// Error type; it must be constructible from the pool's own [`error::Error`].
	type Error: From<error::Error> + error::IntoPoolError;
	/// Future returned by [`ChainApi::validate_transaction`].
	type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
	/// Future returned by [`ChainApi::block_body`]; resolves to the optional list of extrinsics.
	type BodyFuture: Future<
		Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>
	> + Unpin + Send + 'static;
	/// Validates the extrinsic `uxt`, coming from `source`, against the block `at`.
	fn validate_transaction(
		&self,
		at: &BlockId<Self::Block>,
		source: TransactionSource,
		uxt: ExtrinsicFor<Self>,
	) -> Self::ValidationFuture;
	/// Resolves a block id to its block number; `Ok(None)` when no number is available.
	fn block_id_to_number(
		&self,
		at: &BlockId<Self::Block>,
	) -> Result<Option<NumberFor<Self>>, Self::Error>;
	/// Resolves a block id to its block hash; `Ok(None)` when no hash is available.
	fn block_id_to_hash(
		&self,
		at: &BlockId<Self::Block>,
	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
	/// Returns the hash of the extrinsic together with its length.
	///
	/// NOTE(review): the test implementation in this file reports the encoded byte length;
	/// presumably every implementation is expected to do the same - confirm.
	fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
	/// Returns a future resolving to the body of the block `at`, if any.
	fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
}
/// Pool configuration; handed to `ValidatedPool::new` when a [`Pool`] is created.
#[derive(Debug, Clone)]
pub struct Options {
	/// Limits (transaction count and total bytes) for ready transactions.
	pub ready: base::Limit,
	/// Limits (transaction count and total bytes) for future transactions.
	pub future: base::Limit,
	/// Whether future transactions should be rejected.
	///
	/// NOTE(review): the flag is only forwarded from this file; its exact effect is
	/// implemented elsewhere - confirm against `ValidatedPool`.
	pub reject_future_transactions: bool,
}
impl Default for Options {
	/// Default limits: 8192 transactions / 20 MiB for `ready`, 512 transactions / 1 MiB
	/// for `future`, and future transactions are not rejected.
	fn default() -> Self {
		const MIB: usize = 1024 * 1024;

		let ready = base::Limit {
			count: 8192,
			total_bytes: 20 * MIB,
		};
		let future = base::Limit {
			count: 512,
			total_bytes: MIB,
		};

		Self {
			ready,
			future,
			reject_future_transactions: false,
		}
	}
}
/// Controls whether a banned transaction hash is rejected before the transaction is verified.
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
	/// Take bans into account when checking whether the transaction is already known.
	Yes,
	/// Ignore bans when checking whether the transaction is already known.
	No,
}
/// Extrinsics pool that verifies transactions through a [`ChainApi`] before handing them
/// to the underlying [`ValidatedPool`].
pub struct Pool<B: ChainApi> {
	// Shared handle to the pool of already-validated transactions; cloning a `Pool`
	// clones this `Arc`.
	validated_pool: Arc<ValidatedPool<B>>,
}
// Memory reporting; only compiled when `target_os` is not "unknown".
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
where
	ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
	/// Reports the size of the pool by delegating to the inner `validated_pool`.
	fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
		self.validated_pool.size_of(ops)
	}
}
impl<B: ChainApi> Pool<B> {
	
	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
		Pool {
			validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)),
		}
	}
	
	/// Verifies the given extrinsics at block `at` and submits the outcomes to the pool.
	///
	/// Every extrinsic is tagged with the same `source`. Banned hashes are rejected
	/// before verification.
	///
	/// # Errors
	///
	/// The outer `Err` is returned when verification cannot start (for example `at` does not
	/// resolve to a block number); the inner results come from `ValidatedPool::submit`.
	pub async fn submit_at(
		&self,
		at: &BlockId<B::Block>,
		source: TransactionSource,
		xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
	) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
		let tagged = xts.into_iter().map(|xt| (source, xt));
		let verified = self.verify(at, tagged, CheckBannedBeforeVerify::Yes).await?;
		// Only the validated transactions are submitted; the hashes are dropped here.
		let transactions = verified.into_iter().map(|(_hash, tx)| tx);
		Ok(self.validated_pool.submit(transactions))
	}
	
	
	
	/// Verifies the given extrinsics at block `at` and submits the outcomes to the pool,
	/// ignoring temporary bans.
	///
	/// Identical to `submit_at` except that the ban check is skipped during verification.
	///
	/// # Errors
	///
	/// The outer `Err` is returned when verification cannot start (for example `at` does not
	/// resolve to a block number); the inner results come from `ValidatedPool::submit`.
	pub async fn resubmit_at(
		&self,
		at: &BlockId<B::Block>,
		source: TransactionSource,
		xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
	) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
		let tagged = xts.into_iter().map(|xt| (source, xt));
		let verified = self.verify(at, tagged, CheckBannedBeforeVerify::No).await?;
		// Only the validated transactions are submitted; the hashes are dropped here.
		let transactions = verified.into_iter().map(|(_hash, tx)| tx);
		Ok(self.validated_pool.submit(transactions))
	}
	
	/// Verifies and submits a single extrinsic, returning its hash on success.
	///
	/// This is a thin wrapper around `submit_at` with a one-element iterator.
	pub async fn submit_one(
		&self,
		at: &BlockId<B::Block>,
		source: TransactionSource,
		xt: ExtrinsicFor<B>,
	) -> Result<ExtrinsicHash<B>, B::Error> {
		let mut results = self.submit_at(at, source, std::iter::once(xt)).await?;
		results.pop().expect("One extrinsic passed; one result returned; qed")
	}
	
	/// Verifies a single extrinsic and submits it, returning a [`Watcher`] for its status.
	///
	/// Banned hashes are rejected before verification.
	///
	/// # Errors
	///
	/// Fails when `at` cannot be resolved to a block number or when
	/// `ValidatedPool::submit_and_watch` rejects the verified transaction.
	pub async fn submit_and_watch(
		&self,
		at: &BlockId<B::Block>,
		source: TransactionSource,
		xt: ExtrinsicFor<B>,
	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
		let block_number = self.resolve_block_number(at)?;
		let verification =
			self.verify_one(at, block_number, source, xt, CheckBannedBeforeVerify::Yes);
		let (_hash, tx) = verification.await;
		self.validated_pool.submit_and_watch(tx)
	}
	
	/// Hands already revalidated transactions to `ValidatedPool::resubmit`.
	///
	/// The elapsed time and the resulting pool status are logged at debug level under the
	/// `txpool` target.
	pub fn resubmit(
		&self,
		revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
	) {
		let started = Instant::now();
		self.validated_pool.resubmit(revalidated_transactions);
		log::debug!(
			target: "txpool",
			"Resubmitted. Took {} ms. Status: {:?}",
			started.elapsed().as_millis(),
			self.validated_pool.status()
		);
	}
	
	
	
	
	
	/// Prunes the transactions identified by `hashes` without any revalidation.
	///
	/// The tags of those `hashes` that are currently in the pool are looked up, all
	/// transactions providing these tags are pruned, and a pruned notification is fired at
	/// block `at` for the given hashes followed by the hashes pruned as a consequence.
	///
	/// # Errors
	///
	/// Propagates failures of `ValidatedPool::prune_tags` and `ValidatedPool::fire_pruned`.
	pub fn prune_known(
		&self,
		at: &BlockId<B::Block>,
		hashes: &[ExtrinsicHash<B>],
	) -> Result<(), B::Error> {
		// Hashes unknown to the pool yield `None` and are skipped by the first `flatten`;
		// the second one merges the per-transaction tag lists into a single stream.
		let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
			.into_iter()
			.flatten()
			.flatten();

		// Prune all transactions that provide the collected tags.
		let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
		let pruned_transactions = hashes.iter().cloned()
			.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
		self.validated_pool.fire_pruned(at, pruned_transactions)
	}
	
	
	
	
	
	
	/// Prunes the pool after the block `at`, containing `extrinsics`, has been imported.
	///
	/// For every extrinsic the tags it provides are gathered: extrinsics known to the pool
	/// reuse their stored tags, the others are validated at `parent` with
	/// `TransactionSource::InBlock` and contribute their `provides` tags when that
	/// validation succeeds. Failed validations are silently skipped. The collected tags and
	/// the extrinsic hashes are then passed to `prune_tags`.
	///
	/// # Errors
	///
	/// Propagates any error returned by `prune_tags`.
	pub async fn prune(
		&self,
		at: &BlockId<B::Block>,
		parent: &BlockId<B::Block>,
		extrinsics: &[ExtrinsicFor<B>],
	) -> Result<(), B::Error> {
		log::debug!(
			target: "txpool",
			"Starting pruning of block {:?} (extrinsics: {})",
			at,
			extrinsics.len()
		);

		// Look up which of the block's extrinsics are already in the pool.
		let in_pool_hashes: Vec<_> = extrinsics.iter().map(|xt| self.hash_of(xt)).collect();
		let known_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);

		let mut future_tags = Vec::new();
		for (extrinsic, tags) in extrinsics.iter().zip(known_tags) {
			// Found in the pool: reuse its tags.
			if let Some(tags) = tags {
				future_tags.extend(tags);
				continue;
			}

			// Not in the pool: ask the runtime at the parent block what the extrinsic provides.
			let validity = self.validated_pool.api()
				.validate_transaction(parent, TransactionSource::InBlock, extrinsic.clone())
				.await;
			if let Ok(Ok(validity)) = validity {
				future_tags.extend(validity.provides);
			}
		}

		self.prune_tags(at, future_tags, in_pool_hashes).await
	}
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	/// Prunes transactions that provide any of the given tags and revalidates them.
	///
	/// Steps, in order:
	/// 1. transactions providing any of `tags` are pruned from the validated pool;
	/// 2. `known_imported_hashes` are banned;
	/// 3. every pruned transaction is verified again at `at`, with the ban check enabled;
	/// 4. the verification results are handed to `ValidatedPool::resubmit_pruned` together
	///    with the known and the pruned hashes.
	///
	/// # Errors
	///
	/// Fails if pruning fails, if `at` cannot be resolved to a block number during
	/// re-verification, or if `ValidatedPool::resubmit_pruned` fails.
	pub async fn prune_tags(
		&self,
		at: &BlockId<B::Block>,
		tags: impl IntoIterator<Item=Tag>,
		known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
	) -> Result<(), B::Error> {
		log::debug!(target: "txpool", "Pruning at {:?}", at);

		// Prune all transactions that provide the given tags.
		let prune_status = self.validated_pool.prune_tags(tags)?;

		// Ban the hashes known to be imported before re-verifying, so the ban check below
		// applies to them.
		self.validated_pool.ban(&Instant::now(), known_imported_hashes.clone().into_iter());

		// Remember the hashes first: `prune_status.pruned` is consumed right after.
		let pruned_hashes = prune_status.pruned
			.iter()
			.map(|tx| tx.hash.clone())
			.collect::<Vec<_>>();
		let pruned_transactions = prune_status.pruned
			.into_iter()
			.map(|tx| (tx.source, tx.data.clone()));

		// Re-verify the pruned transactions at `at`.
		let reverified_transactions = self.verify(
			at,
			pruned_transactions,
			CheckBannedBeforeVerify::Yes,
		).await?;

		log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at);

		// Finally submit the re-verified transactions back to the pool.
		self.validated_pool.resubmit_pruned(
			at,
			known_imported_hashes,
			pruned_hashes,
			reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
		)
	}
	
	/// Returns the hash of the extrinsic as computed by `ChainApi::hash_and_length`.
	pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
		let (hash, _length) = self.validated_pool.api().hash_and_length(xt);
		hash
	}
	
	fn resolve_block_number(&self, at: &BlockId<B::Block>) -> Result<NumberFor<B>, B::Error> {
		self.validated_pool.api().block_id_to_number(at)
			.and_then(|number| number.ok_or_else(||
				error::Error::InvalidBlockId(format!("{:?}", at)).into()))
	}
	
	async fn verify(
		&self,
		at: &BlockId<B::Block>,
		xts: impl IntoIterator<Item=(TransactionSource, ExtrinsicFor<B>)>,
		check: CheckBannedBeforeVerify,
	) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
		
		let block_number = self.resolve_block_number(at)?;
		let res = futures::future::join_all(
			xts.into_iter()
				.map(|(source, xt)| self.verify_one(at, block_number, source, xt, check))
		).await.into_iter().collect::<HashMap<_, _>>();
		Ok(res)
	}
	
	async fn verify_one(
		&self,
		block_id: &BlockId<B::Block>,
		block_number: NumberFor<B>,
		source: TransactionSource,
		xt: ExtrinsicFor<B>,
		check: CheckBannedBeforeVerify,
	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
			return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
		}
		let validation_result = self.validated_pool.api().validate_transaction(
			block_id,
			source,
			xt.clone(),
		).await;
		let status = match validation_result {
			Ok(status) => status,
			Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
		};
		let validity = match status {
			Ok(validity) => {
				if validity.provides.is_empty() {
					ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
				} else {
					ValidatedTransaction::valid_at(
						block_number.saturated_into::<u64>(),
						hash.clone(),
						source,
						xt,
						bytes,
						validity,
					)
				}
			},
			Err(TransactionValidityError::Invalid(e)) =>
				ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
			Err(TransactionValidityError::Unknown(e)) =>
				ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
		};
		(hash, validity)
	}
	
	/// Returns a reference to the underlying [`ValidatedPool`].
	pub fn validated_pool(&self) -> &ValidatedPool<B> {
		&*self.validated_pool
	}
}
impl<B: ChainApi> Clone for Pool<B> {
	fn clone(&self) -> Self {
		Self {
			validated_pool: self.validated_pool.clone(),
		}
	}
}
#[cfg(test)]
mod tests {
	use std::collections::{HashMap, HashSet};
	use parking_lot::Mutex;
	use futures::executor::block_on;
	use super::*;
	use sp_transaction_pool::TransactionStatus;
	use sp_runtime::{
		traits::Hash,
		transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource},
	};
	use codec::Encode;
	use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId, Hashing};
	use assert_matches::assert_matches;
	use wasm_timer::Instant;
	use crate::base_pool::Limit;
	/// Nonce for which `TestApi::validate_transaction` returns a transaction without `provides` tags.
	const INVALID_NONCE: u64 = 254;
	/// Transaction source used by all submissions in these tests.
	const SOURCE: TransactionSource = TransactionSource::External;
	/// Configurable `ChainApi` used by the tests below.
	#[derive(Clone, Debug, Default)]
	struct TestApi {
		// Optional receiver that validation of a transfer with `nonce > 0` blocks on (once).
		delay: Arc<Mutex<Option<std::sync::mpsc::Receiver<()>>>>,
		// Hashes that are reported as `InvalidTransaction::Custom(0)`.
		invalidate: Arc<Mutex<HashSet<H256>>>,
		// Hashes whose `requires` tags are cleared after validation.
		clear_requirements: Arc<Mutex<HashSet<H256>>>,
		// Hashes that get the extra `requires` tag `[128]`.
		add_requirements: Arc<Mutex<HashSet<H256>>>,
	}
	impl ChainApi for TestApi {
		type Block = Block;
		type Error = error::Error;
		type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
		type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
		/// Validates the extrinsic synchronously and wraps the outcome in a ready future.
		///
		/// Panics if `at` is not a `BlockId::Number` or if the extrinsic is neither a
		/// `Transfer` nor an `IncludeData`.
		fn validate_transaction(
			&self,
			at: &BlockId<Self::Block>,
			_source: TransactionSource,
			uxt: ExtrinsicFor<Self>,
		) -> Self::ValidationFuture {
			let hash = self.hash_and_length(&uxt).0;
			let block_number = self.block_id_to_number(at).unwrap().unwrap();
			let res = match uxt {
				Extrinsic::Transfer { transfer, .. } => {
					let nonce = transfer.nonce;
					// Used to control the test flow: the first validation of a transfer with
					// `nonce > 0` takes the receiver and blocks until a signal arrives.
					if nonce > 0 {
						let opt = self.delay.lock().take();
						if let Some(delay) = opt {
							if delay.recv().is_err() {
								println!("Error waiting for delay!");
							}
						}
					}
					if self.invalidate.lock().contains(&hash) {
						InvalidTransaction::Custom(0).into()
					} else if nonce < block_number {
						// Nonces below the current block number are treated as stale.
						InvalidTransaction::Stale.into()
					} else {
						// The transaction provides its nonce as a tag and, when the nonce is
						// ahead of the block number, requires the previous nonce.
						let mut transaction = ValidTransaction {
							priority: 4,
							requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
							provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
							longevity: 3,
							propagate: true,
						};
						if self.clear_requirements.lock().contains(&hash) {
							transaction.requires.clear();
						}
						if self.add_requirements.lock().contains(&hash) {
							transaction.requires.push(vec![128]);
						}
						Ok(transaction)
					}
				},
				// Data inclusion is always valid but is marked as not to be propagated.
				Extrinsic::IncludeData(_) => {
					Ok(ValidTransaction {
						priority: 9001,
						requires: vec![],
						provides: vec![vec![42]],
						longevity: 9001,
						propagate: false,
					})
				},
				_ => unimplemented!(),
			};
			futures::future::ready(Ok(res))
		}
		/// Block numbers resolve to themselves; block hashes cannot be resolved (`None`).
		fn block_id_to_number(
			&self,
			at: &BlockId<Self::Block>,
		) -> Result<Option<NumberFor<Self>>, Self::Error> {
			Ok(match at {
				BlockId::Number(num) => Some(*num),
				BlockId::Hash(_) => None,
			})
		}
		/// Block numbers map to `H256::from_low_u64_be(number)`; block hashes cannot be
		/// resolved (`None`).
		fn block_id_to_hash(
			&self,
			at: &BlockId<Self::Block>,
		) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error> {
			Ok(match at {
				BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
				BlockId::Hash(_) => None,
			})
		}
		/// Hashes the encoded extrinsic and reports the length of the encoding in bytes.
		fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (BlockHash<Self>, usize) {
			let encoded = uxt.encode();
			let len = encoded.len();
			(Hashing::hash(&encoded), len)
		}
		/// Block bodies are never available in the test API.
		fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
			futures::future::ready(Ok(None))
		}
	}
	/// Wraps `transfer` into an `Extrinsic::Transfer` with a default signature and
	/// `exhaust_resources_when_not_first` disabled.
	fn uxt(transfer: Transfer) -> Extrinsic {
		let signature = Default::default();
		Extrinsic::Transfer {
			transfer,
			signature,
			exhaust_resources_when_not_first: false,
		}
	}
	fn pool() -> Pool<TestApi> {
		Pool::new(Default::default(), true.into(), TestApi::default().into())
	}
	/// A valid transfer with nonce 0 submitted at block 0 ends up in the ready queue.
	#[test]
	fn should_validate_and_import_transaction() {
		// given
		let pool = pool();
		let transfer = Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		};

		// when
		let submitted = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(transfer)));
		let hash = submitted.unwrap();

		// then
		let ready_hashes: Vec<_> = pool.validated_pool().ready().map(|v| v.hash).collect();
		assert_eq!(ready_hashes, vec![hash]);
	}
	/// Submitting an extrinsic whose hash has been banned fails with `TemporarilyBanned`.
	#[test]
	fn should_reject_if_temporarily_banned() {
		// given
		let pool = pool();
		let uxt = uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});
		// when: the hash is banned before the extrinsic is submitted
		pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
		let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
		assert_eq!(pool.validated_pool().status().ready, 0);
		assert_eq!(pool.validated_pool().status().future, 0);
		// then
		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
	}
	/// A non-validator pool rejects a non-propagable extrinsic with `Unactionable`.
	#[test]
	fn should_reject_unactionable_transactions() {
		// given: the pool is created with `is_validator` set to false
		let is_validator: IsValidator = false.into();
		let api = Arc::new(TestApi::default());
		let pool = Pool::new(Options::default(), is_validator, api);

		// `TestApi` validates `IncludeData` with `propagate: false`
		let extrinsic = Extrinsic::IncludeData(vec![42]);

		// when
		let outcome = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, extrinsic));

		// then
		assert_matches!(outcome.unwrap_err(), error::Error::Unactionable);
	}
	/// The import notification stream reports the two ready transactions and nothing for the
	/// future one.
	#[test]
	fn should_notify_about_pool_events() {
		// The pool lives only inside this scope, so the stream ends once it is dropped.
		let (stream, hash0, hash1) = {
			// given
			let pool = pool();
			let stream = pool.validated_pool().import_notification_stream();
			// when
			let hash0 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			}))).unwrap();
			let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 1,
			}))).unwrap();
			// nonce 3 leaves a gap (nonce 2 is missing), so this one goes to the future queue
			let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 3,
			}))).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 2);
			assert_eq!(pool.validated_pool().status().future, 1);
			(stream, hash0, hash1)
		};
		// then: only the two ready transactions were announced
		let mut it = futures::executor::block_on_stream(stream);
		assert_eq!(it.next(), Some(hash0));
		assert_eq!(it.next(), Some(hash1));
		assert_eq!(it.next(), None);
	}
	/// `clear_stale` at block 5 empties both queues and bans all three transactions.
	#[test]
	fn should_clear_stale_transactions() {
		// given
		let pool = pool();
		let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		}))).unwrap();
		let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 1,
		}))).unwrap();
		let hash3 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 3,
		}))).unwrap();
		// when
		pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
		// then
		assert_eq!(pool.validated_pool().ready().count(), 0);
		assert_eq!(pool.validated_pool().status().future, 0);
		assert_eq!(pool.validated_pool().status().ready, 0);
		// make sure they are temporarily banned as well
		assert!(pool.validated_pool.rotator().is_banned(&hash1));
		assert!(pool.validated_pool.rotator().is_banned(&hash2));
		assert!(pool.validated_pool.rotator().is_banned(&hash3));
	}
	/// Hashes passed to `prune_tags` as known imported hashes are banned afterwards.
	#[test]
	fn should_ban_mined_transactions() {
		// given
		let pool = pool();
		let transfer = Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		};
		let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(transfer))).unwrap();

		// when: block 1 provides tag `[0]` and is known to contain the transaction
		let mined_tags = vec![vec![0]];
		let mined_hashes = vec![hash1.clone()];
		block_on(pool.prune_tags(&BlockId::Number(1), mined_tags, mined_hashes)).unwrap();

		// then
		assert!(pool.validated_pool.rotator().is_banned(&hash1));
	}
	/// With a 200 byte limit, the assertions expect a second future transaction to displace
	/// the first one, which is then banned.
	#[test]
	fn should_limit_futures() {
		// given
		let limit = Limit {
			count: 100,
			total_bytes: 200,
		};
		let options = Options {
			ready: limit.clone(),
			future: limit.clone(),
			..Default::default()
		};
		let pool = Pool::new(options, true.into(), TestApi::default().into());
		let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 1,
		}))).unwrap();
		assert_eq!(pool.validated_pool().status().future, 1);
		// when
		let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(2)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 10,
		}))).unwrap();
		// then: still a single future transaction; the first one was banned, the second was not
		assert_eq!(pool.validated_pool().status().future, 1);
		assert!(pool.validated_pool.rotator().is_banned(&hash1));
		assert!(!pool.validated_pool.rotator().is_banned(&hash2));
	}
	/// With a 10 byte limit the submission itself fails and nothing is left in the pool.
	#[test]
	fn should_error_if_reject_immediately() {
		// given
		let limit = Limit {
			count: 100,
			total_bytes: 10,
		};
		let options = Options {
			ready: limit.clone(),
			future: limit,
			..Default::default()
		};
		let pool = Pool::new(options, true.into(), TestApi::default().into());
		let transfer = Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 1,
		};

		// when
		let outcome = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(transfer)));
		outcome.unwrap_err();

		// then
		let status = pool.validated_pool().status();
		assert_eq!(status.ready, 0);
		assert_eq!(status.future, 0);
	}
	/// A transaction that validates without any `provides` tag is rejected with `NoTagsProvided`.
	#[test]
	fn should_reject_transactions_with_no_provides() {
		// given
		let pool = pool();
		// `TestApi` returns an empty `provides` list for `INVALID_NONCE`
		let transfer = Transfer {
			from: AccountId::from_h256(H256::from_low_u64_be(1)),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: INVALID_NONCE,
		};

		// when
		let outcome = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(transfer)));
		let err = outcome.unwrap_err();

		// then
		let status = pool.validated_pool().status();
		assert_eq!(status.ready, 0);
		assert_eq!(status.future, 0);
		assert_matches!(err, error::Error::NoTagsProvided);
	}
	mod listener {
		use super::*;
		/// A watched ready transaction reports `Ready` and then `InBlock` once its tag is pruned.
		#[test]
		fn should_trigger_ready_and_finalized() {
			// given
			let pool = pool();
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			}))).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			assert_eq!(pool.validated_pool().status().future, 0);
			// when: tag `[0]` is pruned at block 2 without any known imported hashes
			block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 0);
			assert_eq!(pool.validated_pool().status().future, 0);
			// then: `TestApi` maps block number 2 to the hash `from_low_u64_be(2)`
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
			assert_eq!(
				stream.next(),
				Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
			);
		}
		/// Same as `should_trigger_ready_and_finalized`, but the transaction hash is also
		/// passed to `prune_tags` as a known imported hash.
		#[test]
		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
			// given
			let pool = pool();
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			}))).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			assert_eq!(pool.validated_pool().status().future, 0);
			// when
			block_on(
				pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![watcher.hash().clone()]),
			).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 0);
			assert_eq!(pool.validated_pool().status().future, 0);
			// then
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
			assert_eq!(
				stream.next(),
				Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
			);
		}
		/// A watched future transaction reports `Future` and then `Ready` once the
		/// transaction it depends on is imported.
		#[test]
		fn should_trigger_future_and_ready_after_promoted() {
			// given: nonce 1 at block 0 requires tag `[0]`, which nobody provides yet
			let pool = pool();
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 1,
			}))).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 0);
			assert_eq!(pool.validated_pool().status().future, 1);
			// when: the transaction with nonce 0 is imported
			block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			}))).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 2);
			// then
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Future));
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
		}
		/// Removing a watched transaction as invalid reports `Ready`, `Invalid` and then ends
		/// the stream.
		#[test]
		fn should_trigger_invalid_and_ban() {
			// given
			let pool = pool();
			let uxt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			});
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			// when
			pool.validated_pool.remove_invalid(&[*watcher.hash()]);
			// then
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
			assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
			assert_eq!(stream.next(), None);
		}
		/// Reporting a broadcast for a watched transaction yields `Broadcast` with the given peers.
		#[test]
		fn should_trigger_broadcasted() {
			// given
			let pool = pool();
			let uxt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			});
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			// when
			let mut map = HashMap::new();
			let peers = vec!["a".into(), "b".into(), "c".into()];
			map.insert(*watcher.hash(), peers.clone());
			pool.validated_pool().on_broadcasted(map);
			// then
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
			assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
		}
		/// With a limit of one transaction, importing a second one makes the watched first
		/// transaction report `Dropped`.
		#[test]
		fn should_trigger_dropped() {
			// given
			let limit = Limit {
				count: 1,
				total_bytes: 1000,
			};
			let options = Options {
				ready: limit.clone(),
				future: limit.clone(),
				..Default::default()
			};
			let pool = Pool::new(options, true.into(), TestApi::default().into());
			let xt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 0,
			});
			let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			// when: a second transaction is submitted at block 1, where nonce 1 has no requirements
			let xt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(2)),
				to: AccountId::from_h256(H256::from_low_u64_be(1)),
				amount: 4,
				nonce: 1,
			});
			block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			// then
			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
			assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
		}
		/// A transaction whose verification is still in flight while the tag it requires is
		/// pruned must end up in the ready queue, not in the future queue.
		#[test]
		fn should_handle_pruning_in_the_middle_of_import() {
			// given
			// `ready`/`is_ready` signal that the background submission finished;
			// `tx`/`rx` release the validation that `TestApi` delays for `nonce > 0`.
			let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
			let (tx, rx) = std::sync::mpsc::sync_channel(1);
			let mut api = TestApi::default();
			api.delay = Arc::new(Mutex::new(rx.into()));
			let pool = Arc::new(Pool::new(Default::default(), true.into(), api.into()));
			// when
			let xt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 5,
				nonce: 1,
			});
			// This submission blocks inside `validate_transaction` until `tx` is signalled.
			let pool2 = pool.clone();
			std::thread::spawn(move || {
				block_on(pool2.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
				ready.send(()).unwrap();
			});
			// Before the previous one is imported, import the transaction it depends on
			// (nonce 0 is not delayed).
			let xt = uxt(Transfer {
				from: AccountId::from_h256(H256::from_low_u64_be(1)),
				to: AccountId::from_h256(H256::from_low_u64_be(2)),
				amount: 4,
				nonce: 0,
			});
			// The tag the above transaction provides (`TestApi` uses just the nonce as `u8`).
			let provides = vec![0_u8];
			block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 1);
			// Now the block import happens before the delayed verification can finish.
			block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap();
			assert_eq!(pool.validated_pool().status().ready, 0);
			// Release the delayed verification: its result still has something in `requires`,
			// but the assertions below expect it to go straight to ready because the
			// required tag has already been pruned.
			tx.send(()).unwrap();
			// then
			is_ready.recv().unwrap(); // wait for the background submission to finish
			assert_eq!(pool.validated_pool().status().ready, 1);
			assert_eq!(pool.validated_pool().status().future, 0);
		}
	}
}