use std::collections::BTreeMap;
use std::cmp::Reverse;
use sp_database::{Database, Transaction};
use sp_runtime::traits::AtLeast32Bit;
use codec::{Encode, Decode};
use sp_blockchain::{Error, Result};
type DbHash = [u8; 32];
#[derive(Debug, Clone, PartialEq, Eq)]
struct LeafSetItem<H, N> {
	hash: H,
	number: Reverse<N>,
}
#[must_use = "Displaced items from the leaf set must be handled."]
pub struct ImportDisplaced<H, N> {
	new_hash: H,
	displaced: LeafSetItem<H, N>,
}
#[must_use = "Displaced items from the leaf set must be handled."]
pub struct FinalizationDisplaced<H, N> {
	leaves: BTreeMap<Reverse<N>, Vec<H>>,
}
impl<H, N: Ord> FinalizationDisplaced<H, N> {
	
	
	pub fn merge(&mut self, mut other: Self) {
		
		
		
		self.leaves.append(&mut other.leaves);
	}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeafSet<H, N> {
	storage: BTreeMap<Reverse<N>, Vec<H>>,
	pending_added: Vec<(H, N)>,
	pending_removed: Vec<H>,
}
impl<H, N> LeafSet<H, N> where
	H: Clone + PartialEq + Decode + Encode,
	N: std::fmt::Debug + Clone + AtLeast32Bit + Decode + Encode,
{
	
	pub fn new() -> Self {
		Self {
			storage: BTreeMap::new(),
			pending_added: Vec::new(),
			pending_removed: Vec::new(),
		}
	}
	
	pub fn read_from_db(db: &dyn Database<DbHash>, column: u32, prefix: &[u8]) -> Result<Self> {
		let mut storage = BTreeMap::new();
		match db.get(column, prefix) {
			Some(leaves) => {
				let vals: Vec<_> = match Decode::decode(&mut leaves.as_ref()) {
					Ok(vals) => vals,
					Err(_) => return Err(Error::Backend("Error decoding leaves".into())),
				};
				for (number, hashes) in vals.into_iter() {
					storage.insert(Reverse(number), hashes);
				}
			}
			None => {},
		}
		Ok(Self {
			storage,
			pending_added: Vec::new(),
			pending_removed: Vec::new(),
		})
	}
	
	pub fn import(&mut self, hash: H, number: N, parent_hash: H) -> Option<ImportDisplaced<H, N>> {
		
		let displaced = if number != N::zero() {
			let new_number = Reverse(number.clone() - N::one());
			let was_displaced = self.remove_leaf(&new_number, &parent_hash);
			if was_displaced {
				self.pending_removed.push(parent_hash.clone());
				Some(ImportDisplaced {
					new_hash: hash.clone(),
					displaced: LeafSetItem {
						hash: parent_hash,
						number: new_number,
					},
				})
			} else {
				None
			}
		} else {
			None
		};
		self.insert_leaf(Reverse(number.clone()), hash.clone());
		self.pending_added.push((hash, number));
		displaced
	}
	
	
	
	
	
	
	pub fn finalize_height(&mut self, number: N) -> FinalizationDisplaced<H, N> {
		let boundary = if number == N::zero() {
			return FinalizationDisplaced { leaves: BTreeMap::new() };
		} else {
			number - N::one()
		};
		let below_boundary = self.storage.split_off(&Reverse(boundary));
		self.pending_removed.extend(below_boundary.values().flat_map(|h| h.iter()).cloned());
		FinalizationDisplaced {
			leaves: below_boundary,
		}
	}
	
	
	
	
	
	
	pub fn undo(&mut self) -> Undo<H, N> {
		Undo { inner: self }
	}
	
	
	pub fn revert(&mut self, best_hash: H, best_number: N) {
		let items = self.storage.iter()
			.flat_map(|(number, hashes)| hashes.iter().map(move |h| (h.clone(), number.clone())))
			.collect::<Vec<_>>();
		for (hash, number) in &items {
			if number.0 > best_number {
				assert!(
					self.remove_leaf(number, hash),
					"item comes from an iterator over storage; qed",
				);
				self.pending_removed.push(hash.clone());
			}
		}
		let best_number = Reverse(best_number);
		let leaves_contains_best = self.storage
			.get(&best_number)
			.map_or(false, |hashes| hashes.contains(&best_hash));
		
		
		if !leaves_contains_best {
			self.insert_leaf(best_number.clone(), best_hash.clone());
			self.pending_added.push((best_hash, best_number.0));
		}
	}
	
	
	pub fn hashes(&self) -> Vec<H> {
		self.storage.iter().flat_map(|(_, hashes)| hashes.iter()).cloned().collect()
	}
	
	pub fn count(&self) -> usize {
		self.storage.len()
	}
	
	pub fn prepare_transaction(&mut self, tx: &mut Transaction<DbHash>, column: u32, prefix: &[u8]) {
		let leaves: Vec<_> = self.storage.iter().map(|(n, h)| (n.0.clone(), h.clone())).collect();
		tx.set_from_vec(column, prefix, leaves.encode());
		self.pending_added.clear();
		self.pending_removed.clear();
	}
	#[cfg(test)]
	fn contains(&self, number: N, hash: H) -> bool {
		self.storage.get(&Reverse(number)).map_or(false, |hashes| hashes.contains(&hash))
	}
	fn insert_leaf(&mut self, number: Reverse<N>, hash: H) {
		self.storage.entry(number).or_insert_with(Vec::new).push(hash);
	}
	
	fn remove_leaf(&mut self, number: &Reverse<N>, hash: &H) -> bool {
		let mut empty = false;
		let removed = self.storage.get_mut(number).map_or(false, |leaves| {
			let mut found = false;
			leaves.retain(|h| if h == hash {
				found = true;
				false
			} else {
				true
			});
			if leaves.is_empty() { empty = true }
			found
		});
		if removed && empty {
			self.storage.remove(number);
		}
		removed
	}
}
pub struct Undo<'a, H: 'a, N: 'a> {
	inner: &'a mut LeafSet<H, N>,
}
impl<'a, H: 'a, N: 'a> Undo<'a, H, N> where
	H: Clone + PartialEq + Decode + Encode,
	N: std::fmt::Debug + Clone + AtLeast32Bit + Decode + Encode,
{
	
	pub fn undo_import(&mut self, displaced: ImportDisplaced<H, N>) {
		let new_number = Reverse(displaced.displaced.number.0.clone() + N::one());
		self.inner.remove_leaf(&new_number, &displaced.new_hash);
		self.inner.insert_leaf(new_number, displaced.displaced.hash);
	}
	
	pub fn undo_finalization(&mut self, mut displaced: FinalizationDisplaced<H, N>) {
		self.inner.storage.append(&mut displaced.leaves);
	}
}
impl<'a, H: 'a, N: 'a> Drop for Undo<'a, H, N> {
	fn drop(&mut self) {
		self.inner.pending_added.clear();
		self.inner.pending_removed.clear();
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use std::sync::Arc;
	#[test]
	fn it_works() {
		let mut set = LeafSet::new();
		set.import(0u32, 0u32, 0u32);
		set.import(1_1, 1, 0);
		set.import(2_1, 2, 1_1);
		set.import(3_1, 3, 2_1);
		assert!(set.contains(3, 3_1));
		assert!(!set.contains(2, 2_1));
		assert!(!set.contains(1, 1_1));
		assert!(!set.contains(0, 0));
		set.import(2_2, 2, 1_1);
		assert!(set.contains(3, 3_1));
		assert!(set.contains(2, 2_2));
	}
	#[test]
	fn flush_to_disk() {
		const PREFIX: &[u8] = b"abcdefg";
		let db = Arc::new(sp_database::MemDb::default());
		let mut set = LeafSet::new();
		set.import(0u32, 0u32, 0u32);
		set.import(1_1, 1, 0);
		set.import(2_1, 2, 1_1);
		set.import(3_1, 3, 2_1);
		let mut tx = Transaction::new();
		set.prepare_transaction(&mut tx, 0, PREFIX);
		db.commit(tx).unwrap();
		let set2 = LeafSet::read_from_db(&*db, 0, PREFIX).unwrap();
		assert_eq!(set, set2);
	}
	#[test]
	fn two_leaves_same_height_can_be_included() {
		let mut set = LeafSet::new();
		set.import(1_1u32, 10u32,0u32);
		set.import(1_2, 10, 0);
		assert!(set.storage.contains_key(&Reverse(10)));
		assert!(set.contains(10, 1_1));
		assert!(set.contains(10, 1_2));
		assert!(!set.contains(10, 1_3));
	}
	#[test]
	fn finalization_consistent_with_disk() {
		const PREFIX: &[u8] = b"prefix";
		let db = Arc::new(sp_database::MemDb::default());
		let mut set = LeafSet::new();
		set.import(10_1u32, 10u32, 0u32);
		set.import(11_1, 11, 10_2);
		set.import(11_2, 11, 10_2);
		set.import(12_1, 12, 11_123);
		assert!(set.contains(10, 10_1));
		let mut tx = Transaction::new();
		set.prepare_transaction(&mut tx, 0, PREFIX);
		db.commit(tx).unwrap();
		let _ = set.finalize_height(11);
		let mut tx = Transaction::new();
		set.prepare_transaction(&mut tx, 0, PREFIX);
		db.commit(tx).unwrap();
		assert!(set.contains(11, 11_1));
		assert!(set.contains(11, 11_2));
		assert!(set.contains(12, 12_1));
		assert!(!set.contains(10, 10_1));
		let set2 = LeafSet::read_from_db(&*db, 0, PREFIX).unwrap();
		assert_eq!(set, set2);
	}
	#[test]
	fn undo_finalization() {
		let mut set = LeafSet::new();
		set.import(10_1u32, 10u32, 0u32);
		set.import(11_1, 11, 10_2);
		set.import(11_2, 11, 10_2);
		set.import(12_1, 12, 11_123);
		let displaced = set.finalize_height(11);
		assert!(!set.contains(10, 10_1));
		set.undo().undo_finalization(displaced);
		assert!(set.contains(10, 10_1));
	}
}