// blob: d0ebf5d4350041f89ac13d058cbd6aa02f1df20e [file] [log] [blame]
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::Inject;
use crate::runtime::thread_pool::queue;
use crate::runtime::MetricsBatch;
use loom::thread;
/// Loom model with one owner thread working its local queue while a second
/// thread makes three steal attempts against it.
///
/// The owner pushes six tasks in total (two rounds of two pushes, a pop, and
/// one more push). The test asserts that the owner's pops, the injection-queue
/// drain and the stealer's count add up to exactly six.
#[test]
fn basic() {
    loom::model(|| {
        let (stealer, mut owner_queue) = queue::local();
        let inject = Inject::new();
        let mut owner_metrics = MetricsBatch::new();

        let thief = thread::spawn(move || {
            let mut thief_metrics = MetricsBatch::new();
            let (_, mut thief_queue) = queue::local();
            let mut taken = 0;

            for _attempt in 0..3 {
                // Count the task handed back by a successful steal, then
                // count whatever the drain finds in `thief_queue`.
                let stole = stealer
                    .steal_into(&mut thief_queue, &mut thief_metrics)
                    .is_some();
                taken += usize::from(stole);
                taken += std::iter::from_fn(|| thief_queue.pop()).count();
            }

            taken
        });

        let mut seen = 0;
        for _round in 0..2 {
            for _ in 0..2 {
                let (task, _) = super::unowned(async {});
                owner_queue.push_back(task, &inject, &mut owner_metrics);
            }

            // Single pop between the two pushes and the extra push below.
            seen += usize::from(owner_queue.pop().is_some());

            // One more task, then empty the local queue.
            let (task, _) = super::unowned(async {});
            owner_queue.push_back(task, &inject, &mut owner_metrics);

            seen += std::iter::from_fn(|| owner_queue.pop()).count();
        }

        // Tasks that `push_back` routed to the injection queue count as well.
        seen += std::iter::from_fn(|| inject.pop()).count();

        seen += thief.join().unwrap();
        assert_eq!(6, seen);
    });
}
/// Loom model in which the owner pushes a burst of tasks while another thread
/// makes a single steal attempt.
///
/// Seven tasks are pushed overall (one, then six more). The test asserts that
/// all seven are accounted for across the owner's pops, the stealer's count
/// and the injection queue.
// NOTE(review): the burst of six presumably exceeds the loom-sized local queue
// capacity so that `push_back` overflows into `inject` — confirm against the
// queue implementation.
#[test]
fn steal_overflow() {
    loom::model(|| {
        let (stealer, mut owner_queue) = queue::local();
        let inject = Inject::new();
        let mut owner_metrics = MetricsBatch::new();

        let thief = thread::spawn(move || {
            let mut thief_metrics = MetricsBatch::new();
            let (_, mut thief_queue) = queue::local();

            // One steal attempt; count the returned task (if any) plus
            // everything subsequently popped from `thief_queue`.
            let stole = stealer
                .steal_into(&mut thief_queue, &mut thief_metrics)
                .is_some();
            let mut taken = usize::from(stole);
            taken += std::iter::from_fn(|| thief_queue.pop()).count();

            taken
        });

        let mut seen = 0;

        // Push a single task and try to pop it straight back.
        let (task, _) = super::unowned(async {});
        owner_queue.push_back(task, &inject, &mut owner_metrics);
        seen += usize::from(owner_queue.pop().is_some());

        // Burst of six pushes with no pops in between.
        for _ in 0..6 {
            let (task, _) = super::unowned(async {});
            owner_queue.push_back(task, &inject, &mut owner_metrics);
        }

        // Wait for the stealer first, then collect whatever is left locally
        // and in the injection queue.
        seen += thief.join().unwrap();
        seen += std::iter::from_fn(|| owner_queue.pop()).count();
        seen += std::iter::from_fn(|| inject.pop()).count();

        assert_eq!(7, seen);
    });
}
/// Loom model with two threads stealing from the same local queue while the
/// owner drains it.
///
/// `NUM_TASKS` tasks are pushed before the stealers start. The test asserts
/// that the owner's pops, the injection-queue drain and both stealers' counts
/// sum to `NUM_TASKS`.
#[test]
fn multi_stealer() {
    const NUM_TASKS: usize = 5;

    /// Makes one steal attempt into a fresh local queue and returns how many
    /// tasks were obtained: zero when the steal yields nothing, otherwise one
    /// for the returned task plus every task popped from the fresh queue.
    fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
        let mut metrics = MetricsBatch::new();
        let (_, mut local) = queue::local();

        let stole_one = steal.steal_into(&mut local, &mut metrics).is_some();
        if stole_one {
            1 + std::iter::from_fn(|| local.pop()).count()
        } else {
            0
        }
    }

    loom::model(|| {
        let (steal, mut owner_queue) = queue::local();
        let inject = Inject::new();
        let mut owner_metrics = MetricsBatch::new();

        // Fill the queue before any stealer exists.
        for _ in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
            owner_queue.push_back(task, &inject, &mut owner_metrics);
        }

        // First stealer works on a clone of the handle, the second takes the
        // original.
        let first_handle = steal.clone();
        let first_thief = thread::spawn(move || steal_tasks(first_handle));
        let second_thief = thread::spawn(move || steal_tasks(steal));

        let mut total = 0;
        total += std::iter::from_fn(|| owner_queue.pop()).count();
        total += std::iter::from_fn(|| inject.pop()).count();

        total += first_thief.join().unwrap();
        total += second_thief.join().unwrap();

        assert_eq!(total, NUM_TASKS);
    });
}
/// Loom model of a "chained" steal: while another thread steals from the first
/// queue, the main thread steals from the second queue into the first.
///
/// There are no count assertions here; the test only drives the operations
/// under loom and empties every queue before the model iteration ends.
#[test]
fn chained_steal() {
    loom::model(|| {
        let mut main_metrics = MetricsBatch::new();
        let (first_steal, mut first_queue) = queue::local();
        let (second_steal, mut second_queue) = queue::local();
        let inject = Inject::new();

        // Seed both queues, alternating between them, four tasks each.
        for _ in 0..4 {
            let (task, _) = super::unowned(async {});
            first_queue.push_back(task, &inject, &mut main_metrics);

            let (task, _) = super::unowned(async {});
            second_queue.push_back(task, &inject, &mut main_metrics);
        }

        // Background thread: steal from the first queue into a private one,
        // then discard everything it received.
        let thief = thread::spawn(move || {
            let mut thief_metrics = MetricsBatch::new();
            let (_, mut thief_queue) = queue::local();

            drop(first_steal.steal_into(&mut thief_queue, &mut thief_metrics));
            std::iter::from_fn(|| thief_queue.pop()).for_each(drop);
        });

        // Meanwhile, empty the first queue and refill it by stealing from the
        // second queue.
        std::iter::from_fn(|| first_queue.pop()).for_each(drop);
        drop(second_steal.steal_into(&mut first_queue, &mut main_metrics));

        thief.join().unwrap();

        // Leave nothing behind in any queue.
        std::iter::from_fn(|| first_queue.pop()).for_each(drop);
        std::iter::from_fn(|| second_queue.pop()).for_each(drop);
        std::iter::from_fn(|| inject.pop()).for_each(drop);
    });
}