环形缓冲区 Circular Buffer
环形缓冲区 Circular Buffer 又称为 Ring Buffer, Cyclic Buffer 或者 Circular Queue.
环形缓冲区是线性数据结构, 通常由数组来实现, 如下图所示:
将尾部与头部相连, 组成一个环形索引, 逻辑上的关系如下图所示.:
所以才称为环形缓冲区.
环形缓冲区实现的是单生产者-单消费者模式 (single-producer, single-consumer), 生产者将元素加到尾部, 然后消费者从头部读取元素, FIFO (first in first out).
与链表相比, 这种数据结构更加紧凑, 空间利用率高, 对CPU的缓存友好, 常用作 I/O buffering.
环形缓冲区的基本操作
TODO(Shaohua):
初始化缓冲区
因为缓冲区的容量是事先确定的, 在初始化它的同时, 可以分配好相应的堆内存.
如果分配内存失败, 就直接产生 panic
异常.
函数签名是:
pub fn new(capacity: usize) -> Self
向缓冲区中加入元素
函数签名是:
pub fn push(&mut self, value: T) -> Result<(), T>
生产者调用它, 加入元素时, 如果缓冲区已经满了, 就直接返回 Err(value)
. 为了简化实现, 我们并没有定义相应的
错误类型.
从缓冲区中读取元素
消费者调用它, 每次读取一个元素.
函数签名是: pub fn pop(&mut self) -> Option<T>
如果缓冲区已经空了, 就返回 None
环形缓冲区的实现
考虑到性能, 下面的 CircularBuffer
使用了几个 unsafe
接口, 要特别留意指针的操作.
#![allow(unused)] fn main() { use std::alloc::{alloc, dealloc, Layout}; use std::marker::PhantomData; use std::ptr::NonNull; use std::{mem, ops, ptr, slice}; pub struct CircularBuffer<T: Sized> { start: usize, len: usize, cap: usize, ptr: NonNull<T>, _marker: PhantomData<T>, } impl<T: Sized> CircularBuffer<T> { /// # Panics /// /// 分配内存失败时直接返回 panic #[must_use] #[inline] pub fn new(capacity: usize) -> Self { // 为了方便处理, 我们强制要求 capacity 是正数, 并且目前还没有考虑 ZST (zero sized type). assert!(capacity > 0); let layout = Layout::array::<T>(capacity).expect("Layout error"); let ptr = unsafe { alloc(layout) }; let ptr = NonNull::new(ptr).expect("Failed to alloc"); Self { start: 0, len: 0, cap: capacity, ptr: ptr.cast(), _marker: PhantomData, } } #[must_use] #[inline] pub const fn as_mut_ptr(&self) -> *mut T { self.ptr.as_ptr() } #[must_use] #[inline] pub const fn as_ptr(&self) -> *const T { self.ptr.as_ptr() } #[must_use] #[inline] pub fn as_slice(&self) -> &[T] { self } #[must_use] #[inline] pub fn as_mut_slice(&mut self) -> &mut [T] { self } /// # Errors /// /// 当缓冲区已满时返回 `Err(value)` pub fn push(&mut self, value: T) -> Result<(), T> { if self.is_full() { Err(value) } else { unsafe { // 计算新元素的指针位置 let end = (self.start + self.len) % self.cap; let end_ptr = self.as_mut_ptr().add(end); self.len += 1; ptr::write(end_ptr, value); } Ok(()) } } /// 从缓冲区消费元素, 如果缓冲区已空, 就返回 `None` pub fn pop(&mut self) -> Option<T> { if self.is_empty() { None } else { unsafe { // 计算起始元素的地址 let start_ptr = self.as_ptr().add(self.start); self.start = (self.start + 1) % self.cap; self.len -= 1; Some(ptr::read(start_ptr)) } } } /// 返回当前缓冲区中的元素个数 #[must_use] #[inline] pub const fn len(&self) -> usize { self.len } #[must_use] #[inline] pub const fn capacity(&self) -> usize { self.cap } #[must_use] #[inline] pub const fn is_empty(&self) -> bool { self.len() == 0 } #[must_use] #[inline] pub const fn is_full(&self) -> bool { self.len() == self.cap } // 计算当前的内存结构 fn current_memory(&self) -> (NonNull<u8>, Layout) { assert_eq!(mem::size_of::<T>() % mem::align_of::<T>(), 0); unsafe { let align = mem::align_of::<T>(); let size = mem::size_of::<T>().unchecked_mul(self.cap); let layout = Layout::from_size_align_unchecked(size, align); (self.ptr.cast(), layout) } } } /// 释放堆内存 impl<T> Drop for CircularBuffer<T> { fn drop(&mut self) { let (ptr, layout) = self.current_memory(); unsafe { dealloc(ptr.as_ptr(), layout) } } } /// 实现 `Deref` 和 `DerefMut` traits. impl<T> ops::Deref for CircularBuffer<T> { type Target = [T]; #[inline] fn deref(&self) -> &[T] { unsafe { slice::from_raw_parts(self.as_ptr(), self.len) } } } impl<T> ops::DerefMut for CircularBuffer<T> { #[inline] fn deref_mut(&mut self) -> &mut [T] { unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) } } } /// 支持从迭代器初始化. impl<T> FromIterator<T> for CircularBuffer<T> { fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self { // 为了实现简单, 我们重用了 vec 的 `FromIterator` 实现. let vec: Vec<T> = iter.into_iter().collect(); let len = vec.len(); let cap = vec.capacity(); let boxed = vec.into_boxed_slice(); let ptr = Box::leak(boxed); let ptr = NonNull::new(ptr.as_mut_ptr()).unwrap(); Self { start: 0, len, cap, ptr, _marker: PhantomData, } } } }
环形缓冲区的应用
有不少软件有使用它来管理缓冲区, 下面就列举几个.
比如, 在 linux 内核的网络栈, 接收到对方发送的数据包后, 就先放到对应的环形缓冲区, 并且根据它剩下的空间大小, 来通知发送方调整滑动窗口的大小.