Skip to content
Menu
CFC Studio
  • 实验室主页
  • CFC 招新简章
  • 友情链接
  • RSS订阅
CFC Studio

Bitmap的设计实现及实战应用

Posted on 2023年4月9日2023年12月14日 by 枫阿雨

大家估计都知道今天的主角Bitmap(位图)这个东西,也都知道它是一种非常有趣且鹅妹子嘤带点儿“黑科技”的数据结构,它能够用非常非常低的存储成本存储数据的状态,在这篇文章中,我将从0到1实现一个Bitmap,并基于Redis的Bitmap在实际应用场景中发挥Bitmap的优点。

实现一个Bitmap

原理与实现思路

我们首先来简单的再过一遍位图的原理,Bitmap是一种将每一个字节用到极致的数据结构,位图的使用场景实际上和布尔数组是相同的,都是表示某个索引代表的数据非0即1的二元对立关系,只不过数据在bitmap中的存储会更加紧凑,使用bit位来作为下标索引,以0或1来代替bool的false or true。当需要在bool数组中判断下标1024的数据是否为true时,就等价于在bitmap中的第1024位是否为1。bool数组在大多数语言中每个bool值的大小都是1byte(8bit)这么算下来,bitmap可以省去8倍的存储空间。

在Java中,单个bool值占4byte,而bool数组的底层实现实际为byte数组,即1byte

在绝大多数的编程语言中,可供我们直接操作的最小单位是byte,只有一个字节,也就是8个bit(这与内存寻址是以byte为单位有关)。所以我们要想操作bit来实现bitmap,需要通过位移操作来完成。

Bitmap实例与创建方法
// Bitmap的数据底层由byte slice组织
type Bitmap []byte

// 通过指定的bit长度新建一个Bitmap实例
func MakeBitmapWithBitSize(nBits int) Bitmap {
    // 规定一个最小长度
    if nBits < 64 {
        nBits = 64
    }
    // 需要确保bit长度为8的倍数
    return MakeBitmapWithByteSize((nBits + 7) / 8)
}

// 通过指定的byte长度新建一个Bitmap实例
func MakeBitmapWithByteSize(nBytes int) Bitmap {
    return make([]byte, nBytes)
}

// 注: pos的起始位置为0
// 将指定pos的bit设置为1
func (b Bitmap) SetTrue(bitPos uint32) {
    b[bitPos/8] |= 1 << (bitPos % 8)
}

// 将指定pos的bit设置为0
func (b Bitmap) SetFalse(bitPos uint32) {
    b[bitPos/8] &= ^(1 << (bitPos % 8))
}

// 判断指定pos的bit是否为1
func (b Bitmap) IsTrue(bitPos uint32) bool {
    return b[bitPos/8]&(1<<(bitPos%8)) != 0
}

// 重置Bitmap
func (b Bitmap) Reset() {
    for i := range b {
        b[i] = 0
    }
}

// Bitmap所存储的字节数
func (b Bitmap) ByteSize(bitPos uint32) int {
    return len(b)
}

// Bitmap所存储的位数
func (b Bitmap) BitSize(bitPos uint32) int {
    return len(b) * 8
}

是的,我们这样就实现了一个Bitmap,是不是会感觉 就这? 确实,Bitmap本身就是一个非常简单朴实的数据结构,给人一种大道至简的感觉。 所有的核心操作都集中以下三步当中,我们来逐一拆解操作过程

// SetTrue
b[bitPos/8] |= 1 << (bitPos % 8)
// SetFalse
b[bitPos/8] &= ^(1 << (bitPos % 8))
// IsTrue
b[bitPos/8] & (1 << (bitPos % 8)) != 0

bi[bitPos/8] |= 1 << (bitPos % 8)

