#![allow(unused)] use { alloc::{boxed::Box, collections::BTreeMap, sync::Arc, task::Wake}, core::{ future::Future, pin::Pin, task::{Context, Poll, Waker}, }, crossbeam_queue::SegQueue, kiam::when, slab::Slab, spin::RwLock, }; static SPAWN_QUEUE: RwLock> = RwLock::new(None); pub fn spawn(future: impl Future + Send + 'static) { match &*SPAWN_QUEUE.read() { Some(s) => s.push(Task::new(future)), None => panic!("no task executor is running"), } } pub fn yield_now() -> impl Future { struct YieldNow(bool); impl Future for YieldNow { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.0 { Poll::Ready(()) } else { self.0 = true; cx.waker().wake_by_ref(); Poll::Pending } } } YieldNow(false) } #[derive(Default)] pub struct Executor { tasks: Slab, queue: TaskQueue, to_spawn: SpawnQueue, wakers: BTreeMap, } impl Executor { pub fn spawn(&mut self, future: impl Future + Send + 'static) { self.queue .push(TaskId(self.tasks.insert(Task::new(future)))); } pub fn run(&mut self) { { let mut global_spawner = SPAWN_QUEUE.write(); if global_spawner.is_some() { panic!("Task executor is already running"); } *global_spawner = Some(Arc::clone(&self.to_spawn)); } loop { when! { let Some(id) = self .to_spawn .pop() .map(|t| TaskId(self.tasks.insert(t))) .or_else(|| self.queue.pop()) => { let Some(task) = self.tasks.get_mut(id.0) else { panic!("Attempted to get task from empty slot: {}", id.0); }; let mut cx = Context::from_waker(self.wakers.entry(id).or_insert_with(|| { Waker::from(Arc::new(TaskWaker { id, queue: Arc::clone(&self.queue), })) })); match task.poll(&mut cx) { Poll::Ready(()) => { self.tasks.remove(id.0); self.wakers.remove(&id); } Poll::Pending => (), } }, self.tasks.is_empty() => break, _ => (), } } *SPAWN_QUEUE.write() = None; } } struct Task { future: Pin + Send>>, } impl Task { pub fn new(future: impl Future + Send + 'static) -> Self { log::trace!("New task scheduled"); Self { future: Box::pin(future), } } fn poll(&mut self, cx: &mut Context) -> Poll<()> { self.future.as_mut().poll(cx) } } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] struct TaskId(usize); type TaskQueue = Arc>; type SpawnQueue = Arc>; struct TaskWaker { id: TaskId, queue: TaskQueue, } impl Wake for TaskWaker { fn wake(self: Arc) { log::trace!("Woke Task-{:?}", self.id); self.wake_by_ref(); } fn wake_by_ref(self: &Arc) { self.queue.push(self.id); } }