use std::collections::VecDeque;
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll};
use cid::Version;
use codec::Encode;
use core::pin::Pin;
use futures::Future;
use futures::io::{AsyncRead, AsyncWrite};
use libp2p::core::{
connection::ConnectionId, Multiaddr, PeerId,
upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo,
};
use libp2p::swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ProtocolsHandler, IntoProtocolsHandler, OneShotHandler,
};
use log::{error, debug, trace};
use prost::Message;
use sp_runtime::traits::{Block as BlockT};
use unsigned_varint::{encode as varint_encode};
use crate::chain::Client;
use crate::schema::bitswap::{
Message as BitswapMessage,
message::{wantlist::WantType, Block as MessageBlock, BlockPresenceType, BlockPresence},
};
/// Log target used for every log record emitted by the bitswap code in this file.
const LOG_TARGET: &str = "bitswap";
/// Upper bound, in bytes, passed to `upgrade::read_one` when reading an inbound packet (16 MiB).
const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024;
/// Limit on the queue of pending responses; new requests are ignored while the queue is full.
const MAX_RESPONSE_QUEUE: usize = 20;
/// Maximum number of wantlist entries accepted in one request; larger requests are ignored.
const MAX_WANTED_BLOCKS: usize = 16;
/// Protocol name advertised by the `UpgradeInfo` implementations below.
const PROTOCOL_NAME: &'static [u8] = b"/ipfs/bitswap/1.2.0";
/// Boxed, pinned, `Send` future resolving to `Result<T, E>`; used as the upgrade future type.
type FutureResult<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
/// Stateless inbound protocol upgrade for bitswap.
///
/// Its `InboundUpgrade` implementation reads one length-limited packet from the
/// substream and decodes it into a `BitswapMessage`.
#[derive(Clone, Copy, Debug, Default)]
pub struct BitswapConfig;
impl UpgradeInfo for BitswapConfig {
    type Info = &'static [u8];
    type InfoIter = std::iter::Once<Self::Info>;

    /// Yields the single protocol name supported by the inbound upgrade.
    fn protocol_info(&self) -> Self::InfoIter {
        let name: Self::Info = PROTOCOL_NAME;
        std::iter::once(name)
    }
}
impl<TSocket> InboundUpgrade<TSocket> for BitswapConfig
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = BitswapMessage;
type Error = BitswapError;
type Future = FutureResult<Self::Output, Self::Error>;
fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future {
Box::pin(async move {
let packet = upgrade::read_one(&mut socket, MAX_PACKET_SIZE).await?;
let message: BitswapMessage = Message::decode(packet.as_slice())?;
Ok(message)
})
}
}
impl UpgradeInfo for BitswapMessage {
    type Info = &'static [u8];
    type InfoIter = std::iter::Once<Self::Info>;

