Added spawner

This commit is contained in:
Erin 2022-08-09 01:32:42 +02:00 committed by ondra05
parent 55b494c154
commit f2bd9ddf4d
3 changed files with 79 additions and 23 deletions

1
Cargo.lock generated
View file

@ -318,6 +318,7 @@ dependencies = [
"linked_list_allocator", "linked_list_allocator",
"log", "log",
"slab", "slab",
"spin 0.9.4",
"versioning", "versioning",
] ]

View file

@ -7,6 +7,7 @@ version = "0.1.2"
linked_list_allocator = "0.9" linked_list_allocator = "0.9"
log = "0.4.14" log = "0.4.14"
slab = { version = "0.4", default-features = false } slab = { version = "0.4", default-features = false }
spin = "0.9"
[dependencies.crossbeam-queue] [dependencies.crossbeam-queue]
version = "0.3" version = "0.3"

View file

@ -8,8 +8,40 @@ use core::{
}; };
use crossbeam_queue::SegQueue; use crossbeam_queue::SegQueue;
use slab::Slab; use slab::Slab;
use spin::RwLock;
type TaskQueue = Arc<SegQueue<TaskId>>; type TaskQueue = Arc<SegQueue<TaskId>>;
type SpawnQueue = Arc<SegQueue<Task>>;
static SPAWN_QUEUE: RwLock<Option<SpawnQueue>> = RwLock::new(None);
/// Spawn a new task
pub fn spawn(future: impl Future<Output = ()> + Send + 'static) {
match &*SPAWN_QUEUE.read() {
Some(s) => s.push(Task::new(future)),
None => panic!("no task executor is running"),
}
}
/// Forcibly yield a task
pub fn yield_now() -> impl Future<Output = ()> {
struct YieldNow(bool);
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
YieldNow(false)
}
/// Tasks executor /// Tasks executor
#[derive(Default)] #[derive(Default)]
@ -20,41 +52,63 @@ pub struct Executor {
/// Awake tasks' queue /// Awake tasks' queue
queue: TaskQueue, queue: TaskQueue,
/// Incoming tasks to enqueue
incoming: SpawnQueue,
/// Wakers /// Wakers
wakers: BTreeMap<TaskId, Waker>, wakers: BTreeMap<TaskId, Waker>,
} }
impl Executor { impl Executor {
/// Spawn a future /// Spawn a task
pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) { pub fn spawn(&mut self, future: impl Future<Output = ()> + Send + 'static) {
self.queue self.queue
.push(TaskId(self.tasks.insert(Task::new(future)))); .push(TaskId(self.tasks.insert(Task::new(future))));
} }
/// Run tasks /// Spin poll loop until it runs out of tasks
pub fn run(&mut self) -> ! { pub fn run(&mut self) {
loop { // Assign `self.incoming` to global spawn queue to spawn tasks
while let Some(id) = self.queue.pop() { // from within
let task = match self.tasks.get_mut(id.0) { {
Some(t) => t, let mut spawner = SPAWN_QUEUE.write();
None => continue, if spawner.is_some() {
}; panic!("task executor is already running");
}
let mut cx = Context::from_waker( *spawner = Some(Arc::clone(&self.incoming));
self.wakers }
.entry(id)
.or_insert_with(|| TaskWaker::new(id, Arc::clone(&self.queue))),
);
match task.poll(&mut cx) { // Try to get incoming task, if none available, poll
Poll::Ready(()) => { // enqueued one
self.tasks.remove(id.0); while let Some(id) = self
self.wakers.remove(&id); .incoming
} .pop()
Poll::Pending => (), .map(|t| TaskId(self.tasks.insert(t)))
.or_else(|| self.queue.pop())
{
let task = match self.tasks.get_mut(id.0) {
Some(t) => t,
None => panic!("attempted to get non-extant task with id {}", id.0),
};
let mut cx = Context::from_waker(
self.wakers
.entry(id)
.or_insert_with(|| TaskWaker::new(id, Arc::clone(&self.queue))),
);
match task.poll(&mut cx) {
Poll::Ready(()) => {
// Task done, unregister
self.tasks.remove(id.0);
self.wakers.remove(&id);
} }
Poll::Pending => (),
} }
} }
*SPAWN_QUEUE.write() = None;
} }
} }
@ -63,12 +117,12 @@ struct TaskId(usize);
/// Async task /// Async task
struct Task { struct Task {
future: Pin<Box<dyn Future<Output = ()>>>, future: Pin<Box<dyn Future<Output = ()> + Send>>,
} }
impl Task { impl Task {
/// Create a new task from a future /// Create a new task from a future
fn new(future: impl Future<Output = ()> + 'static) -> Self { fn new(future: impl Future<Output = ()> + Send + 'static) -> Self {
Self { Self {
future: Box::pin(future), future: Box::pin(future),
} }