
150 lines
3.6 KiB
Raw Permalink Normal View History

//! Async task and executor
use alloc::{boxed::Box, collections::BTreeMap, sync::Arc, task::Wake};
use core::{
task::{Context, Poll, Waker},
use crossbeam_queue::SegQueue;
use slab::Slab;
2022-08-08 23:32:42 +00:00
use spin::RwLock;
type TaskQueue = Arc<SegQueue<TaskId>>;
2022-08-08 23:32:42 +00:00
type SpawnQueue = Arc<SegQueue<Task>>;
static SPAWN_QUEUE: RwLock<Option<SpawnQueue>> = RwLock::new(None);
/// Spawn a new task
pub fn spawn(future: impl Future<Output = ()> + Send + 'static) {
match &*SPAWN_QUEUE.read() {
Some(s) => s.push(Task::new(future)),
None => panic!("no task executor is running"),
/// Forcibly yield a task
pub fn yield_now() -> impl Future<Output = ()> {
struct YieldNow(bool);
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0 {
} else {
self.0 = true;
/// Tasks executor
pub struct Executor {
/// All spawned tasks' stash
tasks: Slab<Task>,
/// Awake tasks' queue
queue: TaskQueue,
2022-08-08 23:32:42 +00:00
/// Incoming tasks to enqueue
incoming: SpawnQueue,
/// Wakers
wakers: BTreeMap<TaskId, Waker>,
impl Executor {
2022-08-08 23:32:42 +00:00
/// Spawn a task
pub fn spawn(&mut self, future: impl Future<Output = ()> + Send + 'static) {
2022-08-08 23:32:42 +00:00
/// Spin poll loop until it runs out of tasks
pub fn run(&mut self) {
// Assign `self.incoming` to global spawn queue to spawn tasks
// from within
let mut spawner = SPAWN_QUEUE.write();
if spawner.is_some() {
panic!("task executor is already running");
*spawner = Some(Arc::clone(&self.incoming));
// Try to get incoming task, if none available, poll
// enqueued one
while let Some(id) = self
.map(|t| TaskId(self.tasks.insert(t)))
.or_else(|| self.queue.pop())
let Some(task) = self.tasks.get_mut(id.0) else {
panic!("attempted to get non-extant task with id {}", id.0)
2022-08-08 23:32:42 +00:00
2022-11-05 00:43:41 +00:00
let mut cx = Context::from_waker(self.wakers.entry(id).or_insert_with(|| {
Waker::from(Arc::new(TaskWaker {
queue: Arc::clone(&self.queue),
2022-08-08 23:32:42 +00:00
match task.poll(&mut cx) {
Poll::Ready(()) => {
// Task done, unregister
2022-08-08 23:32:42 +00:00
Poll::Pending => (),
2022-08-08 23:32:42 +00:00
*SPAWN_QUEUE.write() = None;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(usize);
/// Async task
struct Task {
2022-08-08 23:32:42 +00:00
future: Pin<Box<dyn Future<Output = ()> + Send>>,
impl Task {
/// Create a new task from a future
2022-08-08 23:32:42 +00:00
fn new(future: impl Future<Output = ()> + Send + 'static) -> Self {
Self {
future: Box::pin(future),
fn poll(&mut self, cx: &mut Context) -> Poll<()> {
struct TaskWaker {
id: TaskId,
queue: TaskQueue,
impl Wake for TaskWaker {
fn wake(self: Arc<Self>) {
fn wake_by_ref(self: &Arc<Self>) {