从Redis源码上来聊聊KV模型-Hash数据类型

之前就说了要来西索Redis,现在来辣!
本文的部分内容参考自《小林Coding》,部分地方根据源代码进行剖析。
Redis源码地址:https://github.com/redis/redis.git
阅读本文之前建议先阅读我的上一篇文章:神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi)

Hash

建议先阅读:[神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi)](https:// www.wzl.fyi/2023/07/20/986/),或者本页面的第一章

观其面

哈希结构大家也不陌生了,kv嘛,redis不就是kv模型嘛,那么Redis里面hash又是啥?

这里给大家说一下,就是套娃,用伪代码表示就是这样

HashMap<String,HashMap<String,String>>

当然,实际的类型是泛型。

和String相比具体的区别就是这样

image-20230720215240368

如果你用mysql数据库来表示的话

hash其实是field,value

然后每个hash对应一个表

如果放在Redis的话就是一个key里面放了很多个field,value

可以这样理解:

key field value

其实神简单,说多了还容易绕

应用场景

购物车实现

key:uid

field:商品id

value: 数量

  • 添加商品:HSET cart:{用户id} {商品id} 1
  • 添加数量:HINCRBY cart:{用户id} {商品id} 1
  • 商品总数:HLEN cart:{用户id}
  • 删除商品:HDEL cart:{用户id} {商品id}
  • 获取购物车所有商品:HGETALL cart:{用户id}

获取商品信息还得进数据库回表一次,拿到完整的商品信息

分布式锁实现

[Redis分布式锁深入分析 – Karos (wzl1.top)](https:// www.wzl1.top/2023/06/16/951/)

之前我讲过,这里再简单说一下把

key为锁名,field是线程ID(加上服务的UUID),然后value是进入锁的次数

看一个加锁的Lua脚本吧,为什么用Lua脚本?我们后面细说。

    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果不是自己的锁
        if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
            -- 不是自己的锁,返回剩余时间
            return redis.call("pttl",KEYS[1]);
        end
        -- 如果是自己的锁就记录次数
        redis.call("hincrby",KEYS[1],ARGS[1],1);
        -- 延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    else
        -- 如果Lock不存在,那么就直接加上就可以了,hhh
        redis.call("hset",KEYS[1],ARGS[1],1);
        -- 设置默认延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    end
    return nil;

究其身

当数据量少的时候使用压缩链表(早期,Redis7.0以后完成的对ZipList的全部替代,都用LIstPack来实现了)

  • 如果哈希类型元素个数小于 512 个(默认值,可由 hash-max-ziplist-entrieshash-max-listpack-entries 配置),所有值小于 64 字节(默认值,可由 hash-max-ziplist-value 配置)的话,Redis 会使用压缩列表/ListPack作为 Hash 类型的底层数据结构;
  • 如果哈希类型元素不满足上面条件,Redis 会使用哈希表作为 Hash 类型的 底层数据结构。

哈希结构客户端哈希设计(基于Redis 7.0解释)

在之前我们讲过dict,dict是用于存储key和value的,但是这里我们客户端hash是将dictEntry的集合放入value,显然不能够用原来服务器的value来存储一个新的dict,你不知道客户端输入的是什么属性

呃呃呃,这上面几句话说的可能有点绕,看代码吧。

其实Redis的开发者确实考虑到了这一点,所以做了个区分,两个虽然都叫做dict,但是所属dir不一样

// /deps/hiredis/dict.h 客户端源码
typedef struct dict {
    dictEntry **table;
    dictType *type;
    // 哈希表大小,桶的数量
    unsigned long size;
    // 哈希表大小掩码,用于计算索引值
    unsigned long sizemask;
    // 已有哈希节点数量
    unsigned long used;
    void *privdata;
} dict;

typedef struct dictEntry {
    void *key;
    void *val;
    struct dictEntry *next;
} dictEntry;

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);   // 析构
    void (*valDestructor)(void *privdata, void *obj);   // 析构
} dictType;

先说一下新增元素,RedisHash采用的是头插法

/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
    int index;
    dictEntry *entry;

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(ht, key)) == -1)
        return DICT_ERR;

    /* Allocates the memory and stores key */
    entry = hi_malloc(sizeof(*entry));
    if (entry == NULL)
        return DICT_ERR;
    // 执行头茬
    entry->next = ht->table[index];
    ht->table[index] = entry;

    /* Set the hash entry fields. */
    dictSetHashKey(ht, entry, key);
    dictSetHashVal(ht, entry, val);
    ht->used++;
    return DICT_OK;
}

对于Hash结构来说,有个扩容操作,同时会伴随rehash,我们来看看

