之前就说了要来西索Redis,现在来辣!
本文的部分内容参考自《小林Coding》,部分地方根据源代码进行剖析。
Redis源码地址:https://github.com/redis/redis.git
阅读本文之前建议先阅读我的上一篇文章:神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi)
Hash
建议先阅读:[神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi)](https:// www.wzl.fyi/2023/07/20/986/),或者本页面的第一章
观其面
哈希结构大家也不陌生了,kv嘛,redis不就是kv模型嘛,那么Redis里面hash又是啥?
这里给大家说一下,就是套娃,用伪代码表示就是这样
HashMap<String,HashMap<String,String>>
当然,实际的类型是泛型。
和String相比具体的区别就是这样
如果你用mysql数据库来表示的话
hash其实是field,value
然后每个hash对应一个表
如果放在Redis的话就是一个key里面放了很多个field,value
可以这样理解:
key field value
其实神简单,说多了还容易绕
应用场景
购物车实现
key:uid
field:商品id
value: 数量
- 添加商品:
HSET cart:{用户id} {商品id} 1
- 添加数量:
HINCRBY cart:{用户id} {商品id} 1
- 商品总数:
HLEN cart:{用户id}
- 删除商品:
HDEL cart:{用户id} {商品id}
- 获取购物车所有商品:
HGETALL cart:{用户id}
获取商品信息还得进数据库回表一次,拿到完整的商品信息
分布式锁实现
[Redis分布式锁深入分析 – Karos (wzl1.top)](https:// www.wzl1.top/2023/06/16/951/)
之前我讲过,这里再简单说一下把
key为锁名,field是线程ID(加上服务的UUID),然后value是进入锁的次数
看一个加锁的Lua脚本吧,为什么用Lua脚本?我们后面细说。
-- 如果Lock存在
if redis.call("exists",KEYS[1]) ~= 0 then
-- 如果不是自己的锁
if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
-- 不是自己的锁,返回剩余时间
return redis.call("pttl",KEYS[1]);
end
-- 如果是自己的锁就记录次数
redis.call("hincrby",KEYS[1],ARGS[1],1);
-- 延期
redis.call("pexpire",KEYS[1],ARGS[2]);
else
-- 如果Lock不存在,那么就直接加上就可以了,hhh
redis.call("hset",KEYS[1],ARGS[1],1);
-- 设置默认延期
redis.call("pexpire",KEYS[1],ARGS[2]);
end
return nil;
究其身
当数据量少的时候使用压缩链表(早期,Redis7.0以后完成的对ZipList的全部替代,都用LIstPack来实现了)
- 如果哈希类型元素个数小于
512
个(默认值,可由hash-max-ziplist-entries
和hash-max-listpack-entries
配置),所有值小于64
字节(默认值,可由hash-max-ziplist-value
配置)的话,Redis 会使用压缩列表/ListPack作为 Hash 类型的底层数据结构; - 如果哈希类型元素不满足上面条件,Redis 会使用哈希表作为 Hash 类型的 底层数据结构。
哈希结构客户端哈希设计(基于Redis 7.0解释)
在之前我们讲过dict,dict是用于存储key和value的,但是这里我们客户端hash是将dictEntry的集合放入value,显然不能够用原来服务器的value来存储一个新的dict,你不知道客户端输入的是什么属性
呃呃呃,这上面几句话说的可能有点绕,看代码吧。
其实Redis的开发者确实考虑到了这一点,所以做了个区分,两个虽然都叫做dict,但是所属dir不一样
// /deps/hiredis/dict.h 客户端源码
typedef struct dict {
dictEntry **table;
dictType *type;
// 哈希表大小,桶的数量
unsigned long size;
// 哈希表大小掩码,用于计算索引值
unsigned long sizemask;
// 已有哈希节点数量
unsigned long used;
void *privdata;
} dict;
typedef struct dictEntry {
void *key;
void *val;
struct dictEntry *next;
} dictEntry;
typedef struct dictType {
unsigned int (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key); // 析构
void (*valDestructor)(void *privdata, void *obj); // 析构
} dictType;
先说一下新增元素,RedisHash采用的是头插法
/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
int index;
dictEntry *entry;
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(ht, key)) == -1)
return DICT_ERR;
/* Allocates the memory and stores key */
entry = hi_malloc(sizeof(*entry));
if (entry == NULL)
return DICT_ERR;
// 执行头茬
entry->next = ht->table[index];
ht->table[index] = entry;
/* Set the hash entry fields. */
dictSetHashKey(ht, entry, key);
dictSetHashVal(ht, entry, val);
ht->used++;
return DICT_OK;
}
对于Hash结构来说,有个扩容操作,同时会伴随rehash,我们来看看
/* Expand or create the hashtable */
static int dictExpand(dict *ht, unsigned long size) {
dict n; /* the new hashtable */
unsigned long realsize = _dictNextPower(size), i; // 通过该方法获取真实内存,扩大为比size大的第一个2^n
/* the size is invalid if it is smaller than the number of
* elements already inside the hashtable */
if (ht->used > size) // size不合法,如果哈希总节点数大于了size
return DICT_ERR;
// 对新的hash进行初始化
_dictInit(&n, ht->type, ht->privdata);
n.size = realsize;
n.sizemask = realsize-1;
n.table = hi_calloc(realsize,sizeof(dictEntry*));
if (n.table == NULL)
return DICT_ERR;
/* Copy all the elements from the old to the new table: 把旧数据复制到新的哈希表中
* note that if the old hash table is empty ht->size is zero,
* so dictExpand just creates an hash table. */
n.used = ht->used;
for (i = 0; i < ht->size && ht->used > 0; i++) {
dictEntry *he, *nextHe;
if (ht->table[i] == NULL) continue;
/* For each hash entry on this slot... */
he = ht->table[i];
while(he) {
unsigned int h;
nextHe = he->next; // 先获取下一次循环需要的entry
/* Get the new element index */ // 通过计算哈希值获取新的下标(索引计算),其实相当于对长度取余
h = dictHashKey(ht, he->key) & n.sizemask;
// 头插
he->next = n.table[h];
n.table[h] = he;
ht->used--; // ht被使用的节点减少
/* Pass to the next element */
he = nextHe; // 移步到下一个
}
}
assert(ht->used == 0);
hi_free(ht->table);
/* Remap the new hashtable in the old */
*ht = n;
return DICT_OK;
}
为什么扩容是2^n
其实这里哈希表扩容和Java中HashMap扩容原理一样,我们来回忆一下
-
HashMap是如何扩容的
HashMap扩容依靠的是resize()方法,根据负载因子来计算临界值,达到临界值进行扩容。
而resize()方法也是通过一个newTable进行操作的
-
Rehash如何确定新元素的位置
通过将hashcode()%length,当leng为$2^n$时,刚好等于hashcode()&(length-1)
但是对于Redis对象存储来说,具体的rehash的方法是拉链法,和JavaSE HashMap类似,同样使用,这样做只是为了加快计算(直接对内存庶几乎进行操作)
Redis服务端存储原理-Hash解析
ReHash
在之前,其实讲到了,Redis的存储原理,其中使用的dict中,直接便是dictEntry
dictEntry **ht_table[2]; // 第二个表用来进行rehash
/**
* dictEntry **ht_table[2]; <==>
* typedef struct dictht{
* dictEntry **table;
* }dictht;
* ditcht ht[2];
**/
之前我们说到第二个ht_table用来做rehash,是这样的,原理其实和上面也类似。
不过为了内存切换方便,每次rehash的时候,只需要将ht_table[0]和ht_table[1]进行交换即可,扩容完毕后直接使用,不用再换回去(类比JVM中幸存区From和幸存区To的交换)。
那么为什么服务端存储kv的哈希靠复制交换,而客户端确是直接新建一个dict N呢?
其实原因可想而知。
- 服务器的ht数量是固定的,而hash的数据是由客户端操作,数量不定,如果采取复制的话浪费空间,要扩容的时候new一个就行,这一个其实不太重要,重要的在后面
- 最重要的其实时,服务端一定会保证长连接状态,当数据量很大的时候,我们不可能花较长时间来进行rehash,所以我们采用两个哈希表进行渐进式rehash
渐进式ReHash
为了避免 rehash 在数据迁移过程中,因拷贝数据的耗时,影响 Redis 性能的情况,所以 Redis 采用了渐进式 rehash,也就是将数据的迁移的工作不再是一次性迁移完成,而是分多次迁移。
渐进式 rehash 步骤如下:
- 给「哈希表 2」 分配空间;
- 在 rehash 进行期间,每次哈希表元素进行新增、删除、查找或者更新操作时,Redis 除了会执行对应的操作之外,还会顺序将「哈希表 1 」中索引位置上的所有 key-value 迁移到「哈希表 2」 上;
- 随着处理客户端发起的哈希表操作请求数量越多,最终在某个时间点会把「哈希表 1 」的所有 key-value 迁移到「哈希表 2」,从而完成 rehash 操作。
这样就巧妙地把一次性大量数据迁移工作的开销,分摊到了多次处理请求的过程中,避免了一次性 rehash 的耗时操作。
在进行渐进式 rehash 的过程中,会有两个哈希表,所以在渐进式 rehash 进行期间,哈希表元素的删除、查找、更新等操作都会在这两个哈希表进行。
比如,查找一个 key 的值的话,先会在「哈希表 1」 里面进行查找,如果没找到,就会继续到哈希表 2 里面进行找到。
另外,在渐进式 rehash 进行期间,新增一个 key-value 时,会被保存到「哈希表 2 」里面,而「哈希表 1」 则不再进行任何添加操作,这样保证了「哈希表 1 」的 key-value 数量只会减少,随着 rehash 操作的完成,最终「哈希表 1 」就会变成空表。
ReHash触发条件
介绍了 rehash 那么多,还没说什么时情况下会触发 rehash 操作呢?
rehash 的触发条件跟负载因子(load factor)有关系。
负载因子可以通过下面这个公式计算:
触发 rehash 操作的条件,主要有两个:
- 当负载因子大于等于 1 ,并且 Redis 没有在执行 bgsave 命令或者 bgrewiteaof 命令,也就是没有执行 RDB 快照或没有进行 AOF 重写的时候,就会进行 rehash 操作。
- 当负载因子大于等于 5 时,此时说明哈希冲突非常严重了,不管有没有有在执行 RDB 快照或 AOF 重写,都会强制进行 rehash 操作。
服务端哈希扩容
在讲扩容源码之前,我们先来回顾之前讲的内容
// dictEntry的实现很简单,就是个链表
struct dictEntry {
void *key;
union {
void *val; // 其他
uint64_t u64; // 整数
int64_t s64; // 整数
double d; // 双精度数值
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. 指向下一个hash值相同的entry*/
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
};
struct dict {
dictType *type;
dictEntry **ht_table[2]; // 第二个表用来进行rehash
/**
* dictEntry **ht_table[2]; <==>
* typedef struct dictht{
* dictEntry **table;
* }dictht;
* ditcht ht[2];
**/
unsigned long ht_used[2];
// 用于保证rehash安全,如果值为-1,那么不能执行rehash
long rehashidx;
// redis6之后新加入,如果值>0则暂停rehash,保证rehash安全
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
// 哈希的内存指数
signed char ht_size_exp[2];
// 由dictType的dictEntryBytes定义的大小的任意字节数(从指针对齐的地址开始)。
void *metadata[];
};
说到这里,我们来看看扩容的源码,先来个宏定义
#define dictIsRehashing(d) ((d)->rehashidx != -1)
再来一个获取新内存大小的源码
static signed char _dictNextExp(unsigned long size)
{
unsigned char e = DICT_HT_INITIAL_EXP;
if (size >= LONG_MAX) return (8*sizeof(long)-1);
// 这里采用的是用移位来计算,保证效率
while(1) {
if (((unsigned long)1<<e) >= size)
return e;
e++;
}
}
一个判断是否运行扩容的"私有"函数
/* ------------------------- private functions ------------------------------ */
/* Because we may need to allocate huge memory chunk at once when dict
* expands, we will check this allocation is allowed or not if the dict
* type has expandAllowed member function. */
static int dictTypeExpandAllowed(dict *d) {
// 不允许返回1
if (d->type->expandAllowed == NULL) return 1;
// 这里的计算我们后面讲
return d->type->expandAllowed(
DICTHT_SIZE(_dictNextExp(d->ht_used[0] + 1)) * sizeof(dictEntry*),
(double)d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]));
}
下面是扩容源码
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
// 如果正在rehash,那么直接返回Ok
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
// 如果哈希表是空的,那么就创建
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
// 如果不被允许扩展就返回ok
if (!dictTypeExpandAllowed(d))
return DICT_OK;
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio))
{
return dictExpand(d, d->ht_used[0] + 1); // +1是为了避免和当前分配相同的内存
}
// 分配成功
return DICT_OK;
}
/* Expand or create the hash table,
* when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).
* Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht_used[0] > size) // 判断是否能够哈希,并且传入的size合法
return DICT_ERR;
/* the new hash table */
dictEntry **new_ht_table; // 创建一个新的哈希表
unsigned long new_ht_used;
signed char new_ht_size_exp = _dictNextExp(size); // 计算log(新哈希表的长度)
/* Detect overflows */
size_t newsize = 1ul<<new_ht_size_exp; // 计算长度
if (newsize < size || newsize * sizeof(dictEntry*) < newsize) // 如果比原来的还小
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR; // 如果和原来相同,没用
/* Allocate the new hash table and initialize all pointers to NULL */ // 分配内存,并且memset all为NULL
if (malloc_failed) {
new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
*malloc_failed = new_ht_table == NULL; // 判断是否分配成功
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*));
new_ht_used = 0; // 新的表还没有使用过
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht_table[0] == NULL) { // 如果是第一次次创建,使用第一张表为哈希表
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
// 使用第二张表进行rehash
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
// 表示当前表正在进行哈希中,后续不可以进行rehash,具体的判断在前面的宏定义内
d->rehashidx = 0;
return DICT_OK;
}