use super::*;
use crate::kbucket::{Key, KeyBytes};
use libp2p_core::PeerId;
use std::{
collections::HashMap,
iter::{Cycle, Map, Peekable},
ops::{Index, IndexMut, Range},
};
use wasm_timer::Instant;
pub struct ClosestDisjointPeersIter {
config: ClosestPeersIterConfig,
target: KeyBytes,
iters: Vec<ClosestPeersIter>,
iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
contacted_peers: HashMap<PeerId, PeerState>,
}
impl ClosestDisjointPeersIter {
pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
{
Self::with_config(
ClosestPeersIterConfig::default(),
target,
known_closest_peers,
)
}
pub fn with_config<I, T>(
config: ClosestPeersIterConfig,
target: T,
known_closest_peers: I,
) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
T: Into<KeyBytes> + Clone,
{
let peers = known_closest_peers.into_iter().take(K_VALUE.get()).collect::<Vec<_>>();
let iters = (0..config.parallelism.get())
.map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
.collect::<Vec<_>>();
let iters_len = iters.len();
ClosestDisjointPeersIter {
config,
target: target.into(),
iters,
iter_order: (0..iters_len).map(IteratorIndex as fn(usize) -> IteratorIndex).cycle(),
contacted_peers: HashMap::new(),
}
}
pub fn on_failure(&mut self, peer: &PeerId) -> bool {
let mut updated = false;
if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) {
updated = self.iters[*initiated_by].on_failure(peer);
if updated {
*response = ResponseState::Failed;
}
for (i, iter) in &mut self.iters.iter_mut().enumerate() {
if IteratorIndex(i) != *initiated_by {
iter.on_failure(peer);
}
}
}
updated
}
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
where
I: IntoIterator<Item = PeerId>,
{
let mut updated = false;
if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) {
updated = self.iters[*initiated_by].on_success(peer, closer_peers);
if updated {
*response = ResponseState::Succeeded;
}
for (i, iter) in &mut self.iters.iter_mut().enumerate() {
if IteratorIndex(i) != *initiated_by {
iter.on_success(peer, std::iter::empty());
}
}
}
updated
}
pub fn is_waiting(&self, peer: &PeerId) -> bool {
self.iters.iter().any(|i| i.is_waiting(peer))
}
pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
let mut state = None;
for _ in 0 .. self.iters.len() {
let i = self.iter_order.next().expect("Cycle never ends.");
let iter = &mut self.iters[i];
loop {
match iter.next(now) {
PeersIterState::Waiting(None) => {
match state {
Some(PeersIterState::Waiting(Some(_))) => {
unreachable!();
},
Some(PeersIterState::Waiting(None)) => {}
Some(PeersIterState::WaitingAtCapacity) => {
state = Some(PeersIterState::Waiting(None))
}
Some(PeersIterState::Finished) => {
unreachable!();
}
None => state = Some(PeersIterState::Waiting(None)),
};
break;
}
PeersIterState::Waiting(Some(peer)) => {
match self.contacted_peers.get_mut(&*peer) {
Some(PeerState{ response, .. }) => {
let peer = peer.into_owned();
match response {
ResponseState::Waiting => {},
ResponseState::Succeeded => {
iter.on_success(&peer, std::iter::empty());
},
ResponseState::Failed => {
iter.on_failure(&peer);
},
}
},
None => {
self.contacted_peers.insert(
peer.clone().into_owned(),
PeerState::new(i),
);
return PeersIterState::Waiting(Some(Cow::Owned(peer.into_owned())));
},
}
}
PeersIterState::WaitingAtCapacity => {
match state {
Some(PeersIterState::Waiting(Some(_))) => {
unreachable!();
},
Some(PeersIterState::Waiting(None)) => {}
Some(PeersIterState::WaitingAtCapacity) => {}
Some(PeersIterState::Finished) => {
unreachable!();
},
None => state = Some(PeersIterState::WaitingAtCapacity),
};
break;
}
PeersIterState::Finished => break,
}
}
}
state.unwrap_or(PeersIterState::Finished)
}
pub fn finish_paths<'a, I>(&mut self, peers: I) -> bool
where
I: IntoIterator<Item = &'a PeerId>
{
for peer in peers {
if let Some(PeerState{ initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
self.iters[*initiated_by].finish();
}
}
self.is_finished()
}
pub fn finish(&mut self) {
for iter in &mut self.iters {
iter.finish();
}
}
pub fn is_finished(&self) -> bool {
self.iters.iter().all(|i| i.is_finished())
}
pub fn into_result(self) -> impl Iterator<Item = PeerId> {
let result_per_path= self.iters.into_iter()
.map(|iter| iter.into_result().map(Key::from));
ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IteratorIndex(usize);
impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
type Output = ClosestPeersIter;
fn index(&self, index: IteratorIndex) -> &Self::Output {
&self[index.0]
}
}
impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
&mut self[index.0]
}
}
#[derive(Debug, PartialEq, Eq)]
struct PeerState {
initiated_by: IteratorIndex,
response: ResponseState,
}
impl PeerState {
fn new(initiated_by: IteratorIndex) -> Self {
PeerState {
initiated_by,
response: ResponseState::Waiting,
}
}
}
#[derive(Debug, PartialEq, Eq)]
enum ResponseState {
Waiting,
Succeeded,
Failed,
}
#[derive(Clone, Debug)]
struct ResultIter<I> where
I: Iterator<Item = Key<PeerId>>,
{
target: KeyBytes,
iters: Vec<Peekable<I>>,
}
impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
ResultIter{
target,
iters: iters.map(Iterator::peekable).collect(),
}
}
}
impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
type Item = I::Item;
fn next(&mut self) -> Option<Self::Item> {
let target = &self.target;
self.iters.iter_mut()
.fold(
Option::<&mut Peekable<_>>::None,
|iter_a, iter_b| {
let iter_a = match iter_a {
Some(iter_a) => iter_a,
None => return Some(iter_b),
};
match (iter_a.peek(), iter_b.peek()) {
(Some(next_a), Some(next_b)) => {
if next_a == next_b {
iter_b.next();
return Some(iter_a)
}
if target.distance(next_a) < target.distance(next_b) {
Some(iter_a)
} else {
Some(iter_b)
}
},
(Some(_), None) => Some(iter_a),
(None, Some(_)) => Some(iter_b),
(None, None) => None,
}
},
)
.and_then(Iterator::next)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::K_VALUE;
use libp2p_core::multihash::{Code, Multihash};
use quickcheck::*;
use rand::{Rng, seq::SliceRandom};
use std::collections::HashSet;
use std::iter;
impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let target = Target::arbitrary(g).0;
let num_closest_iters = g.gen_range(0, 20 + 1);
let peers = random_peers(
g.gen_range(0, 20 * num_closest_iters + 1),
g,
);
let iters: Vec<_> = (0..num_closest_iters)
.map(|_| {
let num_peers = g.gen_range(0, 20 + 1);
let mut peers = peers.choose_multiple(g, num_peers)
.cloned()
.map(Key::from)
.collect::<Vec<_>>();
peers.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
peers.into_iter()
})
.collect();
ResultIter::new(target, iters.into_iter())
}
fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
let peers = self.iters
.clone()
.into_iter()
.flatten()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let iters = self.iters.clone()
.into_iter()
.map(|iter| iter.collect::<Vec<_>>())
.collect();
Box::new(ResultIterShrinker {
target: self.target.clone(),
peers,
iters,
})
}
}
struct ResultIterShrinker {
target: KeyBytes,
peers: Vec<Key<PeerId>>,
iters: Vec<Vec<Key<PeerId>>>,
}
impl Iterator for ResultIterShrinker {
type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
fn next(&mut self) -> Option<Self::Item> {
let peer = self.peers.pop()?;
let iters = self.iters.clone().into_iter()
.filter_map(|mut iter| {
iter.retain(|p| p != &peer);
if iter.is_empty() {
return None;
}
Some(iter.into_iter())
}).collect::<Vec<_>>();
Some(ResultIter::new(self.target.clone(), iters.into_iter()))
}
}
#[derive(Clone, Debug)]
struct Target(KeyBytes);
impl Arbitrary for Target {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
Target(Key::from(random_peers(1, g).pop().unwrap()).into())
}
}
fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
(0 .. n).map(|_| PeerId::from_multihash(
Multihash::wrap(Code::Sha2_256.into(), &g.gen::<[u8; 32]>()).unwrap()
).unwrap()).collect()
}
#[test]
fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
let expected = {
let mut deduplicated = result_iter.clone()
.iters
.into_iter()
.flatten()
.collect::<HashSet<_>>()
.into_iter()
.map(Key::from)
.collect::<Vec<_>>();
deduplicated.sort_unstable_by(|a, b| {
result_iter.target.distance(a).cmp(&result_iter.target.distance(b))
});
deduplicated
};
assert_eq!(expected, result_iter.collect::<Vec<_>>());
}
QuickCheck::new().quickcheck(prop as fn(_))
}
#[derive(Debug, Clone)]
struct Parallelism(NonZeroUsize);
impl Arbitrary for Parallelism{
fn arbitrary<G: Gen>(g: &mut G) -> Self {
Parallelism(NonZeroUsize::new(g.gen_range(1, 10)).unwrap())
}
}
#[derive(Debug, Clone)]
struct NumResults(NonZeroUsize);
impl Arbitrary for NumResults{
fn arbitrary<G: Gen>(g: &mut G) -> Self {
NumResults(NonZeroUsize::new(g.gen_range(1, K_VALUE.get())).unwrap())
}
}
impl Arbitrary for ClosestPeersIterConfig {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
ClosestPeersIterConfig {
parallelism: Parallelism::arbitrary(g).0,
num_results: NumResults::arbitrary(g).0,
peer_timeout: Duration::from_secs(1),
}
}
}
#[derive(Debug, Clone)]
struct PeerVec(pub Vec<Key<PeerId>>);
impl Arbitrary for PeerVec {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
PeerVec(
(0..g.gen_range(1, 60))
.map(|_| PeerId::random())
.map(Key::from)
.collect(),
)
}
}
#[test]
fn s_kademlia_disjoint_paths() {
let now = Instant::now();
let target: KeyBytes = Key::from(PeerId::random()).into();
let mut pool = [0; 12].iter()
.map(|_| Key::from(PeerId::random()))
.collect::<Vec<_>>();
pool.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
let known_closest_peers = pool.split_off(pool.len() - 3);
let config = ClosestPeersIterConfig {
parallelism: NonZeroUsize::new(3).unwrap(),
num_results: NonZeroUsize::new(3).unwrap(),
..ClosestPeersIterConfig::default()
};
let mut peers_iter = ClosestDisjointPeersIter::with_config(
config.clone(),
target,
known_closest_peers.clone(),
);
for _ in 0..3 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
assert!(known_closest_peers.contains(&Key::from(peer)));
} else {
panic!("Expected iterator to return peer to query.");
}
}
assert_eq!(
PeersIterState::WaitingAtCapacity,
peers_iter.next(now),
);
let response_2 = pool.split_off(pool.len() - 3);
let response_3 = pool.split_off(pool.len() - 3);
let malicious_response_1 = pool.split_off(pool.len() - 3);
peers_iter.on_success(
known_closest_peers[0].preimage(),
malicious_response_1.clone().into_iter().map(|k| k.preimage().clone()),
);
peers_iter.on_success(
known_closest_peers[1].preimage(),
response_2.clone().into_iter().map(|k| k.preimage().clone()),
);
peers_iter.on_success(
known_closest_peers[2].preimage(),
response_3.clone().into_iter().map(|k| k.preimage().clone()),
);
let mut next_to_query = vec![];
for _ in 0..3 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
next_to_query.push(peer)
} else {
panic!("Expected iterator to return peer to query.");
}
};
assert!(next_to_query.contains(malicious_response_1[0].preimage()));
assert!(next_to_query.contains(response_2[0].preimage()));
assert!(next_to_query.contains(response_3[0].preimage()));
for peer in next_to_query {
peers_iter.on_success(&peer, vec![]);
}
for _ in 0..6 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
peers_iter.on_success(&peer, vec![]);
} else {
panic!("Expected iterator to return peer to query.");
}
}
assert_eq!(
PeersIterState::Finished,
peers_iter.next(now),
);
let final_peers: Vec<_> = peers_iter.into_result().collect();
assert!(final_peers.contains(malicious_response_1[0].preimage()));
assert!(final_peers.contains(response_2[0].preimage()));
assert!(final_peers.contains(response_3[0].preimage()));
}
#[derive(Clone)]
struct Graph(HashMap<PeerId, Peer>);
impl std::fmt::Debug for Graph {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fmt.debug_list().entries(self.0.iter().map(|(id, _)| id)).finish()
}
}
impl Arbitrary for Graph {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let mut peer_ids = random_peers(g.gen_range(K_VALUE.get(), 200), g)
.into_iter()
.map(|peer_id| (peer_id.clone(), Key::from(peer_id)))
.collect::<Vec<_>>();
let mut peers = peer_ids.clone().into_iter()
.map(|(peer_id, key)| {
peer_ids.sort_unstable_by(|(_, a), (_, b)| {
key.distance(a).cmp(&key.distance(b))
});
assert_eq!(peer_id, peer_ids[0].0);
let known_peers = peer_ids.iter()
.skip(1)
.take(K_VALUE.get())
.cloned()
.collect::<Vec<_>>();
(peer_id, Peer{ known_peers })
})
.collect::<HashMap<_, _>>();
for (peer_id, peer) in peers.iter_mut() {
peer_ids.shuffle(g);
let num_peers = g.gen_range(K_VALUE.get(), peer_ids.len() + 1);
let mut random_peer_ids = peer_ids.choose_multiple(g, num_peers)
.filter(|(id, _)| peer_id != id)
.cloned()
.collect::<Vec<_>>();
peer.known_peers.append(&mut random_peer_ids);
peer.known_peers = std::mem::replace(&mut peer.known_peers, vec![])
.into_iter().collect::<HashSet<_>>().into_iter().collect();
}
Graph(peers)
}
}
impl Graph {
fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
self.0.iter()
.map(|(peer_id, _)| (target.distance(&Key::from(*peer_id)), peer_id))
.fold(None, |acc, (distance_b, peer_id_b)| {
match acc {
None => Some((distance_b, peer_id_b)),
Some((distance_a, peer_id_a)) => if distance_a < distance_b {
Some((distance_a, peer_id_a))
} else {
Some((distance_b, peer_id_b))
}
}
})
.expect("Graph to have at least one peer.")
.1.clone()
}
}
#[derive(Debug, Clone)]
struct Peer {
known_peers: Vec<(PeerId, Key<PeerId>)>,
}
impl Peer {
fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
self.known_peers.sort_unstable_by(|(_, a), (_, b)| {
target.distance(a).cmp(&target.distance(b))
});
self.known_peers.iter().take(K_VALUE.get()).map(|(id, _)| id).cloned().collect()
}
}
enum PeerIterator {
Disjoint(ClosestDisjointPeersIter),
Closest(ClosestPeersIter),
}
impl PeerIterator {
fn next(&mut self, now: Instant) -> PeersIterState<'_> {
match self {
PeerIterator::Disjoint(iter) => iter.next(now),
PeerIterator::Closest(iter) => iter.next(now),
}
}
fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
match self {
PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
};
}
fn into_result(self) -> Vec<PeerId> {
match self {
PeerIterator::Disjoint(iter) => iter.into_result().collect(),
PeerIterator::Closest(iter) => iter.into_result().collect(),
}
}
}
#[test]
fn closest_and_disjoint_closest_yield_same_result() {
fn prop(
target: Target,
graph: Graph,
parallelism: Parallelism,
num_results: NumResults,
) -> TestResult {
if parallelism.0 > num_results.0 {
return TestResult::discard();
}
let target: KeyBytes = target.0;
let closest_peer = graph.get_closest_peer(&target);
let mut known_closest_peers = graph.0.iter()
.take(K_VALUE.get())
.map(|(key, _peers)| Key::from(*key))
.collect::<Vec<_>>();
known_closest_peers.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
let cfg = ClosestPeersIterConfig{
parallelism: parallelism.0,
num_results: num_results.0,
..ClosestPeersIterConfig::default()
};
let closest = drive_to_finish(
PeerIterator::Closest(ClosestPeersIter::with_config(
cfg.clone(),
target.clone(),
known_closest_peers.clone(),
)),
graph.clone(),
&target,
);
let disjoint = drive_to_finish(
PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
cfg,
target.clone(),
known_closest_peers.clone(),
)),
graph.clone(),
&target,
);
assert!(
closest.contains(&closest_peer),
"Expected `ClosestPeersIter` to find closest peer.",
);
assert!(
disjoint.contains(&closest_peer),
"Expected `ClosestDisjointPeersIter` to find closest peer.",
);
assert!(
closest.len() == num_results.0.get(),
"Expected `ClosestPeersIter` to find `num_results` closest \
peers."
);
assert!(
disjoint.len() >= num_results.0.get(),
"Expected `ClosestDisjointPeersIter` to find at least \
`num_results` closest peers."
);
if closest.len() > disjoint.len() {
let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
panic!(
"Expected `ClosestDisjointPeersIter` to find all peers \
found by `ClosestPeersIter`, but it did not find {:?}.",
closest_only,
);
};
TestResult::passed()
}
fn drive_to_finish(
mut iter: PeerIterator,
mut graph: Graph,
target: &KeyBytes,
) -> HashSet<PeerId> {
let now = Instant::now();
loop {
match iter.next(now) {
PeersIterState::Waiting(Some(peer_id)) => {
let peer_id = peer_id.clone().into_owned();
let closest_peers = graph.0.get_mut(&peer_id)
.unwrap()
.get_closest_peers(&target);
iter.on_success(&peer_id, closest_peers);
} ,
PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) =>
panic!("There is never more than one request in flight."),
PeersIterState::Finished => break,
}
}
let mut result = iter.into_result().into_iter().map(Key::from).collect::<Vec<_>>();
result.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
result.into_iter().map(|k| k.into_preimage()).collect()
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _, _, _) -> _)
}
#[test]
fn failure_can_not_overwrite_previous_success() {
let now = Instant::now();
let peer = PeerId::random();
let mut iter = ClosestDisjointPeersIter::new(
Key::from(PeerId::random()).into(),
iter::once(Key::from(peer.clone())),
);
assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
assert!(iter.on_success(&peer, iter::empty()));
assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
initiated_by: IteratorIndex(0),
response: ResponseState::Succeeded,
}));
assert!(!iter.on_failure(&peer));
assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
initiated_by: IteratorIndex(0),
response: ResponseState::Succeeded,
}));
}
}