use std::{pin::Pin, time::Duration, marker::PhantomData};
use futures::{prelude::*, task::Context, task::Poll};
use futures_timer::Delay;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded, TracingUnboundedReceiver};
use prometheus_endpoint::Registry;
use crate::{
	block_import::BlockOrigin,
	import_queue::{
		BlockImportResult, BlockImportError, Verifier, BoxBlockImport,
		BoxJustificationImport, ImportQueue, Link, Origin,
		IncomingBlock, import_single_block_metered,
		buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver},
	},
	metrics::Metrics,
};
pub struct BasicQueue<B: BlockT, Transaction> {
	
	justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
	
	block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
	
	result_port: BufferedLinkReceiver<B>,
	_phantom: PhantomData<Transaction>,
}
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
	fn drop(&mut self) {
		
		self.justification_sender.close_channel();
		self.block_import_sender.close_channel();
		self.result_port.close();
	}
}
impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
	
	
	
	pub fn new<V: 'static + Verifier<B>>(
		verifier: V,
		block_import: BoxBlockImport<B, Transaction>,
		justification_import: Option<BoxJustificationImport<B>>,
		spawner: &impl sp_core::traits::SpawnNamed,
		prometheus_registry: Option<&Registry>,
	) -> Self {
		let (result_sender, result_port) = buffered_link::buffered_link();
		let metrics = prometheus_registry.and_then(|r| {
			Metrics::register(r)
				.map_err(|err| {
					log::warn!("Failed to register Prometheus metrics: {}", err);
				})
				.ok()
		});
		let (future, justification_sender, block_import_sender) = BlockImportWorker::new(
			result_sender,
			verifier,
			block_import,
			justification_import,
			metrics,
		);
		spawner.spawn_blocking("basic-block-import-worker", future.boxed());
		Self {
			justification_sender,
			block_import_sender,
			result_port,
			_phantom: PhantomData,
		}
	}
}
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
		if blocks.is_empty() {
			return;
		}
		trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
		let res =
			self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
		if res.is_err() {
			log::error!(
				target: "sync",
				"import_blocks: Background import task is no longer alive"
			);
		}
	}
	fn import_justification(
		&mut self,
		who: Origin,
		hash: B::Hash,
		number: NumberFor<B>,
		justification: Justification,
	) {
		let res = self.justification_sender.unbounded_send(
			worker_messages::ImportJustification(who, hash, number, justification),
		);
		if res.is_err() {
			log::error!(
				target: "sync",
				"import_justification: Background import task is no longer alive"
			);
		}
	}
	fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
		if self.result_port.poll_actions(cx, link).is_err() {
			log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
		}
	}
}
mod worker_messages {
	use super::*;
	pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
	pub struct ImportJustification<B: BlockT>(pub Origin, pub B::Hash, pub NumberFor<B>, pub Justification);
}
async fn block_import_process<B: BlockT, Transaction: Send>(
	mut block_import: BoxBlockImport<B, Transaction>,
	mut verifier: impl Verifier<B>,
	mut result_sender: BufferedLinkSender<B>,
	mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
	metrics: Option<Metrics>,
	delay_between_blocks: Duration,
) {
	loop {
		let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await {
			Some(blocks) => blocks,
			None => return,
		};
		let res = import_many_blocks(
			&mut block_import,
			origin,
			blocks,
			&mut verifier,
			delay_between_blocks,
			metrics.clone(),
		).await;
		result_sender.blocks_processed(res.imported, res.block_count, res.results);
	}
}
struct BlockImportWorker<B: BlockT> {
	result_sender: BufferedLinkSender<B>,
	justification_import: Option<BoxJustificationImport<B>>,
	metrics: Option<Metrics>,
}
impl<B: BlockT> BlockImportWorker<B> {
	fn new<V: 'static + Verifier<B>, Transaction: Send>(
		result_sender: BufferedLinkSender<B>,
		verifier: V,
		block_import: BoxBlockImport<B, Transaction>,
		justification_import: Option<BoxJustificationImport<B>>,
		metrics: Option<Metrics>,
	) -> (
		impl Future<Output = ()> + Send,
		TracingUnboundedSender<worker_messages::ImportJustification<B>>,
		TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
	) {
		use worker_messages::*;
		let (justification_sender, mut justification_port) =
			tracing_unbounded("mpsc_import_queue_worker_justification");
		let (block_import_sender, block_import_port) =
			tracing_unbounded("mpsc_import_queue_worker_blocks");
		let mut worker = BlockImportWorker {
			result_sender,
			justification_import,
			metrics,
		};
		
		if let Some(justification_import) = worker.justification_import.as_mut() {
			for (hash, number) in justification_import.on_start() {
				worker.result_sender.request_justification(&hash, number);
			}
		}
		let delay_between_blocks = Duration::default();
		let future = async move {
			let block_import_process = block_import_process(
				block_import,
				verifier,
				worker.result_sender.clone(),
				block_import_port,
				worker.metrics.clone(),
				delay_between_blocks,
			);
			futures::pin_mut!(block_import_process);
			loop {
				
				
				if worker.result_sender.is_closed() {
					return;
				}
				
				while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
					match justification {
						Some(ImportJustification(who, hash, number, justification)) =>
							worker.import_justification(who, hash, number, justification),
						None => return,
					}
				}
				if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
					return;
				}
				
				futures::pending!()
			}
		};
		(future, justification_sender, block_import_sender)
	}
	fn import_justification(
		&mut self,
		who: Origin,
		hash: B::Hash,
		number: NumberFor<B>,
		justification: Justification
	) {
		let started = wasm_timer::Instant::now();
		let success = self.justification_import.as_mut().map(|justification_import| {
			justification_import.import_justification(hash, number, justification)
				.map_err(|e| {
					debug!(
						target: "sync",
						"Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}",
						e,
						hash,
						number,
						who,
					);
					e
				}).is_ok()
		}).unwrap_or(false);
		if let Some(metrics) = self.metrics.as_ref() {
			metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
		}
		self.result_sender.justification_imported(who, &hash, number, success);
	}
}
struct ImportManyBlocksResult<B: BlockT> {
	