这个操作将Bitmap的 bitPos 位置上的bit位设置为1。 我们通过结合例子的方式逐步用白话解析这个操作:

  • 现假设我需要将0110 0101的第2位通过bitmap.SetTrue(1)置为1
  1. 首先,bitPos / 8 计算出需要操作的字节索引(即第几个字节)。因为一个字节有8个bit位,所以我们使用除法运算来计算该位所在的那个字节。在此例中就是在下标为0的索引上的byte处。
  2. 然后,bitPos % 8 计算出该字节中需要操作的位数。使用取模运算,可以计算出该位在字节中的偏移量。在此例中偏移量为1
  3. 接下来,执行 1 << (bitPos % 8) 按位左移运算,将 1(即0000 0001) 左移 bitPos % 8 位(即位于字节中的偏移量)。在此例中就是将 0000 0001 向左移1位得到 0000 0010
  4. 最后,执行 b[bitPos/8] |= 1 << (bitPos % 8) 操作,我们采用位或运算的操作”|”进行赋值(0|0 = 0、0|1 = 1、1|1 = 1)将该位设置为 1。在此例中就是计算0110 0101 | 0000 0010 = 0111 最终得到我们想要的结果 0110 0111。

b[bitPos/8] &= ^(1 << (bitPos % 8))

这个操作将Bitmap的 bitPos 位置上的bit位设置为0。 我们通过结合例子的方式逐步用白话解析这个操作:

  • 现假设我需要将0110 0101的第6位通过bitmap.SetFalse(5)置为0
  1. 首先,bitPos / 8 计算出需要操作的字节索引(即第几个字节)。因为一个字节有8个bit位,所以我们使用除法运算来计算该位所在的那个字节。在此例中就是在下标为0的索引上的byte处。
  2. 然后,bitPos % 8 计算出该字节中需要操作的位数。使用取模运算,可以计算出该位在字节中的偏移量。在此例中偏移量为5
  3. 接下来,执行 1 << (bitPos % 8) 按位左移运算,将 1(即0001) 左移 bitPos % 8 位(即位于字节中的偏移量)。在此例中就是将 0000 0001 向左移5位得到 0010 0000
  4. 接着,应用按位取反运算符将掩码取反(在Go语言中 “^” 作为一元运算符时表示按位取反操作),得到一个这个除了我们要删除的那个bit位之外所有位都是1的掩码。再此例中就是 1101 1111.
  5. 最后,执行 b[bitPos/8] &= ^(1 << (bitPos % 8)) 操作,我们采用位或运算的操作”&”进行赋值(0|0 = 0、0|1 =0、1|1 = 1)将该位设置为 0。在此例中就是计算 0110 0101 & 1101 1111 = 0111 最终得到我们想要的结果 0100 0101。

b[bitPos/8 ] & (1 << (bitPos % 8)) != 0

这个操作判断 bitmap 的 bitPos 位置上的二进制位是否为1。 我们通过结合例子的方式逐步用白话解析这个操作:

  • 现假设我需要使用bitmap.IsTrue(6)判断0110 0101的第7位是否为1
  1. 首先,bitPos / 8 计算出需要操作的字节索引(即第几个字节)。因为一个字节有8个bit位,所以我们使用除法运算来计算该位所在的那个字节。在此例中就是在下标为0的索引上的byte处。
  2. 然后,bitPos % 8 计算出该字节中需要操作的位数。使用取模运算,可以计算出该位在字节中的偏移量。在此例中偏移量为6
  3. 接下来,执行 1 << (bitPos % 8) 按位左移运算,将 1(即0000 0001) 左移 bitPos % 8 位(即位于字节中的偏移量)。在此例中就是将 0000 0001 向左移6位得到 0100 0000。
  4. 接着,执行 b[bitPos/8] & (1 << (bitPos % 8)) 操作,我们通过与运算只保留bitmap中的指定字节其他bit位均为0,如果不为1的话,那么这8个bit全部都归零,如果为1,则这8个bit组成的byte必不等于0。在此例中就是计算 0110 0101 & 0100 0000 = 0100 0000
  5. 最后,执行 != 0 操作,如果该位为1,则返回true;否则返回false。

怎么样是不是非常的清清爽爽~

既然造完了轮子咱们就开始实战吧~

利用Redis的Bitmap实现用户点赞

嘿嘿,bitmap的实战我偏不用自己造的轮子,就是玩儿~

其实主要是Redis中的数据结构是可以跨进程共享的,所以一般应用的更广泛些,我们造好的轮子将在后面的实战文章中使用(用来造另一个轮子XD)

在很多的业务中模块,都会有一个点赞的功能,点赞操作可能会比较频繁,而且因为可能会涉及到资源的热度和推荐指数,且用户的直接感知很强,所以也算是一个相对重要的功能。

