位图 BitSet

BitSet 又称为 bit map, bit array, bit mask 或者 bit vector, 是一个数组结构, 里面只存储单个的比特, 每一个比特可以表示两个状态. 它是一种很简单的集合数据结构.

位图的基本操作

位图的集合操作

位图的实现

#![allow(unused)]
fn main() {
use std::ops::Index;

const BITS_PER_ELEM: usize = 8;
const TRUE: bool = true;
const FALSE: bool = false;

#[derive(Debug, Clone)]
pub struct BitSet {
    bits: Vec<u8>,
}

impl Default for BitSet {
    fn default() -> Self {
        Self::new()
    }
}

impl BitSet {
    #[must_use]
    #[inline]
    pub const fn new() -> Self {
        Self { bits: Vec::new() }
    }

    #[must_use]
    #[inline]
    pub fn with_len(len: usize) -> Self {
        let bits_len = len.div_ceil(BITS_PER_ELEM);
        Self {
            bits: vec![0; bits_len],
        }
    }

    #[must_use]
    #[inline]
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Self {
            bits: bytes.to_vec(),
        }
    }

    #[must_use]
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        &self.bits
    }

    #[must_use]
    #[inline]
    pub fn into_vec(self) -> Vec<u8> {
        self.bits
    }

    fn expand(&mut self, index: usize) {
        let bits_len = (index + 1).div_ceil(BITS_PER_ELEM);
        if self.bits.len() < bits_len {
            // TODO(Shaohua): Adjust resize policy.
            self.bits.resize(bits_len, 0);
        }
    }

    pub fn set(&mut self, index: usize) {
        self.expand(index);
        let word = index / BITS_PER_ELEM;
        let bit = index % BITS_PER_ELEM;
        let flag = 1 << bit;
        self.bits[word] |= flag;
    }

    pub fn unset(&mut self, index: usize) {
        self.expand(index);
        let word = index / BITS_PER_ELEM;
        let bit = index % BITS_PER_ELEM;
        let flag = 1 << bit;
        self.bits[word] &= !flag;
    }

    pub fn flip(&mut self, index: usize) {
        self.expand(index);
        let word = index / BITS_PER_ELEM;
        let bit = index % BITS_PER_ELEM;
        let flag = 1 << bit;
        // FIXME(Shaohua):
        self.bits[word] &= !flag;
    }

    /// Check bit at `index` is set or not.
    #[must_use]
    pub fn get(&self, index: usize) -> Option<bool> {
        let word = index / BITS_PER_ELEM;
        if word >= self.bits.len() {
            return None;
        }
        let bit = index % BITS_PER_ELEM;
        let flag = 1 << bit;
        Some((self.bits[word] & flag) == flag)
    }

    /// Returns the number of bits set to `true`.
    #[must_use]
    pub fn count_ones(&self) -> usize {
        self.bits
            .iter()
            .map(|byte| byte.count_ones() as usize)
            .sum()
    }

    /// Returns the number of bits set to `false`.
    #[must_use]
    pub fn count_zeros(&self) -> usize {
        self.bits
            .iter()
            .map(|byte| byte.count_zeros() as usize)
            .sum()
    }

    #[must_use]
    #[inline]
    pub const fn iter(&self) -> BitSetIter {
        BitSetIter {
            bit_set: self,
            index: 0,
        }
    }

    /// # Panics
    /// Raise panic if length of two bitset not equal.
    #[must_use]
    pub fn union(&self, other: &Self) -> Self {
        assert_eq!(self.bits.len(), other.bits.len());
        let bits = self
            .bits
            .iter()
            .zip(other.bits.iter())
            .map(|(a, b)| a | b)
            .collect();
        Self { bits }
    }

    /// # Panics
    /// Raise panic if length of two bitset not equal.
    #[must_use]
    pub fn intersect(&self, other: &Self) -> Self {
        assert_eq!(self.bits.len(), other.bits.len());
        let bits = self
            .bits
            .iter()
            .zip(other.bits.iter())
            .map(|(a, b)| a & b)
            .collect();
        Self { bits }
    }

    /// # Panics
    /// Raise panic if length of two bitset not equal.
    #[must_use]
    pub fn difference(&self, other: &Self) -> Self {
        assert_eq!(self.bits.len(), other.bits.len());
        let bits = self
            .bits
            .iter()
            .zip(other.bits.iter())
            .map(|(a, b)| a & !b)
            .collect();
        Self { bits }
    }
}