    /// Yields the single protocol name used when sending a message outbound.
    fn protocol_info(&self) -> Self::InfoIter {
        let name: Self::Info = PROTOCOL_NAME;
        std::iter::once(name)
    }
}
impl<TSocket> OutboundUpgrade<TSocket> for BitswapMessage
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = ();
type Error = io::Error;
type Future = FutureResult<Self::Output, Self::Error>;
fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future {
Box::pin(async move {
let mut data = Vec::with_capacity(self.encoded_len());
self.encode(&mut data)?;
upgrade::write_one(&mut socket, data).await
})
}
}
/// Event passed from the connection handler to the `Bitswap` behaviour.
#[derive(Debug)]
pub enum HandlerEvent {
    /// A message was received from a remote peer (output of the inbound upgrade).
    Request(BitswapMessage),
    /// An outbound message finished sending (output of the outbound upgrade).
    ResponseSent,
}
impl From<BitswapMessage> for HandlerEvent {
fn from(message: BitswapMessage) -> Self {
Self::Request(message)
}
}
impl From<()> for HandlerEvent {
fn from(_: ()) -> Self {
Self::ResponseSent
}
}
/// The components of a CID other than the digest itself, sent alongside block data.
#[derive(PartialEq, Eq, Clone, Debug)]
struct Prefix {
    /// CID version.
    pub version: Version,
    /// Content codec code of the CID.
    pub codec: u64,
    /// Multihash type code.
    pub mh_type: u64,
    /// Multihash digest length.
    pub mh_len: u8,
}
impl Prefix {
    /// Serializes the prefix as four consecutive unsigned varints:
    /// version, codec, multihash type and multihash length, in that order.
    pub fn to_bytes(&self) -> Vec<u8> {
        let fields: [u64; 4] = [
            self.version.into(),
            self.codec,
            self.mh_type,
            self.mh_len as u64,
        ];
        let mut bytes = Vec::with_capacity(4);
        for value in fields.iter() {
            // Each varint is encoded into a fresh scratch buffer before being appended.
            let mut scratch = varint_encode::u64_buffer();
            bytes.extend_from_slice(varint_encode::u64(*value, &mut scratch));
        }
        bytes
    }
}
/// Network behaviour that answers bitswap wantlist requests from remote peers.
pub struct Bitswap<B> {
    /// Chain client used to look up extrinsics by hash.
    client: Arc<dyn Client<B>>,
    /// Responses waiting to be handed to a connection handler, paired with the target peer.
    ready_blocks: VecDeque<(PeerId, BitswapMessage)>,
}
impl<B: BlockT> Bitswap<B> {
pub fn new(client: Arc<dyn Client<B>>) -> Self {
Bitswap {
client,
ready_blocks: Default::default(),
}
}
}
impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
    type ProtocolsHandler = OneShotHandler<BitswapConfig, BitswapMessage, HandlerEvent>;
    type OutEvent = void::Void;

    /// Creates a default one-shot handler for a new connection.
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        Default::default()
    }

    /// This behaviour does not know any peer addresses; always returns an empty list.
    fn addresses_of_peer(&mut self, _peer: &PeerId) -> Vec<Multiaddr> {
        Vec::new()
    }

    /// No per-peer state is kept, so connection events are ignored.
    fn inject_connected(&mut self, _peer: &PeerId) {}

    /// No per-peer state is kept, so disconnection events are ignored.
    fn inject_disconnected(&mut self, _peer: &PeerId) {}

    /// Handles an event from a connection handler.
    ///
    /// `ResponseSent` events are ignored. For a `Request`, every wantlist entry
    /// is looked up through the client and a single response message is queued
    /// for `peer`. The request is dropped without a response when the response
    /// queue is full, when it carries no wantlist, or when the wantlist has more
    /// than `MAX_WANTED_BLOCKS` entries. Entries with an unparsable or
    /// unsupported CID are skipped.
    fn inject_event(&mut self, peer: PeerId, _connection: ConnectionId, message: HandlerEvent) {
        let request = match message {
            HandlerEvent::ResponseSent => return,
            HandlerEvent::Request(msg) => msg,
        };
        trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);

        // `>=` keeps the queue at no more than MAX_RESPONSE_QUEUE entries.
        if self.ready_blocks.len() >= MAX_RESPONSE_QUEUE {
            debug!(target: LOG_TARGET, "Ignored request: queue is full");
            return;
        }

        // Validate the request before building the response message.
        let wantlist = match request.wantlist {
            Some(wantlist) => wantlist,
            None => {
                debug!(
                    target: LOG_TARGET,
                    "Unexpected bitswap message from {}",
                    peer,
                );
                return;
            }
        };
        if wantlist.entries.len() > MAX_WANTED_BLOCKS {
            trace!(target: LOG_TARGET, "Ignored request: too many entries");
            return;
        }

        let mut response = BitswapMessage {
            wantlist: None,
            blocks: Default::default(),
            payload: Default::default(),
            block_presences: Default::default(),
            pending_bytes: 0,
        };

        for entry in wantlist.entries {
            let cid = match cid::Cid::read_bytes(entry.block.as_slice()) {
                Ok(cid) => cid,
                Err(e) => {
                    trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
                    continue;
                }
            };
            // Only CIDv1 with a 32-byte Blake2b-256 multihash is served.
            if cid.version() != cid::Version::V1
                || cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256)
                || cid.hash().size() != 32
            {
                debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
                continue;
            }

            let mut hash = B::Hash::default();
            // `copy_from_slice` panics on a length mismatch; skip the entry rather
            // than let a remote request crash a node whose block hash is not 32 bytes.
            if hash.as_mut().len() != 32 {
                debug!(
                    target: LOG_TARGET,
                    "Ignoring CID {}: block hash is {} bytes, expected 32",
                    cid,
                    hash.as_mut().len(),
                );
                continue;
            }
            hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);

            // A client error is logged and then treated the same as a missing extrinsic.
            let extrinsic = match self.client.extrinsic(&hash) {
                Ok(ex) => ex,
                Err(e) => {
                    error!(target: LOG_TARGET, "Error retrieving extrinsic {}: {}", hash, e);
                    None
                }
            };

            match extrinsic {
                Some(extrinsic) => {
                    trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
                    if entry.want_type == WantType::Block as i32 {
                        // The peer wants the data itself: send prefix + encoded extrinsic.
                        let prefix = Prefix {
                            version: cid.version(),
                            codec: cid.codec(),
                            mh_type: cid.hash().code(),
                            mh_len: cid.hash().size(),
                        };
                        response.payload.push(MessageBlock {
                            prefix: prefix.to_bytes(),
                            data: extrinsic.encode(),
                        });
                    } else {
                        // Any other want type only gets a presence notification.
                        response.block_presences.push(BlockPresence {
                            r#type: BlockPresenceType::Have as i32,
                            cid: cid.to_bytes(),
                        });
                    }
                }
                None => {
                    trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
                    if entry.send_dont_have {
                        response.block_presences.push(BlockPresence {
                            r#type: BlockPresenceType::DontHave as i32,
                            cid: cid.to_bytes(),
                        });
                    }
                }
            }
        }

        trace!(target: LOG_TARGET, "Response: {:?}", response);
        self.ready_blocks.push_back((peer, response));
    }

    /// Hands the oldest queued response to any handler of the target peer,
    /// or returns `Poll::Pending` when the queue is empty.
    fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll<
        NetworkBehaviourAction<
            <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
        if let Some((peer_id, message)) = self.ready_blocks.pop_front() {
            // `peer_id` is already owned here, so it is moved rather than cloned.
            return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
                peer_id,
                handler: NotifyHandler::Any,
                event: message,
            });
        }
        Poll::Pending
    }
}
/// Errors of the bitswap protocol; used as the error type of the inbound upgrade.
#[derive(derive_more::Display, derive_more::From)]
pub enum BitswapError {
    /// The received bytes could not be decoded as a protobuf message.
    #[display(fmt = "Failed to decode request: {}.", _0)]
    DecodeProto(prost::DecodeError),
    /// A message could not be encoded as protobuf.
    #[display(fmt = "Failed to encode response: {}.", _0)]
    EncodeProto(prost::EncodeError),
    /// Error reported by the chain client.
    Client(sp_blockchain::Error),
    /// A CID could not be parsed.
    BadCid(cid::Error),
    /// Reading a packet from the substream failed.
    Read(upgrade::ReadOneError),
    /// Sending the response failed.
    #[display(fmt = "Failed to send response.")]
    SendResponse,
}