	imported: usize,
	
	block_count: usize,
	
	results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
}
async fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
	import_handle: &mut BoxBlockImport<B, Transaction>,
	blocks_origin: BlockOrigin,
	blocks: Vec<IncomingBlock<B>>,
	verifier: &mut V,
	delay_between_blocks: Duration,
	metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
	let count = blocks.len();
	let blocks_range = match (
		blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
		blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
	) {
		(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
		(Some(first), Some(_)) => format!(" ({})", first),
		_ => Default::default(),
	};
	trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range);
	let mut imported = 0;
	let mut results = vec![];
	let mut has_error = false;
	let mut blocks = blocks.into_iter();
	
	loop {
		
		let block = match blocks.next() {
			Some(b) => b,
			None => {
				
				return ImportManyBlocksResult { block_count: count, imported, results }
			},
		};
		let block_number = block.header.as_ref().map(|h| h.number().clone());
		let block_hash = block.hash;
		let import_result = if has_error {
			Err(BlockImportError::Cancelled)
		} else {
			
			import_single_block_metered(
				import_handle,
				blocks_origin.clone(),
				block,
				verifier,
				metrics.clone(),
			)
		};
		if let Some(metrics) = metrics.as_ref() {
			metrics.report_import::<B>(&import_result);
		}
		if import_result.is_ok() {
			trace!(
				target: "sync",
				"Block imported successfully {:?} ({})",
				block_number,
				block_hash,
			);
			imported += 1;
		} else {
			has_error = true;
		}
		results.push((import_result, block_hash));
		if delay_between_blocks != Duration::default() && !has_error {
			Delay::new(delay_between_blocks).await;
		} else {
			Yield::new().await
		}
	}
}
struct Yield(bool);
impl Yield {
	fn new() -> Self {
		Self(false)
	}
}
impl Future for Yield {
	type Output = ();
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
		if !self.0 {
			self.0 = true;
			cx.waker().wake_by_ref();
			Poll::Pending
		} else {
			Poll::Ready(())
		}
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		import_queue::{CacheKeyId, Verifier},
		BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
	};
	use futures::{executor::block_on, Future};
	use sp_test_primitives::{Block, BlockNumber, Extrinsic, Hash, Header};
	use std::collections::HashMap;
	impl Verifier<Block> for () {
		fn verify(
			&mut self,
			origin: BlockOrigin,
			header: Header,
			_justification: Option<Justification>,
			_body: Option<Vec<Extrinsic>>,
		) -> Result<(BlockImportParams<Block, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
			Ok((BlockImportParams::new(origin, header), None))
		}
	}
	impl BlockImport<Block> for () {
		type Error = crate::Error;
		type Transaction = Extrinsic;
		fn check_block(
			&mut self,
			_block: BlockCheckParams<Block>,
		) -> Result<ImportResult, Self::Error> {
			Ok(ImportResult::imported(false))
		}
		fn import_block(
			&mut self,
			_block: BlockImportParams<Block, Self::Transaction>,
			_cache: HashMap<CacheKeyId, Vec<u8>>,
		) -> Result<ImportResult, Self::Error> {
			Ok(ImportResult::imported(true))
		}
	}
	impl JustificationImport<Block> for () {
		type Error = crate::Error;
		fn import_justification(
			&mut self,
			_hash: Hash,
			_number: BlockNumber,
			_justification: Justification,
		) -> Result<(), Self::Error> {
			Ok(())
		}
	}
	#[derive(Debug, PartialEq)]
	enum Event {
		JustificationImported(Hash),
		BlockImported(Hash),
	}
	#[derive(Default)]
	struct TestLink {
		events: Vec<Event>,
	}
	impl Link<Block> for TestLink {
		fn blocks_processed(
			&mut self,
			_imported: usize,
			_count: usize,
			results: Vec<(Result<BlockImportResult<BlockNumber>, BlockImportError>, Hash)>,
		) {
			if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
				self.events.push(Event::BlockImported(hash));
			}
		}
		fn justification_imported(
			&mut self,
			_who: Origin,
			hash: &Hash,
			_number: BlockNumber,
			_success: bool,
		) {
			self.events.push(Event::JustificationImported(hash.clone()))
		}
	}
	#[test]
	fn prioritizes_finality_work_over_block_import() {
		let (result_sender, mut result_port) = buffered_link::buffered_link();
		let (worker, mut finality_sender, mut block_import_sender) =
			BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
		futures::pin_mut!(worker);
		let mut import_block = |n| {
			let header = Header {
				parent_hash: Hash::random(),
				number: n,
				extrinsics_root: Hash::random(),
				state_root: Default::default(),
				digest: Default::default(),
			};
			let hash = header.hash();
			block_on(block_import_sender.send(worker_messages::ImportBlocks(
				BlockOrigin::Own,
				vec![IncomingBlock {
					hash,
					header: Some(header),
					body: None,
					justification: None,
					origin: None,
					allow_missing_state: false,
					import_existing: false,
				}],
			)))
			.unwrap();
			hash
		};
		let mut import_justification = || {
			let hash = Hash::random();
			block_on(finality_sender.send(worker_messages::ImportJustification(
				libp2p::PeerId::random(),
				hash,
				1,
				Vec::new(),
			)))
			.unwrap();
			hash
		};
		let mut link = TestLink::default();
		
		let block1 = import_block(1);
		let block2 = import_block(2);
		let block3 = import_block(3);
		let justification1 = import_justification();
		let justification2 = import_justification();
		let block4 = import_block(4);
		let block5 = import_block(5);
		let block6 = import_block(6);
		let justification3 = import_justification();
		
		block_on(futures::future::poll_fn(|cx| {
			while link.events.len() < 9 {
				match Future::poll(Pin::new(&mut worker), cx) {
					Poll::Pending => {}
					Poll::Ready(()) => panic!("import queue worker should not conclude."),
				}
				result_port.poll_actions(cx, &mut link).unwrap();
			}
			Poll::Ready(())
		}));
		
		assert_eq!(
			link.events,
			vec![
				Event::JustificationImported(justification1),
				Event::JustificationImported(justification2),
				Event::JustificationImported(justification3),
				Event::BlockImported(block1),
				Event::BlockImported(block2),
				Event::BlockImported(block3),
				Event::BlockImported(block4),
				Event::BlockImported(block5),
				Event::BlockImported(block6),
			]
		);
	}
}