#![warn(missing_docs)]
use futures::{channel::mpsc, prelude::*};
use libp2p::Multiaddr;
use log::{error, warn};
use serde::Serialize;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use std::collections::HashMap;
use tracing::Id;
pub use libp2p::wasm_ext::ExtTransport;
pub use serde_json;
pub use tracing;
mod endpoints;
mod layer;
mod node;
mod transport;
pub use endpoints::*;
pub use layer::*;
use node::*;
use transport::*;
pub const SUBSTRATE_DEBUG: u8 = 9;
pub const SUBSTRATE_INFO: u8 = 0;
pub const CONSENSUS_TRACE: u8 = 9;
pub const CONSENSUS_DEBUG: u8 = 5;
pub const CONSENSUS_WARN: u8 = 4;
pub const CONSENSUS_INFO: u8 = 1;
pub(crate) type TelemetryMessage = (Id, u8, String);
#[derive(Debug, Clone)]
pub struct TelemetrySpan(tracing::Span);
impl TelemetrySpan {
	
	pub fn enter(&self) -> tracing::span::Entered {
		self.0.enter()
	}
	
	pub fn new() -> Self {
		Self(tracing::error_span!(TELEMETRY_LOG_SPAN))
	}
	
	pub fn span(&self) -> tracing::Span {
		self.0.clone()
	}
}
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	
	pub name: String,
	
	pub implementation: String,
	
	pub version: String,
	
	pub config: String,
	
	pub chain: String,
	
	pub genesis_hash: String,
	
	pub authority: bool,
	
	pub startup_time: String,
	
	pub network_id: String,
}
#[derive(Debug)]
pub struct TelemetryWorker {
	message_receiver: mpsc::Receiver<TelemetryMessage>,
	message_sender: mpsc::Sender<TelemetryMessage>,
	register_receiver: mpsc::UnboundedReceiver<Register>,
	register_sender: mpsc::UnboundedSender<Register>,
	transport: WsTrans,
}
impl TelemetryWorker {
	pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
		let (register_sender, register_receiver) = mpsc::unbounded();
		Self {
			message_receiver,
			message_sender,
			register_receiver,
			register_sender,
			transport,
		}
	}
	
	
	
	pub fn handle(&self) -> TelemetryHandle {
		TelemetryHandle {
			message_sender: self.register_sender.clone(),
		}
	}
	
	pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
		self.message_sender.clone()
	}
	
	
	
	pub async fn run(self) {
		let Self {
			mut message_receiver,
			message_sender: _,
			mut register_receiver,
			register_sender: _,
			transport,
		} = self;
		let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
		let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
		loop {
			futures::select! {
				message = message_receiver.next() => Self::process_message(
					message,
					&mut node_pool,
					&node_map,
				).await,
				init_payload = register_receiver.next() => Self::process_register(
					init_payload,
					&mut node_pool,
					&mut node_map,
					transport.clone(),
				).await,
			}
		}
	}
	async fn process_register(
		input: Option<Register>,
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
		node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
		transport: WsTrans,
	) {
		let input = input.expect("the stream is never closed; qed");
		match input {
			Register::Telemetry {
				id,
				endpoints,
				connection_message,
			} => {
				let endpoints = endpoints.0;
				let connection_message = match serde_json::to_value(&connection_message) {
					Ok(serde_json::Value::Object(mut value)) => {
						value.insert("msg".into(), "system.connected".into());
						let mut obj = serde_json::Map::new();
						obj.insert("id".to_string(), id.into_u64().into());
						obj.insert("payload".to_string(), value.into());
						Some(obj)
					}
					Ok(_) => {
						unreachable!("ConnectionMessage always serialize to an object; qed")
					}
					Err(err) => {
						log::error!(
							target: "telemetry",
							"Could not serialize connection message: {}",
							err,
						);
						None
					}
				};
				for (addr, verbosity) in endpoints {
					log::trace!(
						target: "telemetry",
						"Initializing telemetry for: {:?}",
						addr,
					);
					node_map
						.entry(id.clone())
						.or_default()
						.push((verbosity, addr.clone()));
					let node = node_pool.entry(addr.clone()).or_insert_with(|| {
						Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
					});
					node.connection_messages.extend(connection_message.clone());
				}
			}
			Register::Notifier {
				addresses,
				connection_notifier,
			} => {
				for addr in addresses {
					if let Some(node) = node_pool.get_mut(&addr) {
						node.telemetry_connection_notifier
							.push(connection_notifier.clone());
					} else {
						log::error!(
							target: "telemetry",
							"Received connection notifier for unknown node ({}). This is a bug.",
							addr,
						);
					}
				}
			}
		}
	}
	
	async fn process_message(
		input: Option<TelemetryMessage>,
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
		node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
	) {
		let (id, verbosity, message) = input.expect("the stream is never closed; qed");
		let nodes = if let Some(nodes) = node_map.get(&id) {
			nodes
		} else {
			
			
			
			log::trace!(
				target: "telemetry",
				"Received telemetry log for unknown id ({:?}): {}",
				id,
				message,
			);
			return;
		};
		for (node_max_verbosity, addr) in nodes {
			if verbosity > *node_max_verbosity {
				log::trace!(
					target: "telemetry",
					"Skipping {} for log entry with verbosity {:?}",
					addr,
					verbosity,
				);
				continue;
			}
			if let Some(node) = node_pool.get_mut(&addr) {
				let _ = node.send(message.clone()).await;
			} else {
				log::error!(
					target: "telemetry",
					"Received message for unknown node ({}). This is a bug. \
					Message sent: {}",
					addr,
					message,
				);
			}
		}
	}
}
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
	message_sender: mpsc::UnboundedSender<Register>,
}
impl TelemetryHandle {
	
	
	
	
	
	
	
	
	
	
	pub fn start_telemetry(
		&mut self,
		span: TelemetrySpan,
		endpoints: TelemetryEndpoints,
		connection_message: ConnectionMessage,
	) -> TelemetryConnectionNotifier {
		let Self { message_sender } = self;
		let connection_notifier = TelemetryConnectionNotifier {
			message_sender: message_sender.clone(),
			addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
		};
		match span.0.id() {
			Some(id) => {
				match message_sender.unbounded_send(Register::Telemetry {
					id,
					endpoints,
					connection_message,
				}) {
					Ok(()) => {}
					Err(err) => error!(
						target: "telemetry",
						"Could not initialize telemetry: \
						the telemetry is probably already running: {}",
						err,
					),
				}
			}
			None => error!(
				target: "telemetry",
				"Could not initialize telemetry: the span could not be entered",
			),
		}
		connection_notifier
	}
}
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
	message_sender: mpsc::UnboundedSender<Register>,
	addresses: Vec<Multiaddr>,
}
impl TelemetryConnectionNotifier {
	
	
	
	
	pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
		let (message_sender, message_receiver) = tracing_unbounded("mpsc_telemetry_on_connect");
		if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
			addresses: self.addresses.clone(),
			connection_notifier: message_sender,
		}) {
			error!(
				target: "telemetry",
				"Could not create a telemetry connection notifier: \
				the telemetry is probably already running: {}",
				err,
			);
		}
		message_receiver
	}
}
#[derive(Debug)]
enum Register {
	Telemetry {
		id: Id,
		endpoints: TelemetryEndpoints,
		connection_message: ConnectionMessage,
	},
	Notifier {
		addresses: Vec<Multiaddr>,
		connection_notifier: ConnectionNotifierSender,
	},
}
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
	( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		let verbosity: u8 = $verbosity;
		match format_fields_to_json!($($t)*) {
			Err(err) => {
				$crate::tracing::error!(
					target: "telemetry",
					"Could not serialize value for telemetry: {}",
					err,
				);
			},
			Ok(mut json) => {
				
				json.insert("msg".into(), $msg.into());
				let serialized_json = $crate::serde_json::to_string(&json)
					.expect("contains only string keys; qed");
				$crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
					verbosity,
					json = serialized_json.as_str(),
				);
			},
		}
	}};
}
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}