点赞功能的设计有以下几点:

  1. 要能够及时反馈,让用户有所感知,并且要尽量避免用户显示数据的不一致
  2. 要能够标记用户是否已点赞,避免用户刷赞操作
  3. 一个页面加载时可能会有大量需要统计点赞的资源,需要对点赞进行缓存处理,避免大量的磁盘io

结合以上设计需求,我先说出最终可以选择的方案:

采用基于Redis的异步缓存写入策略,以资源类型的前缀与资源ID作为Key,维护一个Bitmap,以用户的自增ID作为Bitmap中的标识索引,实现用户的点赞操作的记录。后台运行一个定时定时任务,定时将Redis中的数据批量同步到数据库的持久化存储中。

这样设计有以下的特点:

  1. 用户获取点赞数、执行点赞操作都是直接和基于内存的Redis打交道,速度相对较快。
  2. 使用Bitmap存储用户点赞记录,可省下大量的内存空间。
  3. 采用异步缓存写入策略,本质上是依赖Redis作为稳定的存储介质而非简单的缓存,只要Redis不崩溃在客户端就不会发生不一致的现象(比如点赞后用户刷新,发现点赞消失)。

当然,模块的设计与技术选型是需要根据业务本身的特点而定的,在当前的业务设计中就是默认Redis是较为可靠的存储,基本不会出现Redis崩溃的状况。

代码示例

以评论模块的点赞操作为例

一些const枚举值

type IsLike int
const (
    UNLIKE IsLike = iota
    LIKE
)

const(
  // COMMENT:LIKE:commentID -> bitmap
  COMMENT_LIKE_REDISPREKEY = "COMMENT:LIKE:"
)

核心方法

// 更改点赞状态,由上游传入的isLike判断是点赞操作还是取消点赞操作
func (c *CommentDomain) ConvertLikeState(commentId int64, uid int64, isLike IsLike) *errs.BError {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    key := COMMENT_LIKE_REDISPREKEY+strconv.FormatInt(commentId, 10)
    err := c.redis.SetBit(ctx, key, uid, int(isLike)).Error()
    if err != nil {
        zap.L().Error("updateLikeState redis SetBit ERROR", zap.Error(err))
        return errs.RedisError
    }
    return nil
}

// 判断用户是否已点赞(前端需要知晓的状态)
func (c *CommentDomain) HasLiked(commentId int64, uid int64) (bool, *errs.BError) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    key := COMMENT_LIKE_REDISPREKEY+strconv.FormatInt(commentId, 10)
    bit, err := c.redis.GetBit(ctx, key, uid).Result()
    if err != nil {
        zap.L().Error("HasLiked redis GetBit ERROR", zap.Error(err))
        return false, errs.RedisError
    }
    return bit == 1, nil
}

// 获取评论的点赞数
func (c *CommentDomain) CountLikeNum(commentId int64) (int64, *errs.BError) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    key := COMMENT_LIKE_REDISPREKEY+strconv.FormatInt(commentId, 10)
    count, err := c.redis.BitCount(ctx, key, &redis.BitCount{Start: 0, End: -1}).Result()
    if err != nil {
        zap.L().Error("CountLikeNum redis BitCount ERROR", zap.Error(err))
        return 0, errs.RedisError
    }
    return count, nil
}

简单的测试用例

var commentDomain = domain.NewCommentDomain()

func TestLike(t *testing.T) {
    count, err := commentDomain.CountLikeNum(5)
    log.Printf("init comment %d Like Num %d \n", 5, count)

    err = commentDomain.ConvertLikeState(5, 1000, LIKE)
    count, err = commentDomain.CountLikeNum(5)
    log.Printf("comment %d Like Num %d  after %d like \n", 5, count, 1000)

    err = commentDomain.ConvertLikeState(5, 1001, LIKE)
    count, err = commentDomain.CountLikeNum(5)
    log.Printf("comment %d Like Num %d  after %d like \n", 5, count, 1000)

    err = commentDomain.ConvertLikeState(5, 1001, UNLIKE)
    count, err = commentDomain.CountLikeNum(5)
    log.Printf("comment %d Like Num %d  after %d unlike \n", 5, count, 1001)

    liked, err := commentDomain.HasLiked(5, 1001)
    count, err = commentDomain.CountLikeNum(5)
    log.Printf("%d hasLiked comment %d : %t \n", 1001, count, liked)

    liked, err = commentDomain.HasLiked(5, 1000)
    count, err = commentDomain.CountLikeNum(5)
    log.Printf("%d hasLiked comment %d : %t \n", 1000, count, liked)

    if err != nil {
        log.Fatal(err)
    }
}

