Redis扩容,缩容与rehash机制
扩容的触发时机和条件
从ADD的源码看起,主要代码如下:
/*用户自定义了是否允许扩容的检测函数*/
static int dictTypeExpandAllowed(dict *d) {
if (d->type->expandAllowed == NULL) return 1;
return d->type->expandAllowed(
DICTHT_SIZE(_dictNextExp(d->ht_used[0] + 1)) * sizeof(dictEntry*),
(double)d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]));
}
/* 检测是否需要扩容 */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
/* 正在rehsah 直接返回 */
if (dictIsRehashing(d)) return DICT_OK;
/* 如果table是空的 则进行扩容初始化它 */
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* 检测是否需要扩容 */
if (d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0]) &&
(dict_can_resize ||
d->ht_used[0]/ DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht_used[0] + 1);
}
return DICT_OK;
}
/* 检测是否需要扩容和检查该key是否已存在 */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* 是否需要扩容 */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
for (table = 0; table <= 1; table++) {
/* 计算下标 */
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
/* 检查该key是否已存在 */
he = d->ht_table[table][idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
/* 如果没有在进行rehash 则不需要检测另外一张哈希表 直接return */
if (!dictIsRehashing(d)) break;
}
return idx;
}
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
int htidx;
/* 如果在rehash 辅助进行一次rehash */
if (dictIsRehashing(d)) _dictRehashStep(d);
/* 检测是否需要扩容和检查该key是否已存在 */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* 根据是否在rehash 决定写入哪张表 rehashing在写入表1 新的数据都写入表1 需要rehash的Key才会越来越少 */
htidx = dictIsRehashing(d) ? 1 : 0;
size_t metasize = dictMetadataSize(d);
entry = zmalloc(sizeof(*entry) + metasize);
if (metasize > 0) {
memset(dictMetadata(entry), 0, metasize);
}
/* 头插 把新的entry写入哈希桶中 */
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
以上就是ADD 一个KV对的大概过程,总结:
触发扩容的时机: 写入一个KV对时,会检测是否需要扩容和该key是否存在
触发的条件:
1,第一次写入KV对时,因为在redis的哈希表采用的是延迟初始化,只有在第一次写入数据时才会真正去申请内存。
2,需要满足以下条件:
a,哈希表的节点数 >= 哈希表的桶数(数组的长度)
b,dict_can_resize == 1 (在生成新的RDB文件或者重写AOF的时候 dict_can_resize = 0)或者 节点数/哈希桶数 > 安全阈值(默认是5)
c,如果dict 的 expandAllowed成员函数变量不为空,则执行该函数来判断是否允许进行rehash。(这个是新版redis加的,应该是允许用户扩展增加自定义检查是否允许进行rehash,避免可能一次性申请内存过大导致触发redis满容且设置了过期策略时大量淘汰key等情况)
扩容的过程
redis的扩容并不是一次性把所有的KV对拷贝到另外一张表,而是选择了渐进式rehash。
rehash初始化
1,检测是否在rehash 或者 当前节点数 > size
2,计算扩容后的长度
3,检测防止溢出
4,比较扩容后的长度和原来的长度是否一致
5,申请内存
6,判断是初始化还是需要进行扩容,如果是第一次初始化,则只需要初始化table1 ; 如果是扩容,则初始化table2。
注意:最新版的redis扩容后的长度并不是之前的直接原来的长度的两倍,而是当前size(原来的size+1)的最小的2的幂次方
/* 计算长度 规则如下:
* 1,size >= LONG_MAX,则是 long对象所占字节长度 * 8 - 1
* 2,大于size的最小2的幂次方
* >> 代表位运算 左移几位 则代表是 2 的几次方,例如 1 >> 2 = 1 * 2 * 2
*/
static signed char _dictNextExp(unsigned long size)
{
unsigned char e = DICT_HT_INITIAL_EXP;
if (size >= LONG_MAX) return (8*sizeof(long)-1);
while(1) {
if (((unsigned long)1<<e) >= size)
return e;
e++;
}
}
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* 是否在rehash 或者 当前节点数 > size */
if (dictIsRehashing(d) || d->ht_used[0] > size)
return DICT_ERR;
dictEntry **new_ht_table;
unsigned long new_ht_used;
/* 计算长度 */
signed char new_ht_size_exp = _dictNextExp(size);
/* 防止溢出 */
size_t newsize = 1ul<<new_ht_size_exp;
if (newsize < size || newsize * sizeof(dictEntry*) < newsize)
return DICT_ERR;
/* 扩容后的长度和原来的长度一致则扩容无效 */
if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;
/* 申请内存 */
if (malloc_failed) {
new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
*malloc_failed = new_ht_table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*));
new_ht_used = 0;
/* 如果是第一次初始化而不是扩容,则只需要初始化table1即可 */
if (d->ht_table[0] == NULL) {
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
return DICT_OK;
}
/* 初始化table2 用于进行rehash操作 */
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0;
return DICT_OK;
}
操作辅助进行rehash
在 dictFind,dictAddRaw等每一次CRUD命令执行时,都会判断当前是否是在进行rehash,如果是,则帮助进行一次rehash操作
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
/* 辅助进行一次rehash操作 */
if (dictIsRehashing(d)) _dictRehashStep(d);
....
return NULL;
}
static void _dictRehashStep(dict *d) {
if (d->pauserehash == 0) dictRehash(d,1);
}
定时进行rehash
redis会有一个定时任务,定时遍历所有的db的哈希表,进行rehash操作,每次操作的时间上限目前是 1 毫秒,循环操作,一次循环操作100个key
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
int dictRehashMilliseconds(dict *d, int ms) {
if (d->pauserehash > 0) return 0;
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
rehash结束
rehash结束后会回收掉table0的内存, 然后把table1的地址赋值给table0
int dictRehash(dict *d, int n) {
/* 主要代码在 dict.c 文件中*/
/* rehash完成 回收table0的内存空间 把table1的地址赋值给table0 table1赋null */
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0;
}
/* 继续rehash */
return 1;
}
static void _dictReset(dict *d, int htidx)
{
d->ht_table[htidx] = NULL;
d->ht_size_exp[htidx] = -1;
d->ht_used[htidx] = 0;
}
总结
过程大概如下:
1,初始化table1,rehashidx 置为0,rehash开始
2,每次CRUD操作都会辅助进行一次rehash
3,定时任务遍历16个DB,执行rehash操作,while循环,每次循环rehash100个,一次循环时间上限时一毫秒
4,在这期间,所有新增都是写table1里,查,删,改则同时在两个表进行
5,rehash结束后会回收掉table1的内存, 然后把table1的地址赋值给table0,rehashidx置为-1,至此本次rehash就完成了
缩容的触发时机和条件
缩容的检查是在定时任务里的
void databasesCron(void) {
/* ..... */
/* 没有活跃的子进程在数据落盘的时候再去进行rehash操作 避免大量内存的写时复制 */
if (!hasActiveChildProcess()) {
/* 全局变量 以便在下次cron次能从对应的位置继续迭代 */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* 缩容 */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* 进行扩容 */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict); // DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])
used = dictSize(dict); // (d)->ht_used[0]+(d)->ht_used[1]
// HASHTABLE_MIN_FILL:10,DICT_HT_INITIAL_SIZE:1<<2
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
触发的时机:在cron里检测并触发
触发的条件:两张表的size加起来 > 4 而且 哈希表保存的key数量与哈希表的大小的比例小于10%
缩容过程和扩容过程一样的,只是一个是哈希桶数变大了,一个是变小了,扩容和缩容并不会同时进行,因为它们最终都是调 _dictExpand 方法,该方法有判断了当前是否在进行rehashif (dictIsRehashing(d) || d->ht_used[0] > size) return DICT_ERR;
本作品采用《CC 协议》,转载必须注明作者和本文链接