use core::marker::PhantomData;
use core::ptr::{self, NonNull};
use generic_array::ArrayLength;
use sealed;
use spsc::{MultiCore, Queue};
impl<T, N, U, C> Queue<T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, N, U, C>, Consumer<'rb, T, N, U, C>) {
(
Producer {
rb: unsafe { NonNull::new_unchecked(self) },
_marker: PhantomData,
},
Consumer {
rb: unsafe { NonNull::new_unchecked(self) },
_marker: PhantomData,
},
)
}
}
pub struct Consumer<'a, T, N, U = usize, C = MultiCore>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
rb: NonNull<Queue<T, N, U, C>>,
_marker: PhantomData<&'a ()>,
}
unsafe impl<'a, T, N, U, C> Send for Consumer<'a, T, N, U, C>
where
N: ArrayLength<T>,
T: Send,
U: sealed::Uxx,
C: sealed::XCore,
{
}
pub struct Producer<'a, T, N, U = usize, C = MultiCore>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
rb: NonNull<Queue<T, N, U, C>>,
_marker: PhantomData<&'a ()>,
}
unsafe impl<'a, T, N, U> Send for Producer<'a, T, N, U>
where
N: ArrayLength<T>,
T: Send,
U: sealed::Uxx,
{
}
macro_rules! impl_ {
($uxx:ident) => {
impl<'a, T, N, C> Consumer<'a, T, N, $uxx, C>
where
N: ArrayLength<T>,
C: sealed::XCore,
{
pub fn ready(&self) -> bool {
let head = unsafe { self.rb.as_ref().head.load_relaxed() };
let tail = unsafe { self.rb.as_ref().tail.load_acquire() };
return head != tail;
}
pub fn dequeue(&mut self) -> Option<T> {
let head = unsafe { self.rb.as_ref().head.load_relaxed() };
let tail = unsafe { self.rb.as_ref().tail.load_acquire() };
if head != tail {
Some(unsafe { self._dequeue(head) })
} else {
None
}
}
pub unsafe fn dequeue_unchecked(&mut self) -> T {
let head = self.rb.as_ref().head.load_relaxed();
debug_assert_ne!(head, self.rb.as_ref().tail.load_acquire());
self._dequeue(head)
}
unsafe fn _dequeue(&mut self, head: $uxx) -> T {
let rb = self.rb.as_ref();
let cap = rb.capacity();
let buffer = rb.buffer.get_ref();
let item = ptr::read(buffer.get_unchecked(usize::from(head % cap)));
rb.head.store_release(head.wrapping_add(1));
item
}
}
impl<'a, T, N, C> Producer<'a, T, N, $uxx, C>
where
N: ArrayLength<T>,
C: sealed::XCore,
{
pub fn ready(&self) -> bool {
let cap = unsafe { self.rb.as_ref().capacity() };
let tail = unsafe { self.rb.as_ref().tail.load_relaxed() };
let head = unsafe { self.rb.as_ref().head.load_acquire() };
return head.wrapping_add(cap) != tail;
}
pub fn enqueue(&mut self, item: T) -> Result<(), T> {
let cap = unsafe { self.rb.as_ref().capacity() };
let tail = unsafe { self.rb.as_ref().tail.load_relaxed() };
let head = unsafe { self.rb.as_ref().head.load_acquire() };
if tail.wrapping_sub(head) > cap - 1 {
Err(item)
} else {
unsafe { self._enqueue(tail, item) };
Ok(())
}
}
pub unsafe fn enqueue_unchecked(&mut self, item: T) {
let tail = self.rb.as_ref().tail.load_relaxed();
debug_assert_ne!(tail.wrapping_add(1), self.rb.as_ref().head.load_acquire());
self._enqueue(tail, item);
}
unsafe fn _enqueue(&mut self, tail: $uxx, item: T) {
let rb = self.rb.as_mut();
let cap = rb.capacity();
let buffer = rb.buffer.get_mut();
ptr::write(buffer.get_unchecked_mut(usize::from(tail % cap)), item);
rb.tail.store_release(tail.wrapping_add(1));
}
}
};
}
#[cfg(feature = "smaller-atomics")]
impl_!(u8);
#[cfg(feature = "smaller-atomics")]
impl_!(u16);
impl_!(usize);
#[cfg(test)]
mod tests {
use consts::*;
use spsc::Queue;
#[test]
fn sanity() {
let mut rb: Queue<i32, U2> = Queue::new();
let (mut p, mut c) = rb.split();
assert_eq!(c.dequeue(), None);
p.enqueue(0).unwrap();
assert_eq!(c.dequeue(), Some(0));
}
}