Rust定时器

use std::{
    collections::{binary_heap::PeekMut, BinaryHeap},
    future::Future,
    sync::{Arc, Mutex},
    thread::{self, Thread},
    time::{Duration, Instant},
};

use futures::future::BoxFuture;
use thiserror::Error;

pub trait Swarp: Send + 'static {
    fn swarp(self: Box<Self>);
}
impl<F> Swarp for F
where
    F: FnOnce() + Send + 'static,
{
    fn swarp(self: Box<Self>) {
        std::thread::spawn(self);
    }
}
pub struct TokioSwarp {
    handle: tokio::runtime::Handle,
    future: BoxFuture<'static, ()>,
}
impl TokioSwarp {
    pub fn new<F>(handle: tokio::runtime::Handle, future: F) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Self {
            handle,
            future: Box::pin(future),
        }
    }
}
impl Swarp for TokioSwarp {
    fn swarp(self: Box<Self>) {
        self.handle.spawn(self.future);
    }
}
#[derive(Debug, Error)]
pub enum Error {
    #[error("Io Error {}",.0)]
    Io(
        #[source]
        #[from]
        std::io::Error,
    ),
    #[error("{}",.0)]
    Msg(String),
}
struct Pair {
    time: Instant,
    swarp: Box<dyn Swarp>,
}
impl Eq for Pair {}
impl Ord for Pair {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
impl PartialEq for Pair {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}
impl PartialOrd for Pair {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
struct Inner {
    heap: BinaryHeap<Pair>,
    thread: Option<Thread>,
}
#[derive(Clone)]
pub struct Clock {
    inner: Arc<Mutex<Inner>>,
}
impl Clock {
    pub fn new() -> Result<Self, Error> {
        let this = Self {
            inner: Arc::new(Mutex::new(Inner {
                heap: BinaryHeap::new(),
                thread: None,
            })),
        };
        let inner = this.inner.clone();
        let join_handle = thread::Builder::new()
            .name("Time Clock Thread".to_string())
            .spawn(move || loop {
                let mut guard = inner.lock().expect("获取锁异常");
                let sleep = 'a: loop {
                    let now = Instant::now();
                    if let Some(data) = guard.heap.peek_mut() {
                        if data.time <= now {
                            let pair = PeekMut::pop(data);
                            pair.swarp.swarp();
                            continue;
                        }
                        break 'a data.time.checked_duration_since(now).unwrap_or_default();
                    } else {
                        break 'a Duration::from_secs(86400 * 365 * 30);
                    }
                };

                drop(guard);
                thread::park_timeout(sleep);
            })?;
        let inner = this.inner.clone();
        let mut guard = inner
            .lock()
            .map_err(|e| Error::Msg(format!("获取锁异常 {:?}", e)))?;
        guard.thread = Some(join_handle.thread().clone());
        Ok(this)
    }

    pub fn push<S>(&self, time: Instant, swarp: S) -> Result<(), Error>
    where
        S: Swarp,
    {
        let mut guard = self
            .inner
            .lock()
            .map_err(|e| Error::Msg(format!("获取锁异常 {:?}", e)))?;
        guard.heap.push(Pair {
            time,
            swarp: Box::new(swarp),
        });
        if let Some(thread) = &guard.thread {
            thread.unpark();
        }
        Ok(())
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
謎麟
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!