/* Expand or create the hashtable */
static int dictExpand(dict *ht, unsigned long size) {
    dict n; /* the new hashtable */
    unsigned long realsize = _dictNextPower(size), i;   // 通过该方法获取真实内存,扩大为比size大的第一个2^n

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hashtable */
    if (ht->used > size)      // size不合法,如果哈希总节点数大于了size
        return DICT_ERR;

    // 对新的hash进行初始化
    _dictInit(&n, ht->type, ht->privdata);
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = hi_calloc(realsize,sizeof(dictEntry*));
    if (n.table == NULL)
        return DICT_ERR;

    /* Copy all the elements from the old to the new table: 把旧数据复制到新的哈希表中
     * note that if the old hash table is empty ht->size is zero,
     * so dictExpand just creates an hash table. */
    n.used = ht->used;
    for (i = 0; i < ht->size && ht->used > 0; i++) {
        dictEntry *he, *nextHe;

        if (ht->table[i] == NULL) continue;

        /* For each hash entry on this slot... */
        he = ht->table[i];
        while(he) {
            unsigned int h;

            nextHe = he->next;       // 先获取下一次循环需要的entry
            /* Get the new element index */ // 通过计算哈希值获取新的下标(索引计算),其实相当于对长度取余
            h = dictHashKey(ht, he->key) & n.sizemask;   
            // 头插
            he->next = n.table[h];
            n.table[h] = he;
            ht->used--;      // ht被使用的节点减少
            /* Pass to the next element */
            he = nextHe;    // 移步到下一个
        }
    }
    assert(ht->used == 0);
    hi_free(ht->table);

    /* Remap the new hashtable in the old */
    *ht = n;
    return DICT_OK;
}
为什么扩容是2^n

其实这里哈希表扩容和Java中HashMap扩容原理一样,我们来回忆一下

  • HashMap是如何扩容的

    HashMap扩容依靠的是resize()方法,根据负载因子来计算临界值,达到临界值进行扩容。

    而resize()方法也是通过一个newTable进行操作的

  • Rehash如何确定新元素的位置

​ 通过将hashcode()%length,当leng为$2^n$时,刚好等于hashcode()&(length-1)

但是对于Redis对象存储来说,具体的rehash的方法是拉链法,和JavaSE HashMap类似,同样使用,这样做只是为了加快计算(直接对内存庶几乎进行操作)

Redis服务端存储原理-Hash解析

ReHash

在之前,其实讲到了,Redis的存储原理,其中使用的dict中,直接便是dictEntry

    dictEntry **ht_table[2];    // 第二个表用来进行rehash   
    /**
    *   dictEntry **ht_table[2]; <==>
    *   typedef struct dictht{
    *       dictEntry **table;
    *   }dictht;
    *    ditcht ht[2];
    **/

之前我们说到第二个ht_table用来做rehash,是这样的,原理其实和上面也类似。

不过为了内存切换方便,每次rehash的时候,只需要将ht_table[0]和ht_table[1]进行交换即可,扩容完毕后直接使用,不用再换回去(类比JVM中幸存区From和幸存区To的交换)。

那么为什么服务端存储kv的哈希靠复制交换,而客户端确是直接新建一个dict N呢?

其实原因可想而知。

  • 服务器的ht数量是固定的,而hash的数据是由客户端操作,数量不定,如果采取复制的话浪费空间,要扩容的时候new一个就行,这一个其实不太重要,重要的在后面
  • 最重要的其实时,服务端一定会保证长连接状态,当数据量很大的时候,我们不可能花较长时间来进行rehash,所以我们采用两个哈希表进行渐进式rehash
渐进式ReHash

为了避免 rehash 在数据迁移过程中,因拷贝数据的耗时,影响 Redis 性能的情况,所以 Redis 采用了渐进式 rehash,也就是将数据的迁移的工作不再是一次性迁移完成,而是分多次迁移。

渐进式 rehash 步骤如下:

  • 给「哈希表 2」 分配空间;
  • 在 rehash 进行期间,每次哈希表元素进行新增、删除、查找或者更新操作时,Redis 除了会执行对应的操作之外,还会顺序将「哈希表 1 」中索引位置上的所有 key-value 迁移到「哈希表 2」 上
  • 随着处理客户端发起的哈希表操作请求数量越多,最终在某个时间点会把「哈希表 1 」的所有 key-value 迁移到「哈希表 2」,从而完成 rehash 操作。

这样就巧妙地把一次性大量数据迁移工作的开销,分摊到了多次处理请求的过程中,避免了一次性 rehash 的耗时操作。

在进行渐进式 rehash 的过程中,会有两个哈希表,所以在渐进式 rehash 进行期间,哈希表元素的删除、查找、更新等操作都会在这两个哈希表进行。

比如,查找一个 key 的值的话,先会在「哈希表 1」 里面进行查找,如果没找到,就会继续到哈希表 2 里面进行找到。

另外,在渐进式 rehash 进行期间,新增一个 key-value 时,会被保存到「哈希表 2 」里面,而「哈希表 1」 则不再进行任何添加操作,这样保证了「哈希表 1 」的 key-value 数量只会减少,随着 rehash 操作的完成,最终「哈希表 1 」就会变成空表。

ReHash触发条件

介绍了 rehash 那么多,还没说什么时情况下会触发 rehash 操作呢?

rehash 的触发条件跟负载因子(load factor)有关系。

负载因子可以通过下面这个公式计算:

img

