Rust定时器
use std::{
collections::{binary_heap::PeekMut, BinaryHeap},
future::Future,
sync::{Arc, Mutex},
thread::{self, Thread},
time::{Duration, Instant},
};
use futures::future::BoxFuture;
use thiserror::Error;
/// A one-shot unit of work that can be handed off for execution.
///
/// `swarp` takes `self` as `Box<Self>`, so a task is consumed when it is launched and
/// the trait can be used as `Box<dyn Swarp>`. Implementors must be `Send + 'static`.
pub trait Swarp: Send + 'static {
/// Consumes the boxed task and launches it; how it runs is up to the implementor.
fn swarp(self: Box<Self>);
}
/// Blanket implementation: any sendable, `'static` one-shot closure is a task.
impl<F> Swarp for F
where
    F: FnOnce() + Send + 'static,
{
    /// Runs the closure on a freshly spawned OS thread.
    ///
    /// The join handle is dropped, so the thread is detached and its result is
    /// never observed by the caller.
    fn swarp(self: Box<Self>) {
        let task = self;
        let _detached = std::thread::spawn(move || task());
    }
}
/// A [`Swarp`] task that pairs a boxed future with the Tokio runtime handle it is spawned on.
pub struct TokioSwarp {
/// Handle of the runtime on which `future` is spawned.
handle: tokio::runtime::Handle,
/// The boxed, `'static` future to spawn; its output is `()`.
future: BoxFuture<'static, ()>,
}
impl TokioSwarp {
pub fn new<F>(handle: tokio::runtime::Handle, future: F) -> Self
where
F: Future<Output = ()> + Send + 'static,
{
Self {
handle,
future: Box::pin(future),
}
}
}
impl Swarp for TokioSwarp {
fn swarp(self: Box<Self>) {
self.handle.spawn(self.future);
}
}
/// Errors returned by this module's fallible operations.
#[derive(Debug, Error)]
pub enum Error {
/// An I/O error; `#[from]` generates `From<std::io::Error>`, so `?` converts into this variant.
#[error("Io Error {}",.0)]
Io(
#[source]
#[from]
std::io::Error,
),
/// A free-form error message, displayed verbatim.
#[error("{}",.0)]
Msg(String),
}
/// A queue entry: a deadline together with the task to launch once it is reached.
struct Pair {
/// The instant at or after which the task is due.
time: Instant,
/// The task to launch; boxed so that different task types share one queue.
swarp: Box<dyn Swarp>,
}
// Marker impl required by `Ord`; equality compares only `time` (see `PartialEq for Pair`).
impl Eq for Pair {}
impl Ord for Pair {
    /// Orders entries by deadline, inverted: the entry with the *earlier* `time`
    /// compares as greater, so a max-heap such as `BinaryHeap` surfaces the
    /// soonest deadline first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Swapping the operands is the same as reversing the natural ordering.
        other.time.cmp(&self.time)
    }
}
impl PartialEq for Pair {
    /// Two entries are equal when their deadlines are equal; the task itself is
    /// not compared.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.time, &other.time)
    }
}
impl PartialOrd for Pair {
    /// Always returns `Some`, delegating to the total order defined by `Ord for Pair`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// State shared between `Clock` handles and the timer thread (kept behind a mutex by `Clock`).
struct Inner {
/// Pending entries; `Pair`'s reversed ordering puts the earliest deadline at the top.
heap: BinaryHeap<Pair>,
/// Handle of the timer thread, used to unpark it; `None` until `Clock::new` stores it.
thread: Option<Thread>,
}
/// A handle to a timer that launches queued tasks once their deadlines are reached.
///
/// Cloning copies the `Arc`, so all clones share the same queue and timer thread.
#[derive(Clone)]
pub struct Clock {
/// The shared, mutex-guarded queue and timer-thread handle.
inner: Arc<Mutex<Inner>>,
}
impl Clock {
pub fn new() -> Result<Self, Error> {
let this = Self {
inner: Arc::new(Mutex::new(Inner {
heap: BinaryHeap::new(),
thread: None,
})),
};
let inner = this.inner.clone();
let join_handle = thread::Builder::new()
.name("Time Clock Thread".to_string())
.spawn(move || loop {
let mut guard = inner.lock().expect("获取锁异常");
let sleep = 'a: loop {
let now = Instant::now();
if let Some(data) = guard.heap.peek_mut() {
if data.time <= now {
let pair = PeekMut::pop(data);
pair.swarp.swarp();
continue;
}
break 'a data.time.checked_duration_since(now).unwrap_or_default();
} else {
break 'a Duration::from_secs(86400 * 365 * 30);
}
};
drop(guard);
thread::park_timeout(sleep);
})?;
let inner = this.inner.clone();
let mut guard = inner
.lock()
.map_err(|e| Error::Msg(format!("获取锁异常 {:?}", e)))?;
guard.thread = Some(join_handle.thread().clone());
Ok(this)
}
pub fn push<S>(&self, time: Instant, swarp: S) -> Result<(), Error>
where
S: Swarp,
{
let mut guard = self
.inner
.lock()
.map_err(|e| Error::Msg(format!("获取锁异常 {:?}", e)))?;
guard.heap.push(Pair {
time,
swarp: Box::new(swarp),
});
if let Some(thread) = &guard.thread {
thread.unpark();
}
Ok(())
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接