输出结果

=== RUN   TestLike
2023/04/08 16:52:33 init comment 5 Like Num 0 
2023/04/08 16:52:33 comment 5 Like Num 1  after 1000 like 
2023/04/08 16:52:33 comment 5 Like Num 2  after 1000 like 
2023/04/08 16:52:33 comment 5 Like Num 1  after 1001 unlike 
2023/04/08 16:52:33 1001 hasLiked comment 1 : false 
2023/04/08 16:52:33 1000 hasLiked comment 1 : true 
--- PASS: TestLike (0.02s)
PASS

这样,一个最基本的评论点赞模块就做好了,当然,这是将封装好的模块拆出来放在这里的示例代码,在实际应用中应该遵循项目的模块进行封装

Extra

在调研其他网站的api设计的时候,发现了一个很有意思的点。CSDN在点赞模块的设计上,点赞与取消点赞的客户端请求完全一样!

无论是请求URL、请求方法、表单数据全部都一样!

截图

截图

在返回相应的时候,数据才会有所不同

截图

截图

这是一个很有意思的点,如果前端不传状态的话,后端该如何判断此操作是取消点赞还是点赞操作呢?

既然前端没有标明,那就只能后端先查再改了,但是这无疑是比较吃资源的行为。假设csdn的后台也是用Redis来存储点赞标识,那么也需要查两次Redis,多一次网络通信的开销,我能想到的稍微合理一点的操作逻辑是配合使用lua脚本,在Redis中执行lua脚本进行分支判断进行Compare And Swap的原子操作,代码实例如下

func (c *CommentDomain) ConvertLikeStateCAS(commentId int64, uid int64) (int64, bool, *errs.BError) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    // lua脚本
    script := `
        local key = KEYS[1]
        local userId = ARGV[1]
        local isLiked = redis.call("GETBIT", key, userId)
        if isLiked == 1 then
            redis.call("SETBIT", key, userId, 0)
            local isLiked = 0
        else
            redis.call("SETBIT", key, userId, 1)
            local isLiked = 1
        end
        local likeCount = redis.call("BITCOUNT", key)
        return {likeCount, isLiked}
    `
    key := COMMENT_LIKE_REDISPREKEY + strconv.FormatInt(commentId, 10)
    result, err := c.redis.Eval(ctx, script, []string{key}, uid)
    resArr := result.([]interface{})
    likeCount := resArr[0].(int64)
    hassLiked := resArr[1].(int64)
    if err != nil {
        zap.L().Error("ConvertLikeStateCAS redis Eval ERROR", zap.Error(err))
        return 0, false, errs.RedisError
    }
    return likeCount, hassLiked == 0, nil
}

简单的测试用例

func TestEval(t *testing.T) {
    // 1000 1001 1002 点赞 1002取消赞
    num, like, bError := commentDomain.ConvertLikeStateCAS(6, 1000)
    log.Println("after 0 like status: ", like, " likeNum: ", num)

    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1001)
    log.Println("after 1 like status: ", like, " likeNum: ", num)

    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1002)
    log.Println("after 2 like status: ", like, " likeNum: ", num)

    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1003)
    log.Println("after 3 like status: ", like, " likeNum: ", num)

    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1001)
    log.Println("after 1 unlike status: ", like, " likeNum: ", num)

    for i := 1000; i < 1004; i++ {
        hasLiked, _ := commentDomain.HasLiked(6, int64(i))
        log.Println("member", i, "hasliked ", hasLiked)
    }
    if bError != nil {
        log.Fatal(bError)
    }
    log.Println("after likeNum: ", num)
}

简单的测试用例