impl From<String> for BitSet {
    fn from(value: String) -> Self {
        Self {
            bits: value.into_bytes(),
        }
    }
}

impl From<&str> for BitSet {
    fn from(s: &str) -> Self {
        Self::from_bytes(s.as_bytes())
    }
}

macro_rules! from_number_impl {
    ($($t:ty)*) => {$(
        impl From<$t> for BitSet {
            fn from(value: $t) -> Self {
                Self {
                    bits: value.to_le_bytes().to_vec(),
                }
            }
        }
    )*};
}

from_number_impl! {i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize}

impl Index<usize> for BitSet {
    type Output = bool;

    fn index(&self, index: usize) -> &Self::Output {
        if self.get(index).expect("index out of range") {
            &TRUE
        } else {
            &FALSE
        }
    }
}

pub struct BitSetIter<'a> {
    bit_set: &'a BitSet,
    index: usize,
}

impl Iterator for BitSetIter<'_> {
    type Item = bool;

    fn next(&mut self) -> Option<Self::Item> {
        let is_set = self.bit_set.get(self.index);
        if is_set.is_some() {
            self.index += 1;
        }
        is_set
    }
}

impl<'a> IntoIterator for &'a BitSet {
    type IntoIter = BitSetIter<'a>;
    type Item = bool;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
}

位图的应用

记录每天的活跃用户数量

每天有多少个独立用户访问了服务? 在后端的数据库里, 用户的ID通常都是自增的. 我们可以基于每天的访问日志, 提供出用户的ID, 然后存放一个 BitSet 中, 它会自动去重, 并且只占用很少的空间.

对于有 1000万 用户的应用, 也只需要 10M / 8 = 1.25M 的空间来存储它. 将 bitset 对象存储到硬盘上后, 还可以使用 zstd/gzip 等工具对它进行压缩, 可以进一步降低占用的空间.

另外, 还可以对不同日期的 biset 进行逻辑与 (AND) 操作, 可以提取出极度活跃的用户.

对整数数组快速排序并去重

如果数组中都是整数, 而且数字间比较连续, 比较稠密, 数值的范围也是确定的, 并且要移除重复的元素, 那就可以考虑使用位图来进行快速排序和去重.

使用比特位的下标作为整数值, 这样的话只需要一个比特位就可以表示一个整数, 与 Vec<i32> 等存储结构相比, 位图可以显著地节省存储空间.

use std::fs::File;
use std::io::{self, Read};

use vector::bitset::BitSet;

pub fn random_ints(len: usize) -> Result<Vec<u32>, io::Error> {
    let mut file = File::open("/dev/urandom")?;
    let mut buf = vec![0; len * 4];
    file.read_exact(&mut buf)?;

    let mut nums = vec![0; len];
    for i in 0..len {
        let array: [u8; 4] = [buf[4 * i], buf[4 * i + 1], buf[4 * i + 2], buf[4 * i + 3]];
        let num = u32::from_le_bytes(array);
        nums[i] = num.min(10_000_000);
    }
    Ok(nums)
}

fn main() {
    let mut numbers = random_ints(10_000_000).unwrap();
    let max_number = numbers.iter().max().copied().unwrap_or_default() as usize;
    let mut bit_set = BitSet::with_len(max_number);
    for &num in &numbers {
        bit_set.set(num as usize);
    }

    let sorted: Vec<u32> = bit_set
        .iter()
        .enumerate()
        .filter(|(_index, is_set)| *is_set)
        .map(|(index, _is_set)| index as u32)
        .collect();

    numbers.sort_unstable();
    numbers.dedup();
    assert_eq!(numbers, sorted);
}

时间复杂度是 O(n), 空间复杂度是 O(n).

参考