环形缓冲区 Circular Buffer

环形缓冲区 Circular Buffer 又称为 Ring Buffer, Cyclic Buffer 或者 Circular Queue.

环形缓冲区是线性数据结构, 通常由数组来实现, 如下图所示:

circular buffer linear

将尾部与头部相连, 组成一个环形索引, 逻辑上的关系如下图所示.:

circular buffer circular

所以才称为环形缓冲区.

环形缓冲区实现的是单生产者-单消费者模式 (single-producer, single-consumer), 生产者将元素加到尾部, 然后消费者从头部读取元素, FIFO (first in first out).

与链表相比, 这种数据结构更加紧凑, 空间利用率高, 对CPU的缓存友好, 常用作 I/O buffering.

环形缓冲区的基本操作

TODO(Shaohua):

初始化缓冲区

因为缓冲区的容量是事先确定的, 在初始化它的同时, 可以分配好相应的堆内存. 如果分配内存失败, 就直接产生 panic 异常.

函数签名是: pub fn new(capacity: usize) -> Self

向缓冲区中加入元素

函数签名是: pub fn push(&mut self, value: T) -> Result<(), T>

生产者调用它, 加入元素时, 如果缓冲区已经满了, 就直接返回 Err(value). 为了简化实现, 我们并没有定义相应的 错误类型.

从缓冲区中读取元素

消费者调用它, 每次读取一个元素.

函数签名是: pub fn pop(&mut self) -> Option<T>

如果缓冲区已经空了, 就返回 None

环形缓冲区的实现

考虑到性能, 下面的 CircularBuffer 使用了几个 unsafe 接口, 要特别留意指针的操作.

#![allow(unused)]
fn main() {
use std::alloc::{alloc, dealloc, Layout};
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{mem, ops, ptr, slice};

pub struct CircularBuffer<T: Sized> {
    start: usize,
    len: usize,
    cap: usize,
    ptr: NonNull<T>,
    _marker: PhantomData<T>,
}

impl<T: Sized> CircularBuffer<T> {
    /// # Panics
    ///
    /// 分配内存失败时直接返回 panic
    #[must_use]
    #[inline]
    pub fn new(capacity: usize) -> Self {
        // 为了方便处理, 我们强制要求 capacity 是正数, 并且目前还没有考虑 ZST (zero sized type).
        assert!(capacity > 0);

        let layout = Layout::array::<T>(capacity).expect("Layout error");
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("Failed to alloc");

        Self {
            start: 0,
            len: 0,
            cap: capacity,
            ptr: ptr.cast(),
            _marker: PhantomData,
        }
    }

    #[must_use]
    #[inline]
    pub const fn as_mut_ptr(&self) -> *mut T {
        self.ptr.as_ptr()
    }

    #[must_use]
    #[inline]
    pub const fn as_ptr(&self) -> *const T {
        self.ptr.as_ptr()
    }

    #[must_use]
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        self
    }

    #[must_use]
    #[inline]
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        self
    }

    /// # Errors
    ///
    /// 当缓冲区已满时返回 `Err(value)`
    pub fn push(&mut self, value: T) -> Result<(), T> {
        if self.is_full() {
            Err(value)
        } else {
            unsafe {
                // 计算新元素的指针位置
                let end = (self.start + self.len) % self.cap;
                let end_ptr = self.as_mut_ptr().add(end);
                self.len += 1;
                ptr::write(end_ptr, value);
            }
            Ok(())
        }
    }

    /// 从缓冲区消费元素, 如果缓冲区已空, 就返回 `None`
    pub fn pop(&mut self) -> Option<T> {
        if self.is_empty() {
            None
        } else {
            unsafe {
                // 计算起始元素的地址
                let start_ptr = self.as_ptr().add(self.start);
                self.start = (self.start + 1) % self.cap;
                self.len -= 1;
                Some(ptr::read(start_ptr))
            }
        }
    }

    /// 返回当前缓冲区中的元素个数
    #[must_use]
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    #[must_use]
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.cap
    }

    #[must_use]
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len() == 0
    }

    #[must_use]
    #[inline]
    pub const fn is_full(&self) -> bool {
        self.len() == self.cap
    }

    // 计算当前的内存结构
    fn current_memory(&self) -> (NonNull<u8>, Layout) {
        assert_eq!(mem::size_of::<T>() % mem::align_of::<T>(), 0);
        unsafe {
            let align = mem::align_of::<T>();
            let size = mem::size_of::<T>().unchecked_mul(self.cap);
            let layout = Layout::from_size_align_unchecked(size, align);
            (self.ptr.cast(), layout)
        }
    }
}

/// 释放堆内存
impl<T> Drop for CircularBuffer<T> {
    fn drop(&mut self) {
        let (ptr, layout) = self.current_memory();
        unsafe { dealloc(ptr.as_ptr(), layout) }
    }
}

/// 实现 `Deref` 和 `DerefMut` traits.
impl<T> ops::Deref for CircularBuffer<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &[T] {
        unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl<T> ops::DerefMut for CircularBuffer<T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut [T] {
        unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

/// 支持从迭代器初始化.
impl<T> FromIterator<T> for CircularBuffer<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        // 为了实现简单, 我们重用了 vec 的 `FromIterator` 实现.
        let vec: Vec<T> = iter.into_iter().collect();
        let len = vec.len();
        let cap = vec.capacity();
        let boxed = vec.into_boxed_slice();
        let ptr = Box::leak(boxed);
        let ptr = NonNull::new(ptr.as_mut_ptr()).unwrap();
        Self {
            start: 0,
            len,
            cap,
            ptr,
            _marker: PhantomData,
        }
    }
}
}

环形缓冲区的应用

有不少软件有使用它来管理缓冲区, 下面就列举几个.

比如, 在 linux 内核的网络栈, 接收到对方发送的数据包后, 就先放到对应的环形缓冲区, 并且根据它剩下的空间大小, 来通知发送方调整滑动窗口的大小.

参考