func TestEval(t *testing.T) {
    // 1000 1001 1002 点赞 1002取消赞
    num, like, bError := commentDomain.ConvertLikeStateCAS(6, 1000)
    log.Println("after 1000 like status: ", like, " likeNum: ", num)
    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1001)
    log.Println("after 1001 like status: ", like, " likeNum: ", num)
    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1002)
    log.Println("after 1002 like status: ", like, " likeNum: ", num)
    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1003)
    log.Println("after 1003 like status: ", like, " likeNum: ", num)
    num, like, bError = commentDomain.ConvertLikeStateCAS(6, 1001)
    log.Println("after 1001 unlike status: ", like, " likeNum: ", num)

    for i := 1000; i < 1004; i++ {
        hasLiked, _ := commentDomain.HasLiked(6, int64(i))
        log.Println("member", i, "hasliked ", hasLiked)
    }
    if bError != nil {
        log.Fatal(bError)
    }
    log.Println("after likeNum: ", num)
}

输出结果

=== RUN   TestEval
2023/04/08 19:34:29 after 1000 like status:  true  likeNum:  1
2023/04/08 19:34:29 after 1001 like status:  true  likeNum:  2
2023/04/08 19:34:29 after 1002 like status:  true  likeNum:  3
2023/04/08 19:34:29 after 1003 like status:  true  likeNum:  4
2023/04/08 19:34:29 after 1001 unlike status:  false  likeNum:  3
2023/04/08 19:34:29 member 1000 hasliked  true
2023/04/08 19:34:29 member 1001 hasliked  false
2023/04/08 19:34:29 member 1002 hasliked  true
2023/04/08 19:34:29 member 1003 hasliked  true
2023/04/08 19:34:29 after likeNum:  3
--- PASS: TestEval (0.03s)
PASS

这样一来,就可以实现和csdn的点赞一样的效果了(虽然至今我也不知道这样设计的优点在哪里,但还是挺有意思的

说起csdn,推荐一个油猴脚本 ,可以避免登录才能复制之类的讨厌的限制(虽然改变不了csdn里面的内容多数都是又水又烂的现状,能在国内的掘金知乎找到的东西还是别去看csdn了吧
🔥持续更新🔥 CSDN广告完全过滤、人性化脚本优化:🆕 不用再登录了!让你体验令人惊喜的崭新CSDN (greasyfork.org)

Tips:

以上的设计是基于用户id为自增id的基础上而实现的,如果在你的系统中使用的是UUID或者雪花算法生成的ID请勿直接使用bitmap,这会导致bitmap爆掉~
UUID是128位的字符串,且完全没有顺序,并不适合使用bitmap存储
雪花算法生成ID是64位的整型,也不能直接使用bitmap因为 根 本 存 不 下!

2^64 bit = 18446744073709551616 bit
= 2305843009213696 MB
= 2251799813685248 GB
= 2202670619951340.672 TB

(不过雪花算法是有序的,所以理论上可以从最小id开始记录)
所以你也看到了bitmap的一个缺陷,它最适合存储的是连续的数据,它所占用的空间也是根据最大的数而定的,如果系统中的数据分布的非常广,可能bitmap就并不是很合适,这种情况下,可能采用hash的方式是更合理的。
此外,Redis中还提供了一种叫做HyperLogLog的数据结构用于基数统计,它要比bitmap还要更“黑科技”,不过相应的,它也是以部分准确性的代价来换取的,可以用在对准确性没有那么高的统计数值上,比如“浏览量”,之后有时间的话也许会写一片文章来介绍一下HLL在实践中的应用吧XD

发表评论 取消回复

您的电子邮箱地址不会被公开。 必填项已用*标注

分类

  • CFC 周刊 (4)
  • CFC 技术 (44)
  • CFC 日常 (3)
  • 未分类 (15)
  • 活动通知 (3)

标签

ACM Android anime animeloop animeloop-cli APP Apple aria2 Array Blog CFC数据结构与算法训练指南 CoreData CQUT Don't Starve Hexo iBooks JavaScript macOS Matlab moeoverflow OpenCV Programming README RxJS SQLite SQLite3 Steam Swift Theme Web Xcode 主题模板 动漫 博客 反编译 妹子 循环 教程 数据库 游戏 算法 装逼 视频 重庆理工大学 饥荒

登录
©2025 CFC Studio | Powered by WordPress & Superb Themes