触发 rehash 操作的条件,主要有两个:

  • 当负载因子大于等于 1 ,并且 Redis 没有在执行 bgsave 命令或者 bgrewiteaof 命令,也就是没有执行 RDB 快照或没有进行 AOF 重写的时候,就会进行 rehash 操作。
  • 当负载因子大于等于 5 时,此时说明哈希冲突非常严重了,不管有没有有在执行 RDB 快照或 AOF 重写,都会强制进行 rehash 操作。
服务端哈希扩容

在讲扩容源码之前,我们先来回顾之前讲的内容

// dictEntry的实现很简单,就是个链表
struct dictEntry {
    void *key;
    union {
        void *val;          // 其他
        uint64_t u64;       // 整数
        int64_t s64;        // 整数
        double d;           // 双精度数值
    } v;
    struct dictEntry *next;     /* Next entry in the same hash bucket.  指向下一个hash值相同的entry*/
    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as returned
                                 * by dictType's dictEntryMetadataBytes(). */
};

struct dict {
    dictType *type;

    dictEntry **ht_table[2];    // 第二个表用来进行rehash   
    /**
    *   dictEntry **ht_table[2]; <==>
    *   typedef struct dictht{
    *       dictEntry **table;
    *   }dictht;
    *    ditcht ht[2];
    **/

    unsigned long ht_used[2];   

    // 用于保证rehash安全,如果值为-1,那么不能执行rehash
    long rehashidx;

    // redis6之后新加入,如果值>0则暂停rehash,保证rehash安全
    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */

    // 哈希的内存指数
    signed char ht_size_exp[2]; 

    // 由dictType的dictEntryBytes定义的大小的任意字节数(从指针对齐的地址开始)。
    void *metadata[];          
};

说到这里,我们来看看扩容的源码,先来个宏定义

#define dictIsRehashing(d) ((d)->rehashidx != -1)

再来一个获取新内存大小的源码

static signed char _dictNextExp(unsigned long size)
{
    unsigned char e = DICT_HT_INITIAL_EXP;

    if (size >= LONG_MAX) return (8*sizeof(long)-1);
    // 这里采用的是用移位来计算,保证效率
    while(1) {
        if (((unsigned long)1<<e) >= size)
            return e;
        e++;
    }
}

一个判断是否运行扩容的"私有"函数

/* ------------------------- private functions ------------------------------ */

/* Because we may need to allocate huge memory chunk at once when dict
 * expands, we will check this allocation is allowed or not if the dict
 * type has expandAllowed member function. */
static int dictTypeExpandAllowed(dict *d) {
    // 不允许返回1
    if (d->type->expandAllowed == NULL) return 1;
    // 这里的计算我们后面讲
    return d->type->expandAllowed(
                    DICTHT_SIZE(_dictNextExp(d->ht_used[0] + 1)) * sizeof(dictEntry*),
                    (double)d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]));
}

下面是扩容源码

    /* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 如果正在rehash,那么直接返回Ok
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 如果哈希表是空的,那么就创建
    if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    // 如果不被允许扩展就返回ok
    if (!dictTypeExpandAllowed(d))
        return DICT_OK;
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht_used[0] + 1); // +1是为了避免和当前分配相同的内存
    }
    // 分配成功
    return DICT_OK;
}

/* Expand or create the hash table,
 * when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).
 * Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
    if (malloc_failed) *malloc_failed = 0;

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht_used[0] > size)       // 判断是否能够哈希,并且传入的size合法
        return DICT_ERR;

    /* the new hash table */
    dictEntry **new_ht_table;       // 创建一个新的哈希表
    unsigned long new_ht_used;
    signed char new_ht_size_exp = _dictNextExp(size);   // 计算log(新哈希表的长度)

    /* Detect overflows */
    size_t newsize = 1ul<<new_ht_size_exp;                // 计算长度
    if (newsize < size || newsize * sizeof(dictEntry*) < newsize) // 如果比原来的还小
        return DICT_ERR;

    /* Rehashing to the same table size is not useful. */
    if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;       // 如果和原来相同,没用

    /* Allocate the new hash table and initialize all pointers to NULL */   // 分配内存,并且memset all为NULL
    if (malloc_failed) {
        new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
        *malloc_failed = new_ht_table == NULL;  // 判断是否分配成功
        if (*malloc_failed)
            return DICT_ERR;
    } else
        new_ht_table = zcalloc(newsize*sizeof(dictEntry*));

    new_ht_used = 0;        // 新的表还没有使用过

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht_table[0] == NULL) {                // 如果是第一次次创建,使用第一张表为哈希表
        d->ht_size_exp[0] = new_ht_size_exp;
        d->ht_used[0] = new_ht_used;
        d->ht_table[0] = new_ht_table;
        return DICT_OK;
    }
    /* Prepare a second hash table for incremental rehashing */
    // 使用第二张表进行rehash
    d->ht_size_exp[1] = new_ht_size_exp;
    d->ht_used[1] = new_ht_used;
    d->ht_table[1] = new_ht_table;
    // 表示当前表正在进行哈希中,后续不可以进行rehash,具体的判断在前面的宏定义内
    d->rehashidx = 0;
    return DICT_OK;
}
  • 微信或QQ扫一扫

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

目录