(36) - 11 Great Designs in Redis
Last updated: 2022-04-01 20:21:33
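After sticking with it for about a month, I have worked through the Redis code from start to finish: first sorting the code into categories, beginning with the struct definitions, and ending with the main program, tackling each major module along the way. I learned a great deal. It has been a long time since I had the patience to study a framework this thoroughly. Being able to use a framework is only a small part of learning it; truly understanding the principles behind it is the real skill. Now that I have reached the end, it is time for something substantial, so I have collected the best code and design ideas from the past month or so. I originally wanted to round it off to 10 great designs, but every item felt worth keeping, so it became 11, and I promise each one is worth opening up and studying. None of this depends on the language; what matters is the programming ideas and designs, and I hope you take the time to absorb them. (The order below is not significant; I simply listed the items chronologically. The link after each item points to my related article; follow it if you want the details.)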
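1. Implementation of the HyperLogLog cardinality-estimation algorithm ([http://blog.csdn.net/androidlushangderen/article/details/40683763](http://blog.csdn.net/androidlushangderen/article/details/40683763)). The funny thing is that at first I actually thought it was some kind of log, like slowLog; only later did I realize it is an algorithm for estimating cardinality (the number of distinct elements). A related algorithm is LLC (LogLog Counting), and HLLC (HyperLogLog Counting) is its improved version.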
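To make the idea concrete, here is a minimal HyperLogLog sketch. It is not Redis's hyperloglog.c: the register count (HLL_P = 12), the splitmix64-style hash and the names hll_add/hll_count are my own assumptions, and the real implementation adds sparse/dense encodings and further refinements. The core idea is the same, though: hash each element, use some bits of the hash to pick a register, and keep in that register the longest run of trailing zero bits seen so far. This sketch also skips the small-range (linear counting) correction, so it is only accurate once the count is well above the number of registers.

```c
#include <stdint.h>
#include <string.h>

#define HLL_P 12                  /* assumption: 2^12 = 4096 registers */
#define HLL_M (1 << HLL_P)

typedef struct {
    uint8_t reg[HLL_M];           /* one small counter per bucket */
} hll;

/* splitmix64 finalizer, used here as a stand-in 64-bit hash */
static uint64_t hll_hash(uint64_t x) {
    x += 0x9E3779B97F4A7C15ULL;
    x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9ULL;
    x = (x ^ (x >> 27)) * 0x94D049BB133111EBULL;
    return x ^ (x >> 31);
}

void hll_init(hll *h) { memset(h->reg, 0, sizeof h->reg); }

void hll_add(hll *h, uint64_t item) {
    uint64_t x = hll_hash(item);
    unsigned idx = (unsigned)(x & (HLL_M - 1)); /* low P bits pick the register */
    x >>= HLL_P;                                /* remaining 64-P bits */
    uint8_t rank = 1;                           /* position of the first 1 bit */
    while (rank <= 64 - HLL_P && (x & 1) == 0) {
        rank++;
        x >>= 1;
    }
    if (rank > h->reg[idx]) h->reg[idx] = rank; /* keep the maximum seen */
}

/* Raw estimate: alpha * m^2 / sum(2^-reg[j]); use only when count >> HLL_M. */
double hll_count(const hll *h) {
    double sum = 0.0, m = HLL_M;
    for (int j = 0; j < HLL_M; j++)
        sum += 1.0 / (double)(1ULL << h->reg[j]);
    double alpha = 0.7213 / (1.0 + 1.079 / m);
    return alpha * m * m / sum;
}
```

Memory stays at HLL_M bytes no matter how many elements are added, and adding an element a second time cannot change any register, which is why duplicates are not counted twice.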
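2. Reimplementing memory allocation with zmalloc ([http://blog.csdn.net/androidlushangderen/article/details/40659331](http://blog.csdn.net/androidlushangderen/article/details/40659331)). The Redis author clearly planned ahead for memory allocation: instead of naively calling the system's malloc and free directly, Redis wraps them in a thin layer so that it can track and control the process's memory usage more easily. The function below should make this clear.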
~~~
/* Allocate size bytes through zmalloc */
void *zmalloc(size_t size) {
//Still malloc underneath, plus PREFIX_SIZE bytes for a size header
void *ptr = malloc(size+PREFIX_SIZE);
//A NULL result means we are out of memory: call the OOM handler
if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
//Update the used_memory counter
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}
~~~
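The `#else` branch above holds the interesting trick: when the platform cannot report the size of an allocation, zmalloc stores the requested size in a header just before the pointer it returns, so that freeing can later find out how much to subtract from used_memory. Below is a minimal, single-threaded sketch of that idea. The names my_malloc, my_free and used_memory are hypothetical; unlike Redis it ignores thread safety and uses abort() instead of a configurable OOM handler, and a sizeof(size_t) header only guarantees size_t alignment for the returned memory.

```c
#include <stdio.h>
#include <stdlib.h>

#define PREFIX_SIZE sizeof(size_t)   /* header that stores the block size */

static size_t used_memory = 0;       /* bytes handed out, headers included */

void *my_malloc(size_t size) {
    void *ptr = malloc(size + PREFIX_SIZE);
    if (!ptr) {
        fprintf(stderr, "my_malloc: out of memory allocating %zu bytes\n", size);
        abort();
    }
    *((size_t *)ptr) = size;                 /* remember the requested size */
    used_memory += size + PREFIX_SIZE;
    return (char *)ptr + PREFIX_SIZE;        /* caller sees memory after the header */
}

void my_free(void *ptr) {
    if (ptr == NULL) return;
    char *real = (char *)ptr - PREFIX_SIZE;  /* step back to the header */
    size_t size = *((size_t *)real);
    used_memory -= size + PREFIX_SIZE;
    free(real);
}

size_t my_used_memory(void) { return used_memory; }
```

The cost is one extra word per allocation; the benefit is an exact, always up-to-date memory figure, which is what features like maxmemory rely on.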
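3. MULTI transactions ([http://blog.csdn.net/androidlushangderen/article/details/40392209](http://blog.csdn.net/androidlushangderen/article/details/40392209)). Redis's transactions felt refreshingly new to me. The design is built around keys and WATCHed keys: each watched key keeps a list of all the clients watching it, and each client keeps a list of all the keys it is watching. When a key is touched (modified), the pending transaction of every client watching that key is invalidated, so its next EXEC fails. For the implementation details, follow the link.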
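The bookkeeping described above can be sketched with two small pieces of state: for each watched key, the clients watching it, and for each client, a "dirty" flag. This is a toy model, not Redis's multi.c: fixed-size arrays stand in for Redis's dictionary of client lists, the per-client list of watched keys (needed for UNWATCH) is left out, and watch_key, touch_key, exec_allowed and the size limits are made-up names. The point is that touching a key flags every client watching it, and a flagged client's EXEC is refused.

```c
#include <stdbool.h>
#include <string.h>

#define MAX_KEYS 16
#define MAX_WATCHERS 8

typedef struct {
    int id;
    bool dirty_cas;              /* set when a watched key is modified */
} client;

typedef struct {
    char key[32];
    client *watchers[MAX_WATCHERS];
    int nwatchers;
} watched_key;

static watched_key watched[MAX_KEYS];
static int nwatched = 0;

static watched_key *find_key(const char *key) {
    for (int i = 0; i < nwatched; i++)
        if (strcmp(watched[i].key, key) == 0) return &watched[i];
    return NULL;
}

/* WATCH key: add the client to the key's watcher list */
bool watch_key(client *c, const char *key) {
    watched_key *wk = find_key(key);
    if (wk == NULL) {
        if (nwatched == MAX_KEYS || strlen(key) >= sizeof watched[0].key)
            return false;
        wk = &watched[nwatched++];
        strcpy(wk->key, key);
        wk->nwatchers = 0;
    }
    for (int i = 0; i < wk->nwatchers; i++)
        if (wk->watchers[i] == c) return true;   /* already watching */
    if (wk->nwatchers == MAX_WATCHERS) return false;
    wk->watchers[wk->nwatchers++] = c;
    return true;
}

/* Called on every write to key: invalidate all clients watching it */
void touch_key(const char *key) {
    watched_key *wk = find_key(key);
    if (wk == NULL) return;
    for (int i = 0; i < wk->nwatchers; i++)
        wk->watchers[i]->dirty_cas = true;
}

/* EXEC may run only if nothing the client watched was touched */
bool exec_allowed(const client *c) { return !c->dirty_cas; }
```

The write path only pays for a lookup in the watched-key table, and clients that never use WATCH cost nothing.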
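4. Performance testing with redis-benchmark ([http://blog.csdn.net/androidlushangderen/article/details/40211907](http://blog.csdn.net/androidlushangderen/article/details/40211907)). Here Redis introduces the idea of performance statistics, which felt quite sophisticated: it relies heavily on latency measurements and judges how well the server performs by the latency of its requests.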
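A benchmark of this kind boils down to timing each request and then summarizing the distribution. The sketch below sorts recorded per-request latencies and reports the share of requests that finished within a threshold, similar in spirit to the "x% <= y milliseconds" lines redis-benchmark prints, plus a nearest-rank percentile. The function names and the choice of microseconds are my assumptions, not redis-benchmark's code.

```c
#include <stddef.h>
#include <stdlib.h>

static int cmp_ll(const void *a, const void *b) {
    long long x = *(const long long *)a, y = *(const long long *)b;
    return (x > y) - (x < y);
}

/* Sort latencies (microseconds) in place, then return the percentage of
 * requests whose latency is <= threshold_us. */
double pct_within(long long *lat, size_t n, long long threshold_us) {
    if (n == 0) return 0.0;
    qsort(lat, n, sizeof lat[0], cmp_ll);
    size_t count = 0;
    while (count < n && lat[count] <= threshold_us) count++;
    return 100.0 * (double)count / (double)n;
}

/* Nearest-rank percentile (0 < p <= 100) of an already sorted array;
 * returns -1 for an empty array. */
long long percentile(const long long *sorted, size_t n, double p) {
    if (n == 0) return -1;
    double r = p * (double)n / 100.0;   /* rank = ceil(p/100 * n) */
    size_t rank = (size_t)r;
    if ((double)rank < r) rank++;
    if (rank < 1) rank = 1;
    if (rank > n) rank = n;
    return sorted[rank - 1];
}
```

Reporting a distribution rather than an average matters because a few slow requests can hide behind a good mean.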
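5. The design of the zipmap compact structure ([http://blog.csdn.net/androidlushangderen/article/details/39994599](http://blog.csdn.net/androidlushangderen/article/details/39994599)). Redis goes to great lengths to save memory, and the ziplist (compressed list) and zipmap (compressed map) are textbook examples. A conventional struct would simply hold an int64 field, which always takes 8 bytes, yet most stored values are small enough that 1 byte would do, so 7 bytes are wasted. The zip family of structures instead uses only as many bytes as each value needs. It is a very elegant design: to find a particular key-value pair, the code walks the entries, using the length stored in front of each one to compute the offset of the next.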
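The variable-length trick is easiest to see in how zipmap stores lengths. As I read zipmap.c, a length below 254 takes a single byte, while a larger one is written as the marker byte 254 followed by a 4-byte little-endian length (255 is reserved as the end-of-map marker). The sketch below encodes and decodes just that length field; zm_encode_len and zm_decode_len are hypothetical names.

```c
#include <stddef.h>
#include <stdint.h>

#define ZM_BIGLEN 254   /* marker: a 4-byte length follows */

/* Write len at p (if p != NULL) and return the number of bytes needed. */
size_t zm_encode_len(unsigned char *p, uint32_t len) {
    if (len < ZM_BIGLEN) {
        if (p) p[0] = (unsigned char)len;
        return 1;
    }
    if (p) {
        p[0] = ZM_BIGLEN;
        for (int i = 0; i < 4; i++)            /* little-endian */
            p[1 + i] = (unsigned char)(len >> (8 * i));
    }
    return 5;
}

/* Read a length at p; *used receives how many bytes it occupied. */
uint32_t zm_decode_len(const unsigned char *p, size_t *used) {
    if (p[0] < ZM_BIGLEN) {
        *used = 1;
        return p[0];
    }
    uint32_t len = 0;
    for (int i = 0; i < 4; i++)
        len |= (uint32_t)p[1 + i] << (8 * i);
    *used = 5;
    return len;
}
```

Passing p = NULL just asks how many bytes a length would need, which lets the caller size a buffer before writing into it.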
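6. The sparkline micro-graph ([http://blog.csdn.net/androidlushangderen/article/details/39964591](http://blog.csdn.net/androidlushangderen/article/details/39964591)). Redis's sparkline taught me something new: it renders a small line-chart-like graph as a string, built from many collected samples. It is mostly used for analysis and statistics; for example, the latency analysis code in latency.c uses it.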
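At its core, a text sparkline is a scaling step: map each sample into the range between the minimum and maximum, then pick a character for that level. Here is a one-line sketch using an assumed ASCII ramp `_.-~^`. As far as I can tell, Redis's sparkline.c draws multi-row graphs with its own character set, so treat this as an illustration of the idea rather than its algorithm.

```c
#include <stddef.h>

/* Render samples as one line of ASCII levels into out (needs n+1 bytes). */
void sparkline(const double *samples, size_t n, char *out) {
    static const char ramp[] = "_.-~^";        /* lowest to highest level */
    const int levels = (int)(sizeof ramp - 1); /* 5 levels */
    if (n == 0) {
        out[0] = '\0';
        return;
    }
    double min = samples[0], max = samples[0];
    for (size_t i = 1; i < n; i++) {
        if (samples[i] < min) min = samples[i];
        if (samples[i] > max) max = samples[i];
    }
    for (size_t i = 0; i < n; i++) {
        int level = 0;                          /* flat series: all lowest */
        if (max > min)
            level = (int)((samples[i] - min) / (max - min) * (levels - 1) + 0.5);
        out[i] = ramp[level];
    }
    out[n] = '\0';
}
```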
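7. Memory management through object reference counting ([http://blog.csdn.net/androidlushangderen/article/details/40716469](http://blog.csdn.net/androidlushangderen/article/details/40716469)). There are two common ways to manage object lifetimes: reachability analysis from GC roots (what the JVM uses) and reference counting. Redis implements the latter. Here are the functions that increment and decrement an object's reference count: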
~~~
/* Increment the reference count (refcount) of a robj */
void incrRefCount(robj *o) {
//Increment refcount
o->refcount++;
}
~~~
~~~
/* Decrement the reference count of a robj; free the object when it drops to 0 */
void decrRefCount(robj *o) {
//A refcount that is already <= 0 means something has gone wrong
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
if (o->refcount == 1) {
//The refcount was 1, so this release drops the last reference: free the object
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
zfree(o);
} else {
//Refcount > 1: just decrement it
o->refcount--;
}
}
~~~
The decrement function is the key part.
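Stripped of Redis's object types, the pattern is small. The following self-contained sketch (the obj struct and obj_new, obj_retain and obj_release are hypothetical names) keeps a refcount inside each object and frees it when the last reference is dropped, mirroring the structure of incrRefCount/decrRefCount above, including the sanity check on a non-positive count.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    int refcount;
    char *str;                 /* payload owned by the object */
} obj;

static int objects_freed = 0;  /* for demonstration only */

obj *obj_new(const char *s) {
    obj *o = malloc(sizeof *o);
    if (!o) abort();
    o->str = malloc(strlen(s) + 1);
    if (!o->str) abort();
    strcpy(o->str, s);
    o->refcount = 1;           /* the creator holds the first reference */
    return o;
}

void obj_retain(obj *o) { o->refcount++; }

void obj_release(obj *o) {
    if (o->refcount <= 0) {    /* same sanity check as decrRefCount */
        fprintf(stderr, "obj_release against refcount <= 0\n");
        abort();
    }
    if (o->refcount == 1) {    /* last reference: free payload, then object */
        free(o->str);
        free(o);
        objects_freed++;
    } else {
        o->refcount--;
    }
}

int obj_freed_count(void) { return objects_freed; }
```

Sharing one object from several places then costs only a counter increment, which is how Redis can hand out shared objects (such as small integers) instead of copying them.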
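8. Background work in a forked child process ([http://blog.csdn.net/androidlushangderen/article/details/40266579](http://blog.csdn.net/androidlushangderen/article/details/40266579)). Using fork() to create a child process that does the work in the background was the first time I had seen it used this way; I had no idea fork() could be applied like this, and it taught me a lot. The key point is that fork() returns different values in the two processes: in the parent it returns the child's PID, and in the child it returns 0.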
~~~
/* Save the RDB file in the background */
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
if (server.rdb_child_pid != -1) return REDIS_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
start = ustime();
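//Use fork() to create a child process that performs the RDB save.
//After fork() two processes run this code: in the child fork() returns 0,
//so the child takes the if branch; in the parent it returns the child's pid
//(non-zero), so the parent takes the else branch.
//If the child could not be created, fork() returns -1 in the parent.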
if ((childpid = fork()) == 0) {
//This branch runs only in the child process
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
//Do the actual save with rdbSave(), the same function a foreground SAVE uses
retval = rdbSave(filename);
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
//Continue in the parent process
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
~~~
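What makes this work is that the child gets a copy-on-write snapshot of the parent's memory at the moment of fork(), so the parent can keep modifying its data while the child writes a consistent image. The sketch below shows just that: the child writes a buffer to a file and exits, while the parent overwrites the buffer right after fork() and then waits. background_save is a hypothetical name; unlike Redis, which checks on the child later from its periodic cron loop, this version simply blocks in waitpid().

```c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that writes data to path; the parent then overwrites data.
 * Returns 0 if the child reported success, -1 otherwise. */
int background_save(const char *path, char *data, size_t len) {
    pid_t pid = fork();
    if (pid == -1) return -1;                /* fork failed (parent only) */
    if (pid == 0) {                          /* child: sees the snapshot */
        FILE *fp = fopen(path, "w");
        if (!fp) _exit(1);
        size_t written = fwrite(data, 1, len, fp);
        int ok = (written == len) && fclose(fp) == 0;
        _exit(ok ? 0 : 1);                   /* skip the parent's stdio/atexit state */
    }
    /* parent: keep mutating; the child's copy of data is unaffected */
    memset(data, 'X', len);
    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Even though the parent changes the buffer immediately, the file contains the data as it was at fork() time.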
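9. Converting a long long to a string ([http://blog.csdn.net/androidlushangderen/article/details/40649623](http://blog.csdn.net/androidlushangderen/article/details/40649623)). I have implemented string-to-number and number-to-string conversions many times. The function may work, but what about efficiency? For very long long long values, a naive approach gets even slower. Redis offers a nice idea here: where we would normally divide by 10 and take the remainder % 10 to get one digit at a time, Redis divides by 100 and maps each remainder directly to a two-character entry in a lookup string, producing two digits per iteration. The algorithm is as follows: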
~~~
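#include <limits.h>
#include <stddef.h>
#include <stdint.h>

/* Number of decimal digits in v. A simple stand-in for the digits10()
 * helper that ll2string() relies on, which is defined elsewhere in Redis. */
static uint32_t digits10(uint64_t v) {
    uint32_t n = 1;
    while (v >= 10) {
        v /= 10;
        n++;
    }
    return n;
}

/* Convert a long long into a string. Returns the number of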
* characters needed to represent the number.
* If the buffer is not big enough to store the string, 0 is returned.
*
* Based on the following article (that apparently does not provide a
* novel approach but only publicizes an already used technique):
*
* https://www.facebook.com/notes/facebook-engineering/three-optimization-tips-for-c/10151361643253920
*
* Modified in order to handle signed integers since the original code was
* designed for unsigned integers. */
/* long long -> string */
int ll2string(char* dst, size_t dstlen, long long svalue) {
static const char digits[201] =
"0001020304050607080910111213141516171819"
"2021222324252627282930313233343536373839"
"4041424344454647484950515253545556575859"
"6061626364656667686970717273747576777879"
"8081828384858687888990919293949596979899";
int negative;
unsigned long long value;
/* The main loop works with 64bit unsigned integers for simplicity, so
* we convert the number here and remember if it is negative. */
/* Handle the sign here */
if (svalue < 0) {
if (svalue != LLONG_MIN) {
value = -svalue;
} else {
value = ((unsigned long long) LLONG_MAX)+1;
}
negative = 1;
} else {
value = svalue;
negative = 0;
}
/* Check length. */
uint32_t const length = digits10(value)+negative;
if (length >= dstlen) return 0;
/* Null term. */
uint32_t next = length;
dst[next] = '\0';
next--;
while (value >= 100) {
//Take the last two digits as an index into digits[]
int const i = (value % 100) * 2;
value /= 100;
//Copy the two characters for this remainder from the digits table
dst[next] = digits[i + 1];
dst[next - 1] = digits[i];
next -= 2;
}
/* Handle last 1-2 digits. */
if (value < 10) {
dst[next] = '0' + (uint32_t) value;
} else {
int i = (uint32_t) value * 2;
dst[next] = digits[i + 1];
dst[next - 1] = digits[i];
}
/* Add sign. */
if (negative) dst[0] = '-';
return length;
}
~~~
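10. The pattern-matching algorithm ([http://blog.csdn.net/androidlushangderen/article/details/40649623](http://blog.csdn.net/androidlushangderen/article/details/40649623)). Strictly speaking, this is glob-style pattern matching, as used by commands such as KEYS, rather than full regular expressions. We use such patterns all the time, but how many of us know how simple patterns are actually matched, and what principle lies behind it? Why can ? stand for any single character and let matching continue, while * matches any sequence of characters? Implementing an algorithm like this is not that easy, and Redis includes one, which makes it quite a find.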
~~~
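#include <ctype.h>

/* Glob-style pattern matching. */
/* Supports glob-style wildcards: * matches any sequence of characters (including none), ? matches exactly one character, and [abc] matches any one of the characters in the brackets. */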
int stringmatchlen(const char *pattern, int patternLen,
const char *string, int stringLen, int nocase)
{
while(patternLen) {
switch(pattern[0]) {
case '*':
while (pattern[1] == '*') {
//Collapse consecutive '*' characters: '**' means the same as '*'
pattern++;
patternLen--;
}
if (patternLen == 1)
return 1; /* match */
while(stringLen) {
if (stringmatchlen(pattern+1, patternLen-1,
string, stringLen, nocase))
return 1; /* match */
string++;
stringLen--;
}
return 0; /* no match */
break;
case '?':
if (stringLen == 0)
return 0; /* no match */
/* ? matches any single character, so just advance one character in the string */
string++;
stringLen--;
break;
case '[':
{
int not, match;
pattern++;
patternLen--;
not = pattern[0] == '^';
if (not) {
pattern++;
patternLen--;
}
match = 0;
while(1) {
if (pattern[0] == '\\') {
//Escape character: match the next pattern character literally
pattern++;
patternLen--;
if (pattern[0] == string[0])
match = 1;
} else if (pattern[0] == ']') {
//A closing ']' ends the bracket expression
break;
} else if (patternLen == 0) {
pattern--;
patternLen++;
break;
} else if (pattern[1] == '-' && patternLen >= 3) {
int start = pattern[0];
int end = pattern[2];
int c = string[0];
if (start > end) {
int t = start;
start = end;
end = t;
}
if (nocase) {
start = tolower(start);
end = tolower(end);
c = tolower(c);
}
pattern += 2;
patternLen -= 2;
if (c >= start && c <= end)
match = 1;
} else {
if (!nocase) {
if (pattern[0] == string[0])
match = 1;
} else {
if (tolower((int)pattern[0]) == tolower((int)string[0]))
match = 1;
}
}
pattern++;
patternLen--;
}
if (not)
match = !match;
if (!match)
return 0; /* no match */
string++;
stringLen--;
break;
}
case '\\':
if (patternLen >= 2) {
pattern++;
patternLen--;
}
/* fall through */
default:
/* Not a special character: compare literally */
if (!nocase) {
if (pattern[0] != string[0])
//Characters differ: no match
return 0; /* no match */
} else {
if (tolower((int)pattern[0]) != tolower((int)string[0]))
return 0; /* no match */
}
string++;
stringLen--;
break;
}
pattern++;
patternLen--;
if (stringLen == 0) {
while(*pattern == '*') {
pattern++;
patternLen--;
}
break;
}
}
if (patternLen == 0 && stringLen == 0)
//Both the pattern and the string were fully consumed: match
return 1;
return 0;
}
~~~
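11. Redis's reimplementation of the drand48() random-number algorithm ([http://blog.csdn.net/androidlushangderen/article/details/40582189](http://blog.csdn.net/androidlushangderen/article/details/40582189)). Putting it last among the 11 designs does not mean it is worse than the others; I added it because I found it distinctive. Random number generators can behave differently on different operating systems: Lua's default math.random() relies on libc's rand(), which is not guaranteed to produce the same sequence on different systems even with the same seed. So instead of relying on it, the Redis author reimplemented a generator based on the drand48() algorithm. To learn what drand48() is, follow the link.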
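drand48() is a 48-bit linear congruential generator, X(n+1) = (a * X(n) + c) mod 2^48 with a = 0x5DEECE66D and c = 0xB, as specified by POSIX. A portable reimplementation needs nothing but 64-bit integer arithmetic, which is why it yields the same sequence everywhere. The sketch below follows the POSIX definitions (seeding puts the low 32 bits of the seed in the high bits of the state and 0x330E in the low 16 bits; lrand48 returns the top 31 bits; drand48 scales the state into [0, 1)). my_srand48, my_lrand48 and my_drand48 are hypothetical names, not the functions in Redis's rand.c.

```c
#include <stdint.h>

#define RAND48_MASK ((UINT64_C(1) << 48) - 1)

/* 48-bit state; this initial value equals the state after my_srand48(0) */
static uint64_t rand48_x = 0x330E;

/* X(n+1) = (a * X(n) + c) mod 2^48. Unsigned overflow wraps mod 2^64,
 * which is harmless because 2^48 divides 2^64. */
static void rand48_next(void) {
    rand48_x = (UINT64_C(0x5DEECE66D) * rand48_x + 0xB) & RAND48_MASK;
}

/* Like srand48(): low 32 bits of seed go to the high bits, 0x330E below */
void my_srand48(long seed) {
    rand48_x = (((uint64_t)seed & 0xFFFFFFFFu) << 16) | 0x330E;
}

/* Like lrand48(): the top 31 bits of the state, in [0, 2^31) */
long my_lrand48(void) {
    rand48_next();
    return (long)(rand48_x >> 17);
}

/* Like drand48(): the state scaled into [0.0, 1.0) */
double my_drand48(void) {
    rand48_next();
    return (double)rand48_x / 281474976710656.0;  /* 2^48 */
}
```

Since only exact integer arithmetic is involved, the same seed produces the same sequence on every platform, which is exactly the property the paragraph above is after.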
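That's it: these are the designs in Redis that impressed me most. There are traces of excellent code all over Redis, but space is limited, so I leave the rest for readers to study and discover on their own.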
(35) - Analysis of the Server Implementation in redis.c (2)
Last updated: 2022-04-01 20:21:30
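The Redis server code is really large, and learning how each API is implemented one by one would be very inefficient. So today I focus on the server's execution flow; I analyzed its module design in the previous post, so read that one if anything here is unclear. Accordingly, my analysis of the server starts from the main function. Before looking at main, note a few global variables the author declares here; they are worth knowing.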
~~~
/* Our shared "common" objects */
/* Shared objects */
struct sharedObjectsStruct shared;
/* Global vars that are actually used as constants. The following double
* values are used for double on-disk serialization, and are initialized
* at runtime to avoid strange compiler optimizations. */
/* Global double constants */
double R_Zero, R_PosInf, R_NegInf, R_Nan;
/*================================= Globals ================================= */
/* Global vars */
/* The global Redis server state */
struct redisServer server; /* server global state */
/* Our command table.
*
* Every entry is composed of the following fields:
*
* name: a string representing the command name.
* function: pointer to the C function implementing the command.
* arity: number of arguments, it is possible to use -N to say >= N
* sflags: command flags as string. See below for a table of flags.
* flags: flags as bitmask. Computed by Redis using the 'sflags' field.
* get_keys_proc: an optional function to get key arguments from a command.
* This is only used when the following three fields are not
* enough to specify what arguments are keys.
* first_key_index: first argument that is a key
* last_key_index: last argument that is a key
* key_step: step to get all the keys from first to last argument. For instance
* in MSET the step is two since arguments are key,val,key,val,...
* microseconds: microseconds of total execution time for this command.
* calls: total number of calls of this command.
*
* The flags, microseconds and calls fields are computed by Redis and should
* always be set to zero.
*
* Command flags are expressed using strings where every character represents
* a flag. Later the populateCommandTable() function will take care of
* populating the real 'flags' field using this characters.
*
* This is the meaning of the flags:
*
* w: write command (may modify the key space).
* r: read command (will never modify the key space).
* m: may increase memory usage once called. Don't allow if out of memory.
* a: admin command, like SAVE or SHUTDOWN.
* p: Pub/Sub related command.
* f: force replication of this command, regardless of server.dirty.
* s: command not allowed in scripts.
* R: random command. Command is not deterministic, that is, the same command
* with the same arguments, with the same key space, may have different
* results. For instance SPOP and RANDOMKEY are two random commands.
* S: Sort command output array if called from script, so that the output
* is deterministic.
* l: Allow command while loading the database.
* t: Allow command while a slave has stale data but is not allowed to
* server this data. Normally no command is accepted in this condition
* but just a few.
* M: Do not automatically propagate the command on MONITOR.
* F: Fast command: O(1) or O(log(N)) command that should never delay
* its execution as long as the kernel scheduler is giving us time.
* Note that commands that may trigger a DEL as a side effect (like SET)
* are not fast commands.
*/
/* The Redis command table */
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
.....
~~~
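The command table is very long, so the rest is omitted; it covers essentially every command, which makes sense, since the server is in the end an implementation of responses to these commands. Now for the important part: how does the server's main program run? Here is a flowchart: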
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-08-24_57bd74f7a6168.jpg)
The code is as follows:
~~~
int main(int argc, char **argv) {
struct timeval tv;
/* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
//Enable thread-safe memory accounting in zmalloc
zmalloc_enable_thread_safeness();
//Register the handler called when an allocation fails (out of memory)
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
//Get the current time (used to seed the dict hash function)
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
server.sentinel_mode = checkForSentinelMode(argc,argv);
//Initialize the server configuration with default values
initServerConfig();
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
//In Sentinel mode, initialize Sentinel's configuration and state
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
if (argc >= 2) {
int j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
char *configfile = NULL;
/* Handle special options --help and --version */
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* First argument is the config file name? */
if (argv[j][0] != '-' || argv[j][1] != '-')
configfile = argv[j++];
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual file name
* is parsed, if any. */
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
if (server.sentinel_mode && configfile && *configfile == '-') {
redisLog(REDIS_WARNING,
"Sentinel config from STDIN not allowed.");
redisLog(REDIS_WARNING,
"Sentinel needs config file on disk to save state. Exiting...");
exit(1);
}
if (configfile) server.configfile = getAbsolutePath(configfile);
resetServerSaveParams();
//Load the configuration from the config file plus command-line options
loadServerConfig(configfile,options);
sdsfree(options);
} else {
redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
//Run as a daemon if configured to
if (server.daemonize) daemonize();
initServer();
if (server.daemonize) createPidFile();
redisSetProcTitle(argv[0]);
redisAsciiArt();
if (!server.sentinel_mode) {
/* Things not needed when running in Sentinel mode. */
redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
linuxOvercommitMemoryWarning();
#endif
loadDataFromDisk();
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
if (server.sofd > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
sentinelIsRunning();
}
/* Warning the user about suspicious maxmemory setting. */
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
//Register beforeSleep, called each time before the event loop waits for events
aeSetBeforeSleepProc(server.el,beforeSleep);
//Start the event loop
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
~~~
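The function is quite simple. Some readers may wonder why there is no connection handling: don't clients and the server need to connect? Connections are not missing; they just don't appear in main() itself. initServer() opens the listening sockets and registers accept handlers with the event loop, so accepting clients and reading their commands all happens inside aeMain(), driven by events. That is why the server's main() can stay so short.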
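Going back to the command table at the start of this post: the arity field is what lets the server reject malformed calls before running them. A positive arity means exactly that many arguments (the command name included), and -N means at least N. Below is a toy version of the lookup-and-check; the cmd struct, the table and check_arity are hypothetical and far simpler than struct redisCommand (Redis itself finds commands through a hash table rather than a linear scan), though the four arity values are copied from the table above.

```c
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

typedef struct {
    const char *name;
    int arity;         /* N: exactly N args; -N: at least N (name included) */
} cmd;

static const cmd table[] = {
    {"get", 2}, {"set", -3}, {"setnx", 3}, {"setex", 4},
};

/* Case-insensitive lookup, since clients may send GET or get */
const cmd *lookup(const char *name) {
    for (size_t i = 0; i < sizeof table / sizeof table[0]; i++)
        if (strcasecmp(table[i].name, name) == 0) return &table[i];
    return NULL;
}

/* argc counts the command name itself */
int check_arity(const cmd *c, int argc) {
    if (c->arity > 0) return argc == c->arity;
    return argc >= -c->arity;
}
```

SET is declared as -3 because it takes optional trailing arguments, while GET with 2 must be called with exactly one key.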
(34) - Analysis of the Server Implementation in redis.h (1)
Last updated: 2022-04-01 20:21:28
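Last time I analyzed the client structures; the ideas there were fairly simple and clear. The last piece to study is the server implementation, which is central to Redis: it brings together essentially every topic covered in the earlier modules. You can tell just from the header: redis.h alone is over 1000 lines, and it contains only variable and macro declarations plus function prototypes. So, as yesterday, instead of digging into the implementation, I will first look at the server's overall design from a global perspective, which the declarations in the header show nicely.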
~~~
/* ------------------ Required headers, mostly for the data structures ------------------ */
#include "ae.h"        /* Event driven programming library */
#include "sds.h"       /* Dynamic safe strings */
#include "dict.h"      /* Hash tables */
#include "adlist.h"    /* Linked lists (doubly linked) */
#include "zmalloc.h"   /* total memory usage aware version of malloc/free */
#include "anet.h"      /* Networking the easy way */
#include "ziplist.h"   /* Compact list data structure */
#include "intset.h"    /* Compact integer set structure */
#include "version.h"   /* Version macro */
#include "util.h"      /* Misc functions useful in many places */
#include "latency.h"   /* Latency monitor API */
#include "sparkline.h" /* ASCII graphs API */
/* --------------------- Macro definitions, grouped by module --------------------- */
/* 1.Error codes */
/* 2.Static server configuration */
/* 3.Protocol and I/O related defines */
/* 4.Hash table parameters */
/* 5.Command flags */
/* 6.Object types (List, String, Hash, etc.) */
/* 7.Objects encoding */
/* 8.Defines related to the dump file format (RDB length encodings: 14-bit, 32-bit, etc.) */
/* 9.AOF states */
/* 10.Client flags */
/* 11.Client request types (INLINE and MULTIBULK) */
/* 12.Client classes for client limits */
/* 13.Slave replication state */
/* 14.List related stuff (head or tail position) */
/* 15.Sort operations (ascending, descending, etc.) */
/* 16.Log levels */
/* 17.Anti-warning macro... */
/* 18.Append only defines */
/* 19.Zip structure related defaults (ziplist) */
/* 20.HyperLogLog defines */
/* 21.Sets operations codes (union, difference, intersection) */
/* 22.Redis maxmemory strategies */
/* 23.Scripting */
/* 24.Units (seconds and milliseconds) */
/* 25.SHUTDOWN flags */
/* 26.Command call flags, see call() function */
/* 27.Command propagation flags, see propagate() function */
/* 28.Keyspace changes notification classes */
/*-----------------------------------------------------------------------------
 * Data types
*----------------------------------------------------------------------------*/
1.typedef struct redisObject /* Redis object */
2.typedef struct redisDb
3.typedef struct multiCmd
4.typedef struct multiState
5.typedef struct blockingState
6.typedef struct readyList
7.typedef struct redisClient /* Redis client structure */
8.struct saveparam
9.struct sharedObjectsStruct
10.typedef struct zskiplistNode
11.typedef struct zskiplist
12.typedef struct zset
13.typedef struct clientBufferLimitsConfig
14.typedef struct redisOp
15.typedef struct redisOpArray
16.struct redisServer /* Redis server structure */
17.struct redisCommand /* Redis command structure */
/*-----------------------------------------------------------------------------
 * Functions prototypes
*----------------------------------------------------------------------------*/
/* 1.Utils */
/* 2.networking.c -- Networking and Client related operations */
/* 3.List data type */
/* 4.MULTI/EXEC/WATCH... */
/* 5.Redis object implementation */
/* 6.Synchronous I/O with timeout */
/* 7.Replication */
/* 8.Generic persistence functions */
/* 9.AOF persistence */
/* 10.Core functions */
/* 11.Sorted sets data type */
/* 12.Set data type */
/* 13.Hash data type */
/* 14.Pub / Sub */
/* 15.Keyspace events notification */
/* 16.Configuration */
/* 17.db.c -- Keyspace access API */
/* 18.Sentinel */
/* 19.Scripting */
/* 20.Git SHA1 */
/* 21.Commands prototypes */
~~~
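The header breaks down into four main parts:
**1.Included headers**
**2.Macro definitions**
**3.Data structure declarations**
**4.Function prototypes**
Note in particular that the redisObject, redisClient and redisServer structures, which appear all over the code, are all defined in this file.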
~~~
/* The actual Redis Object */
#define REDIS_LRU_BITS 24
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
int refcount;
void *ptr;
} robj;
~~~
The redisClient and redisServer structures look much alike: each is a long list of fields.
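In redisObject, the bitfields pack type (4 bits), encoding (4 bits) and a 24-bit LRU clock into a single 32-bit word. Because the clock has only 24 bits it wraps around, so computing how long an object has been idle has to account for the wrap. Here is a sketch of that arithmetic, assuming the one-second resolution that REDIS_LRU_CLOCK_RESOLUTION defines above; demo_robj, lru_clock_now and idle_seconds are hypothetical, and doing the subtraction modulo 2^24 is my choice for handling the wrap, not necessarily how Redis writes it.

```c
#include <stdint.h>
#include <time.h>

#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1u << LRU_BITS) - 1)   /* 16777215 */

typedef struct {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS;     /* low 24 bits of a seconds counter */
    int refcount;
    void *ptr;
} demo_robj;

/* Current LRU clock: seconds since the epoch, truncated to 24 bits */
unsigned lru_clock_now(void) {
    return (unsigned)((uint64_t)time(NULL) & LRU_CLOCK_MAX);
}

/* Seconds since the object was last touched. Subtracting modulo 2^24
 * stays correct across a single wraparound of the clock. */
unsigned idle_seconds(unsigned now, const demo_robj *o) {
    return (now - (unsigned)o->lru) & LRU_CLOCK_MAX;
}
```

A 24-bit clock at one-second resolution wraps roughly every 194 days, which is plenty for ranking eviction candidates.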
(33) - Implementation of the redis-cli.c Command-Line Interface (2)
Last updated: 2022-04-01 20:21:26
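Today I finished the rest of the command-line client. Overall, it revolves around two things: config and mode. Why do I say so? Read on. The configuration struct in the client is not the same concept as the configuration structs we studied before: besides the basic IP address and port, the cli's struct consists mostly of settings for the various modes.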
~~~
/* redis-cli configuration struct */
static struct config {
char *hostip;
int hostport;
char *hostsocket;
long repeat;
long interval;
int dbnum;
int interactive;
int shutdown;
int monitor_mode;
int pubsub_mode;
int latency_mode;
int latency_history;
int cluster_mode;
int cluster_reissue_command;
int slave_mode;
int pipe_mode;
int pipe_timeout;
int getrdb_mode;
int stat_mode;
int scan_mode;
int intrinsic_latency_mode;
int intrinsic_latency_duration;
char *pattern;
char *rdb_filename;
int bigkeys;
int stdinarg; /* get last arg from stdin. (-x option) */
char *auth;
int output; /* output mode, see OUTPUT_* defines */
sds mb_delim;
char prompt[128];
char *eval;
int last_cmd_type;
} config;
~~~
There are at least ten modes in there. Let's work backwards and first look at how the cli's main program runs, that is, the steps of its main function:
~~~
/* Main program */
int main(int argc, char **argv) {
int firstarg;
//First, initialize the client configuration with defaults
config.hostip = sdsnew("127.0.0.1");
config.hostport = 6379;
config.hostsocket = NULL;
config.repeat = 1;
config.interval = 0;
config.dbnum = 0;
config.interactive = 0;
config.shutdown = 0;
config.monitor_mode = 0;
config.pubsub_mode = 0;
config.latency_mode = 0;
config.latency_history = 0;
config.cluster_mode = 0;
config.slave_mode = 0;
config.getrdb_mode = 0;
config.stat_mode = 0;
config.scan_mode = 0;
config.intrinsic_latency_mode = 0;
config.pattern = NULL;
config.rdb_filename = NULL;
config.pipe_mode = 0;
config.pipe_timeout = REDIS_CLI_DEFAULT_PIPE_TIMEOUT;
config.bigkeys = 0;
config.stdinarg = 0;
config.auth = NULL;
config.eval = NULL;
config.last_cmd_type = -1;
if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL))
config.output = OUTPUT_RAW;
else
config.output = OUTPUT_STANDARD;
config.mb_delim = sdsnew("\n");
cliInitHelp();
//Fill in config from the user's command-line arguments
firstarg = parseOptions(argc,argv);
argc -= firstarg;
argv += firstarg;
//With config set, call the handler for each mode enabled in it
/* Latency mode */
if (config.latency_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
latencyMode();
}
/* Slave mode */
if (config.slave_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
slaveMode();
}
....
~~~
The rest of the code follows the same pattern, so it is omitted. In short: set the configuration, then start the mode it selects. Below are the main modes.
1.statMode:
~~~
/* statMode prints statistics about the server's data and load */
static void statMode(void) {
redisReply *reply;
long aux, requests = 0;
int i = 0;
while(1) {
char buf[64];
int j;
reply = reconnectingInfo();
if (reply->type == REDIS_REPLY_ERROR) {
printf("ERROR: %s\n", reply->str);
exit(1);
}
if ((i++ % 20) == 0) {
printf(
"------- data ------ --------------------- load -------------------- - child -\n"
"keys mem clients blocked requests connections \n");
}
/* Keys */
aux = 0;
for (j = 0; j < 20; j++) {
long k;
sprintf(buf,"db%d:keys",j);
k = getLongInfoField(reply->str,buf);
if (k == LONG_MIN) continue;
aux += k;
}
sprintf(buf,"%ld",aux);
printf("%-11s",buf);
/* Used memory */
aux = getLongInfoField(reply->str,"used_memory");
bytesToHuman(buf,aux);
printf("%-8s",buf);
/* Clients */
aux = getLongInfoField(reply->str,"connected_clients");
sprintf(buf,"%ld",aux);
printf(" %-8s",buf);
/* Blocked (BLPOPPING) Clients */
aux = getLongInfoField(reply->str,"blocked_clients");
sprintf(buf,"%ld",aux);
printf("%-8s",buf);
....
~~~
This prints live statistics that the client fetches from the server with INFO.
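The mem column above comes from bytesToHuman(), which turns a byte count into a short human-readable string. A comparable sketch follows; human_bytes and its exact output format are my own choices, not copied from redis-cli.c. It picks the largest unit that keeps the value at or above 1 and prints two decimals.

```c
#include <stdio.h>
#include <string.h>

/* Format n bytes as e.g. "512B", "1.50K", "3.00G" into out. */
void human_bytes(char *out, size_t outlen, long long n) {
    static const char *units[] = {"K", "M", "G", "T"};
    if (n < 1024) {
        snprintf(out, outlen, "%lldB", n);
        return;
    }
    double d = (double)n / 1024.0;
    int u = 0;
    while (d >= 1024.0 && u < 3) {   /* move up a unit while it stays >= 1 */
        d /= 1024.0;
        u++;
    }
    snprintf(out, outlen, "%.2f%s", d, units[u]);
}
```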
2. A function that measures how fast the hardware computes, used by the intrinsic latency mode (--intrinsic-latency):
~~~
/* This is just some computation the compiler can't optimize out.
* Should run in less than 100-200 microseconds even using very
* slow hardware. Runs in less than 10 microseconds in modern HW. */
/* Simple computation used to test how fast the hardware computes */
unsigned long compute_something_fast(void) {
unsigned char s[256], i, j, t;
int count = 1000, k;
unsigned long output = 0;
for (k = 0; k < 256; k++) s[k] = k;
i = 0;
j = 0;
while(count--) {
i++;
j = j + s[i];
t = s[i];
s[i] = s[j];
s[j] = t;
output += s[(s[i]+s[j])&255];
}
return output;
}
~~~
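compute_something_fast() is only the workload. The measurement idea behind intrinsic latency is to run such a workload back to back, time every run, and remember the worst one: since each run should take microseconds, any large gap means the process was descheduled or otherwise stalled. A compact sketch under those assumptions follows; max_run_latency_us, busy_work and the use of clock_gettime(CLOCK_MONOTONIC) are my choices, and it stops after a fixed duration instead of waiting for a signal the way redis-cli does.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 199309L   /* for clock_gettime() */
#endif
#include <time.h>

/* Monotonic time in microseconds */
static long long now_us(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000000LL + ts.tv_nsec / 1000;
}

volatile unsigned long sink;   /* keeps the compiler from dropping the work */

/* Hypothetical stand-in workload, in the spirit of compute_something_fast() */
static unsigned long busy_work(void) {
    unsigned long acc = sink;
    for (unsigned long k = 0; k < 1000; k++) acc += k * 2654435761UL;
    return acc;
}

/* Run busy_work() back to back for duration_us microseconds and return the
 * slowest single run in microseconds; *runs receives the number of runs. */
long long max_run_latency_us(long long duration_us, long long *runs) {
    long long end = now_us() + duration_us, max = 0;
    *runs = 0;
    for (;;) {
        long long t0 = now_us();
        sink = busy_work();
        long long t1 = now_us();
        if (t1 - t0 > max) max = t1 - t0;
        (*runs)++;
        if (t1 >= end) break;
    }
    return max;
}
```

The result is a floor for the latency any server on that machine can achieve, independent of Redis itself.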
The help text is printed by the following function:
~~~
/* Print the help text */
static void usage(void) {
sds version = cliVersion();
fprintf(stderr,
"redis-cli %s\n"
"\n"
"Usage: redis-cli [OPTIONS] [cmd [arg [arg ...]]]\n"
"  -h <hostname>      Server hostname (default: 127.0.0.1).\n"
"  -p <port>          Server port (default: 6379).\n"
"  -s <socket>        Server socket (overrides hostname and port).\n"
"  -a <password>      Password to use when connecting to the server.\n"
"  -r <repeat>        Execute specified command N times.\n"
"  -i <interval>      When -r is used, waits <interval> seconds per command.\n"
"                     It is possible to specify sub-second times like -i 0.1.\n"
"  -n <db>            Database number.\n"
"  -x                 Read last argument from STDIN.\n"
"  -d <delimiter>     Multi-bulk delimiter in for raw formatting (default: \\n).\n"
"  -c                 Enable cluster mode (follow -ASK and -MOVED redirections).\n"
"  --raw              Use raw formatting for replies (default when STDOUT is\n"
"                     not a tty).\n"
"  --no-raw           Force formatted output even when STDOUT is not a tty.\n"
"  --csv              Output in CSV format.\n"
"  --latency          Enter a special mode continuously sampling latency.\n"
"  --latency-history  Like --latency but tracking latency changes over time.\n"
"                     Default time interval is 15 sec. Change it using -i.\n"
"  --slave            Simulate a slave showing commands received from the master.\n"
"  --rdb <filename>   Transfer an RDB dump from remote server to local file.\n"
"  --pipe             Transfer raw Redis protocol from stdin to server.\n"
"  --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
"                     no reply is received within <n> seconds.\n"
"                     Default timeout: %d. Use 0 to wait forever.\n"
"  --bigkeys          Sample Redis keys looking for big keys.\n"
"  --scan             List all keys using the SCAN command.\n"
"  --pattern <pat>    Useful with --scan to specify a SCAN pattern.\n"
"  --intrinsic-latency <sec> Run a test to measure intrinsic system latency.\n"
"                     The test will run for the specified amount of seconds.\n"
"  --eval <file>      Send an EVAL command using the Lua script at <file>.\n"
"  --help             Output this help and exit.\n"
"  --version          Output version and exit.\n"
"\n"
"Examples:\n"
" cat /etc/passwd | redis-cli -x set mypasswd\n"
" redis-cli get mypasswd\n"
" redis-cli -r 100 lpush mylist x\n"
" redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
" redis-cli --eval myscript.lua key1 key2 , arg1 arg2 arg3\n"
" redis-cli --scan --pattern '*:12345*'\n"
"\n"
" (Note: when using --eval the comma separates KEYS[] from ARGV[] items)\n"
"\n"
"When no command is given, redis-cli starts in interactive mode.\n"
"Type \"help\" in interactive mode for information on available commands.\n"
"\n",
version, REDIS_CLI_DEFAULT_PIPE_TIMEOUT);
sdsfree(version);
exit(1);
}
~~~
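The help system distinguishes two kinds of entries: individual commands and command groups. A command is a single operation such as DUMP, EXISTS or CONFIG GET; a group is a category that commands belong to, such as list or set, where for example the list group gathers the list commands like LPUSH. In interactive mode, `help <command>` describes one command, while `help @<group>` lists every command in a group. There are only a few groups: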
~~~
/* All command groups */
static char *commandGroups[] = {
"generic",
"string",
"list",
"set",
"sorted_set",
"hash",
"pubsub",
"transactions",
"connection",
"server",
"scripting",
"hyperloglog"
};
~~~
Together, these groups cover the most commonly used commands in Redis.
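As an illustration of how a group lets help list related commands, here is a toy version: each help entry records the index of its group in the group list, and asking for a group prints every command tagged with it. The entries, their group assignments and print_group are examples I chose (GET and SET in string, LPUSH in list, DUMP and EXISTS in generic, CONFIG GET in server), not redis-cli's generated help table.

```c
#include <stdio.h>
#include <string.h>

static const char *groups[] = {
    "generic", "string", "list", "set", "sorted_set", "hash",
    "pubsub", "transactions", "connection", "server", "scripting", "hyperloglog"
};

typedef struct { const char *name; int group; } help_entry;

static const help_entry entries[] = {
    {"DUMP", 0}, {"EXISTS", 0}, {"GET", 1}, {"SET", 1},
    {"LPUSH", 2}, {"CONFIG GET", 9},
};

/* Print the commands in the named group (roughly what "help @group" shows)
 * and return how many there were; -1 if the group does not exist. */
int print_group(const char *group) {
    int g = -1;
    for (int i = 0; i < (int)(sizeof groups / sizeof groups[0]); i++)
        if (strcmp(groups[i], group) == 0) g = i;
    if (g < 0) return -1;
    int n = 0;
    for (size_t i = 0; i < sizeof entries / sizeof entries[0]; i++)
        if (entries[i].group == g) {
            printf("  %s\n", entries[i].name);
            n++;
        }
    return n;
}
```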
(32) - Implementation of the redis-cli.c Command-Line Interface (1)
Last updated: 2022-04-01 20:21:24
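After nearly a month of analyzing the Redis source code, I have reached the most central part, the place where the system actually starts. I call it the main directory. It contains two important files, redis-cli and redis: one is the client executable and the other the server. These two are the core modules of the whole system, so I group them as the main program module. It is also the last module in my study of Redis, and here you can find traces of every functional module I covered before. Because the main module contains a lot of code, I am studying it in batches; today I mainly pull out the API functions and dissect what is in redis-cli.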
At first I thought redis-cli was short for redis-client, but it actually means:
~~~
/* Redis CLI (command line interface)
 */
~~~
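So it directly handles the commands typed into the Redis console window. This file defines the Redis connection context, the configuration struct and a large set of API functions; that is roughly all: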
~~~
/* Redis connection context */
static redisContext *context;
/* redis-cli configuration struct */
static struct config
~~~
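The bulk of the file is the API functions that follow. Don't rush into them, because there are really a lot; first, let's classify them. Here is a diagram I made: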
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-08-24_57bd74f78aa93.jpg)
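That is quite a few modules. They are labeled in English above, and in later posts I will study them one by one; for example, the first one, Utility functions, means general-purpose helpers, and so on. Below are all the API functions in redis-cli: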
~~~
/* Utility functions */
static long long ustime(void)
static long long mstime(void)
static void cliRefreshPrompt(void)
static sds cliVersion(void)
static void cliInitHelp(void)
static void cliOutputCommandHelp(struct commandHelp *help, int group)
static void cliOutputGenericHelp(void)
static void cliOutputHelp(int argc, char **argv)
static void completionCallback(const char *buf, linenoiseCompletions *lc)
static int cliAuth()
static int cliSelect()
static int cliConnect(int force)
static void cliPrintContextError(void)
static sds cliFormatReplyTTY(redisReply *r, char *prefix)
static sds cliFormatReplyRaw(redisReply *r)
static sds cliFormatReplyCSV(redisReply *r)
static int cliReadReply(int output_raw_strings)
static int cliSendCommand(int argc, char **argv, int repeat)
static redisReply *reconnectingInfo(void)
/* User interface */
static int parseOptions(int argc, char **argv)
static sds readArgFromStdin(void)
static void usage(void)
static char **convertToSds(int count, char**args)
static void repl(void)
static int noninteractive(int argc, char **argv)
/* Eval mode */
static int evalMode(int argc, char **argv)
/* Latency and latency history modes */
static void latencyMode(void)
/* Slave mode */
unsigned long long sendSync(int fd)
static void slaveMode(void)
/* RDB transfer mode */
static void getRDB(void)
/* Bulk import (pipe) mode */
static void pipeMode(void)
/* Find big keys */
static redisReply *sendScan(unsigned long long *it)
static int getDbSize(void)
static int toIntType(char *key, char *type)
static void getKeyTypes(redisReply *keys, int *types)
static void getKeySizes(redisReply *keys, int *types, unsigned long long *sizes)
static void findBigKeys(void)
/* Stats mode */
static char *getInfoField(char *info, char *field)
static long getLongInfoField(char *info, char *field)
void bytesToHuman(char *s, long long n)
static void statMode(void)
/* Scan mode */
static void scanMode(void)
/* Intrinsic latency mode */
unsigned long compute_something_fast(void)
static void intrinsicLatencyModeStop(int s)
static void intrinsicLatencyMode(void)
/* Program main() */
int main(int argc, char **argv)
~~~
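The first two helpers in that list, ustime() and mstime(), provide the wall-clock timestamps used for timing throughout redis-cli. A minimal version based on gettimeofday() could look like this; treat it as a sketch rather than a copy of redis-cli.c.

```c
#include <stddef.h>
#include <sys/time.h>

/* Wall-clock time in microseconds since the Unix epoch */
long long ustime(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (long long)tv.tv_sec * 1000000 + tv.tv_usec;
}

/* The same clock in milliseconds */
long long mstime(void) {
    return ustime() / 1000;
}
```

Because both come from the same clock, subtracting two readings gives elapsed time at the corresponding resolution, which is how the latency modes compute their numbers.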
(31) - Latency Analysis and Handling
Last updated: 2022-04-01 20:21:21
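Whenever latency statistics come up, the term that springs to mind is "performance testing", and indeed Redis's redis-benchmark tool is built around measuring request latency. Here is how the official Redis source describes this file: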
~~~
/* The latency monitor allows to easily observe the sources of latency
* in a Redis instance using the LATENCY command. Different latency
* sources are monitored, like disk I/O, execution of commands, fork
* system call, and so forth.
*
 * ----------------------------------------------------------------------------
 */
~~~
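The principle behind Redis latency monitoring is very simple. For each event, Redis keeps a series of collected samples, and each sample records when it was taken and the latency observed. The mapping from an event name to its sample series is a dictionary (server.latency_events). Here is the definition of the basic sample structure, latencySample: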
~~~
/* Representation of a latency sample: the sampling time and the latency
* observed in milliseconds. */
/* A latency sample */
struct latencySample {
//Time at which the sample was taken
int32_t time; /* We don't use time_t to force 4 bytes usage everywhere. */
//Observed latency, in milliseconds
uint32_t latency; /* Latency in milliseconds. */
};
~~~
The dictionary does not store single samples, though. Each value is a whole series of samples for one event:
~~~
/* The latency time series for a given event. */
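/* Series of latency samples collected for one event */
struct latencyTimeSeries {
//Index of the slot where the next sample will be stored
int idx; /* Index of the next sample to store. */
//Maximum latency ever observed for this event
uint32_t max; /* Max latency observed for this event. */
//Most recent samples, used as a circular buffer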
struct latencySample samples[LATENCY_TS_LEN]; /* Latest history. */
};
~~~
Latency data exists for diagnosis and analysis, so the author also defined a statistics structure from which the analysis report is built:
~~~
/* Latency statistics structure. */
/* Summary statistics computed from an event's samples */
struct latencyStats {
//Highest latency observed since the last reset
uint32_t all_time_high; /* Absolute max observed since latest reset. */
//Average latency of the current samples
uint32_t avg; /* Average of current samples. */
//Minimum latency of the current samples
uint32_t min; /* Min of current samples. */
//Maximum latency of the current samples
uint32_t max; /* Max of current samples. */
//Mean absolute deviation from the average latency
uint32_t mad; /* Mean absolute deviation. */
//Number of non-zero samples
uint32_t samples; /* Number of non-zero samples. */
//Seconds elapsed since the oldest sample
time_t period; /* Number of seconds since first event and now. */
};
~~~
The fields are self-explanatory. So how is a single sample actually measured?
~~~
/* Start monitoring an event. We just set the current time. */
/* Start monitoring an event: just record the current time (only if monitoring is enabled) */
#define latencyStartMonitor(var) if (server.latency_monitor_threshold) { \
var = mstime(); \
} else { \
var = 0; \
}
/* End monitoring an event, compute the difference with the current time
* to check the amount of time elapsed. */
/* Stop monitoring: compute how much time has elapsed */
#define latencyEndMonitor(var) if (server.latency_monitor_threshold) { \
var = mstime() - var; \
}
~~~
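The idea is simple. Record the start time, then subtract it from the time at the end, and the difference is the latency. If that value reaches the configured threshold, the sample is added to the event's series: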
~~~
/* Add the sample only if the elapsed time is >= to the configured threshold. */
/* Add the sample to the event's series only if latency >= server.latency_monitor_threshold */
#define latencyAddSampleIfNeeded(event,var) \
if (server.latency_monitor_threshold && \
(var) >= server.latency_monitor_threshold) \
latencyAddSample((event),(var));
~~~
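The start/end/threshold flow can be tried outside Redis. This is a minimal standalone sketch, not Redis code. It assumes the hypothetical names monitor_threshold_ms, now_ms(), start_monitor(), end_monitor() and add_sample_if_needed(), which stand in for server.latency_monitor_threshold, mstime() and the three macros. Recording a sample is reduced to a counter, and the clock comes from POSIX gettimeofday().

~~~
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical stand-in for server.latency_monitor_threshold (ms); 0 = disabled. */
static long long monitor_threshold_ms = 0;
/* Hypothetical bookkeeping standing in for latencyAddSample(). */
static int samples_recorded = 0;
static const char *last_event = NULL;

/* Milliseconds since the epoch, similar in spirit to Redis's mstime(). */
static long long now_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (long long)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

/* Like latencyStartMonitor(): take a timestamp only when monitoring is on. */
static long long start_monitor(void) {
    return monitor_threshold_ms ? now_ms() : 0;
}

/* Like latencyEndMonitor(): turn the start timestamp into elapsed ms
 * (the value is left unchanged when monitoring is off). */
static long long end_monitor(long long start) {
    return monitor_threshold_ms ? now_ms() - start : start;
}

/* Like latencyAddSampleIfNeeded(): record only if monitoring is on and the
 * latency reaches the threshold. Returns 1 when a sample is recorded. */
static int add_sample_if_needed(const char *event, long long latency_ms) {
    if (monitor_threshold_ms && latency_ms >= monitor_threshold_ms) {
        samples_recorded++;
        last_event = event;
        return 1;
    }
    return 0;
}
~~~

A caller would wrap a slow operation with start_monitor() and end_monitor(), then pass the result and an event name such as "command" to add_sample_if_needed(). When the threshold is 0, the whole mechanism costs only a couple of branches. That matches the intent of the Redis macros.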
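Let's focus on latencyAddSample, which records a sample. The steps are:
1. Look up the event in server.latency_events. The value is the event's latencyTimeSeries, which is created on first use.
2. Store a new sample in that series' struct latencySample samples[LATENCY_TS_LEN] array, which is used as a circular buffer. If the previous sample was taken in the same second, only its latency is updated (the larger value is kept).
The implementation: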
~~~
/* Add the specified sample to the specified time series "event".
* This function is usually called via latencyAddSampleIfNeeded(), that
* is a macro that only adds the sample if the latency is higher than
* server.latency_monitor_threshold. */
/* Add a sample to the time series of the given event */
void latencyAddSample(char *event, mstime_t latency) {
//Look up the time series for this event
struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);
time_t now = time(NULL);
int prev;
/* Create the time series if it does not exist. */
if (ts == NULL) {
ts = zmalloc(sizeof(*ts));
ts->idx = 0;
ts->max = 0;
memset(ts->samples,0,sizeof(ts->samples));
//Register the new series: each event maps to one latencyTimeSeries
dictAdd(server.latency_events,zstrdup(event),ts);
}
/* If the previous sample is in the same second, we update our old sample
* if this latency is > of the old one, or just return. */
prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;
if (ts->samples[prev].time == now) {
if (latency > ts->samples[prev].latency)
ts->samples[prev].latency = latency;
return;
}
//Store the new sample in the current slot
ts->samples[ts->idx].time = time(NULL);
ts->samples[ts->idx].latency = latency;
if (latency > ts->max) ts->max = latency;
ts->idx++;
if (ts->idx == LATENCY_TS_LEN) ts->idx = 0;
}
~~~
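The circular-buffer logic of latencyAddSample() is easier to follow in isolation. This sketch is a hypothetical simplification with the following changes:
- It keeps a single series instead of a dictionary of series.
- It uses a tiny TS_LEN of 4 in place of LATENCY_TS_LEN.
- It takes the current time as a parameter, so the behavior is deterministic.

One deliberate difference from the excerpt above: the sketch updates the all-time max before the same-second check, so a merged sample still counts toward it.

~~~
#include <stdint.h>
#include <string.h>

#define TS_LEN 4 /* hypothetical; Redis uses LATENCY_TS_LEN */

struct sample { int32_t time; uint32_t latency; };

struct series {
    int idx;                       /* slot for the next sample */
    uint32_t max;                  /* all-time maximum */
    struct sample samples[TS_LEN];
};

static void series_init(struct series *ts) {
    memset(ts, 0, sizeof(*ts));
}

/* Merge samples taken in the same second; otherwise overwrite the oldest
 * slot and advance idx with wraparound. */
static void series_add(struct series *ts, int32_t now, uint32_t latency) {
    int prev;
    if (latency > ts->max) ts->max = latency;
    prev = (ts->idx + TS_LEN - 1) % TS_LEN;
    if (ts->samples[prev].time == now) {
        if (latency > ts->samples[prev].latency)
            ts->samples[prev].latency = latency;
        return;
    }
    ts->samples[ts->idx].time = now;
    ts->samples[ts->idx].latency = latency;
    ts->idx = (ts->idx + 1) % TS_LEN;
}
~~~

Because the buffer has a fixed size, memory per event stays bounded no matter how long Redis runs. Old samples are silently overwritten once idx wraps around.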
Once samples have been collected, they can be analyzed statistically. This is where the latencyStats structure comes in:
~~~
/* Analyze the samples avaialble for a given event and return a structure
* populate with different metrics, average, MAD, min, max, and so forth.
* Check latency.h definition of struct latenctStat for more info.
* If the specified event has no elements the structure is populate with
* zero values. */
/* Analyze an event's samples and store the results in a latencyStats structure */
void analyzeLatencyForEvent(char *event, struct latencyStats *ls) {
struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);
int j;
uint64_t sum;
//Initialize every field of the statistics structure
ls->all_time_high = ts ? ts->max : 0;
ls->avg = 0;
ls->min = 0;
ls->max = 0;
ls->mad = 0;
ls->samples = 0;
ls->period = 0;
if (!ts) return;
/* First pass, populate everything but the MAD. */
sum = 0;
for (j = 0; j < LATENCY_TS_LEN; j++) {
if (ts->samples[j].time == 0) continue;
ls->samples++;
if (ls->samples == 1) {
ls->min = ls->max = ts->samples[j].latency;
} else {
//Track the minimum and maximum latency
if (ls->min > ts->samples[j].latency)
ls->min = ts->samples[j].latency;
if (ls->max < ts->samples[j].latency)
ls->max = ts->samples[j].latency;
}
sum += ts->samples[j].latency;
/* Track the oldest event time in ls->period. */
if (ls->period == 0 || ts->samples[j].time < ls->period)
//Remember the time of the oldest sample
ls->period = ts->samples[j].time;
}
/* So far avg is actually the sum of the latencies, and period is
* the oldest event time. We need to make the first an average and
* the second a range of seconds. */
if (ls->samples) {
ls->avg = sum / ls->samples;
ls->period = time(NULL) - ls->period;
if (ls->period == 0) ls->period = 1;
}
/* Second pass, compute MAD. */
//Compute the mean absolute deviation from the average
sum = 0;
for (j = 0; j < LATENCY_TS_LEN; j++) {
int64_t delta;
if (ts->samples[j].time == 0) continue;
delta = (int64_t)ls->avg - ts->samples[j].latency;
if (delta < 0) delta = -delta;
sum += delta;
}
if (ls->samples) ls->mad = sum / ls->samples;
}
~~~
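The two-pass statistics can be checked on plain numbers. The hypothetical helper below, analyze(), takes an array of latencies that are all treated as valid samples. It computes avg, min, max and MAD with the same integer division that analyzeLatencyForEvent() uses. The Redis version instead skips empty slots by testing time == 0, and it also computes the period.

~~~
#include <stdint.h>

struct stats { uint32_t avg, min, max, mad, samples; };

static void analyze(const uint32_t *lat, int n, struct stats *st) {
    uint64_t sum = 0;
    int j;
    st->avg = st->min = st->max = st->mad = st->samples = 0;
    /* First pass: count, min, max, and the sum for the average. */
    for (j = 0; j < n; j++) {
        st->samples++;
        if (st->samples == 1) {
            st->min = st->max = lat[j];
        } else {
            if (lat[j] < st->min) st->min = lat[j];
            if (lat[j] > st->max) st->max = lat[j];
        }
        sum += lat[j];
    }
    if (st->samples == 0) return;
    st->avg = (uint32_t)(sum / st->samples);
    /* Second pass: mean absolute deviation from the (integer) average. */
    sum = 0;
    for (j = 0; j < n; j++) {
        int64_t delta = (int64_t)st->avg - lat[j];
        if (delta < 0) delta = -delta;
        sum += (uint64_t)delta;
    }
    st->mad = (uint32_t)(sum / st->samples);
}
~~~

For the latencies 10, 20, 30 and 40, the average is 25. The absolute deviations are 15, 5, 5 and 15, so the MAD is 40 / 4 = 10. The second pass is needed because the average must be known before any deviation can be measured.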
The samples can also be drawn as a sparkline, which shows them far more vividly:
~~~
#define LATENCY_GRAPH_COLS 80
/* Render an event's latency samples as a sparkline */
sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
int j;
struct sequence *seq = createSparklineSequence();
sds graph = sdsempty();
uint32_t min = 0, max = 0;
for (j = 0; j < LATENCY_TS_LEN; j++) {
int i = (ts->idx + j) % LATENCY_TS_LEN;
int elapsed;
char *label;
char buf[64];
if (ts->samples[i].time == 0) continue;
/* Update min and max. */
if (seq->length == 0) {
min = max = ts->samples[i].latency;
} else {
if (ts->samples[i].latency > max) max = ts->samples[i].latency;
if (ts->samples[i].latency < min) min = ts->samples[i].latency;
}
/* Use as label the number of seconds / minutes / hours / days
* ago the event happened. */
elapsed = time(NULL) - ts->samples[i].time;
if (elapsed < 60)
snprintf(buf,sizeof(buf),"%ds",elapsed);
else if (elapsed < 3600)
snprintf(buf,sizeof(buf),"%dm",elapsed/60);
else if (elapsed < 3600*24)
snprintf(buf,sizeof(buf),"%dh",elapsed/3600);
else
snprintf(buf,sizeof(buf),"%dd",elapsed/(3600*24));
label = zstrdup(buf);
sparklineSequenceAddSample(seq,ts->samples[i].latency,label);
}
graph = sdscatprintf(graph,
"%s - high %lu ms, low %lu ms (all time high %lu ms)\n", event,
(unsigned long) max, (unsigned long) min, (unsigned long) ts->max);
for (j = 0; j < LATENCY_GRAPH_COLS; j++)
graph = sdscatlen(graph,"-",1);
graph = sdscatlen(graph,"\n",1);
//Render the sparkline
graph = sparklineRender(graph,seq,LATENCY_GRAPH_COLS,4,SPARKLINE_FILL);
freeSparklineSequence(seq);
//Return the graph as a string
return graph;
}
~~~
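Two ideas from this function can be sketched on their own:
- **Time labels.** elapsed_label() reproduces the "how long ago" labels with the same cut-offs (60 seconds, 1 hour, 1 day).
- **Bar heights.** render_sparkline() is a deliberately crude, hypothetical renderer. It draws one character per sample and picks its height by scaling between the minimum and maximum. The palette "_.-^" is made up for this example.

Redis's real sparklineRender() draws multi-row graphs with labels and is not reproduced here.

~~~
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* "Ns", "Nm", "Nh" or "Nd", using integer division like the Redis code. */
static void elapsed_label(int elapsed, char *buf, size_t len) {
    if (elapsed < 60)
        snprintf(buf, len, "%ds", elapsed);
    else if (elapsed < 3600)
        snprintf(buf, len, "%dm", elapsed / 60);
    else if (elapsed < 3600 * 24)
        snprintf(buf, len, "%dh", elapsed / 3600);
    else
        snprintf(buf, len, "%dd", elapsed / (3600 * 24));
}

/* Hypothetical 4-level palette, lowest to highest. */
static const char LEVELS[] = "_.-^";

/* One character per sample; out must have room for n + 1 bytes. */
static void render_sparkline(const uint32_t *v, size_t n, char *out) {
    uint32_t min = 0, max = 0;
    size_t i;
    for (i = 0; i < n; i++) {
        if (i == 0 || v[i] < min) min = v[i];
        if (i == 0 || v[i] > max) max = v[i];
    }
    for (i = 0; i < n; i++) {
        size_t level = 0;
        if (max > min) /* scale into 0..3; a flat series stays at level 0 */
            level = (size_t)((uint64_t)(v[i] - min) * 3 / (max - min));
        out[i] = LEVELS[level];
    }
    out[n] = '\0';
}
~~~

With samples 0, 10, 20 and 30, render_sparkline() produces "_.-^". Scaling relative to the observed min and max is why Redis also prints the high and low values in the graph's title line.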
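Redis also wraps these functions into the LATENCY command for external use. I won't analyze that here, since it simply combines the functions above: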
~~~
/* ---------------------------- Latency API --------------------------------- */
void latencyMonitorInit(void) /* Initialize latency monitoring: create the event dictionary */
void latencyAddSample(char *event, mstime_t latency) /* Add a sample to the time series of the given event */
int latencyResetEvent(char *event_to_reset) /* Reset an event: delete its entry from the dictionary */
void analyzeLatencyForEvent(char *event, struct latencyStats *ls) /* Analyze an event's samples and store the results in a latencyStats structure */
sds createLatencyReport(void) /* Build a human-readable report from the collected samples */
void latencyCommandReplyWithSamples(redisClient *c, struct latencyTimeSeries *ts)
void latencyCommandReplyWithLatestEvents(redisClient *c)
sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts)
void latencyCommand(redisClient *c)
~~~
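That completes the analysis of the latency file. After more than 30 posts on the Redis source, I feel every part of the code has its own highlights, and I learned a lot that cannot be found online. Most online material covers the main ideas of Redis, while the finer details only really sink in when you study the code yourself.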
(30)— pubsub: the publish/subscribe pattern
Last updated: 2022-04-01 20:21:19
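Today I studied a rather grand-sounding Redis feature, the publish/subscribe pattern. I first heard the term in connection with JMS (Java Message Service). Put simply: I subscribe to a certain kind of message, and I only receive messages of that kind when they are broadcast. Everything else is filtered out, which keeps delivery efficient. Let's get to the point and see how Redis implements publish/subscribe, starting with its API: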
~~~
/*-----------------------------------------------------------------------------
* Pubsub low level API
*----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) /* Free a pubsub pattern entry */
int listMatchPubsubPattern(void *a, void *b) /* Check whether two pattern entries match (same client, same pattern) */
int clientSubscriptionsCount(redisClient *c) /* Number of subscriptions of a client: channels + patterns */
int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Subscribe a client to a channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* Unsubscribe a client from a channel */
int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Subscribe a client to a pattern */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Unsubscribe a client from a pattern */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* Unsubscribe a client from all its channels */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* Unsubscribe a client from all its patterns */
int pubsubPublishMessage(robj *channel, robj *message) /* Send message to every client subscribed to channel */
/* ------------PUB/SUB API ---------------- */
void subscribeCommand(redisClient *c) /* SUBSCRIBE command */
void unsubscribeCommand(redisClient *c) /* UNSUBSCRIBE command */
void psubscribeCommand(redisClient *c) /* PSUBSCRIBE command */
void punsubscribeCommand(redisClient *c) /* PUNSUBSCRIBE command */
void publishCommand(redisClient *c) /* PUBLISH command */
void pubsubCommand(redisClient *c) /* PUBSUB command */
~~~
Two terms keep coming up here: Pattern and Channel. Everything related to publish/subscribe is built on these two. Roughly, Redis implements the pattern as follows:
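**1. Each redisClient keeps a pubsub_channels dict that records the channels this client is subscribed to.**
**2. The server keeps a variable with the same name, pubsub_channels. It is a dict in which each channel maps to the clients subscribed to it, i.e. Channel --> list of clients.**
**3. When a client publishes a message, the server looks up the channel in its pubsub_channels, walks the client list and sends the message to each client. That completes the publish/subscribe cycle.**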
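The three points above can be modeled with a toy registry. This is a hypothetical sketch, not Redis's implementation, and it simplifies several things:
- Fixed-size arrays replace Redis's dict and list types.
- Clients are small integer IDs (below 16).
- "Sending" a message just increments a per-client counter.
- The client-side pubsub_channels dict is reduced to a duplicate check.

~~~
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBS 8

/* Server side: channel name -> subscribed client IDs
 * (Redis uses a dict of lists in server.pubsub_channels). */
struct channel_entry {
    char name[32];
    int clients[MAX_SUBS];
    int nclients;
};

static struct channel_entry channels[MAX_CHANNELS];
static int nchannels = 0;
static int delivered[16]; /* messages received, indexed by client ID */

static struct channel_entry *find_channel(const char *name) {
    for (int i = 0; i < nchannels; i++)
        if (strcmp(channels[i].name, name) == 0) return &channels[i];
    return NULL;
}

/* Subscribe: create the channel's client list on first use, then append.
 * Returns 1 on success, 0 if already subscribed or a table is full. */
static int subscribe(int client, const char *name) {
    struct channel_entry *ch = find_channel(name);
    if (ch == NULL) {
        if (nchannels == MAX_CHANNELS) return 0;
        ch = &channels[nchannels++];
        strncpy(ch->name, name, sizeof(ch->name) - 1);
        ch->name[sizeof(ch->name) - 1] = '\0';
        ch->nclients = 0;
    }
    for (int i = 0; i < ch->nclients; i++)
        if (ch->clients[i] == client) return 0; /* already subscribed */
    if (ch->nclients == MAX_SUBS) return 0;
    ch->clients[ch->nclients++] = client;
    return 1;
}

/* Publish: look up the channel and "deliver" to every subscriber.
 * Returns the number of receivers, like pubsubPublishMessage(). */
static int publish(const char *name) {
    struct channel_entry *ch = find_channel(name);
    if (ch == NULL) return 0;
    for (int i = 0; i < ch->nclients; i++) delivered[ch->clients[i]]++;
    return ch->nclients;
}
~~~

The key design point is the server-side index from channel to subscribers. Publishing does not need to scan all clients, only the list stored under that one channel.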
Let's take a quick look at how Redis subscribes a client to a channel:
~~~
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
/* Subscribe a client to a channel */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
//Add the channel to the client's pubsub_channels dict
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
//Add the client to the channel's list in server.pubsub_channels
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
//No client list for this channel yet: create one and register it
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
//Otherwise fetch the channel's existing client list (the client is appended below)
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
//Reply to the client
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
~~~
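Subscribing takes two steps. First, the channel is added to the client's own pubsub_channels, which is a dict. Then the client is appended to the channel's list in the server's pubsub_channels. Unsubscribing undoes the same two steps: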
~~~
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
/* Unsubscribe a client from a channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
struct dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
//Remove the channel from the client's pubsub_channels dict
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
//Then remove the client from the channel's client list
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will not be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
~~~
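There are matching subscribe and unsubscribe operations for patterns, and they work in much the same way as the channel ones. The difference is that a pattern is used to match channels. What does that mean? The answer comes below. Finally, let's look at the most central function of all, publishing a message: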
~~~
/* Publish a message */
/* Send message to every client subscribed to channel */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
//Find the dictEntry for this channel
de = dictFind(server.pubsub_channels,channel);
if (de) {
//Get the list of clients subscribed to this channel
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
//Queue the message reply for each client in the list
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
//Append the message payload
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
/* Send to clients whose pattern matches this channel */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//If the client's pattern matches the channel, send it the message too
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
~~~
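Pattern subscriptions rely on stringmatchlen(), Redis's glob-style matcher. As a rough illustration only, here is a hypothetical, much smaller matcher, glob_match(). It supports just two wildcards:
- * matches any sequence of characters.
- ? matches any single character.

The real stringmatchlen() also handles [...] character classes, backslash escapes and a case-insensitive flag, none of which are reproduced here.

~~~
#include <stdbool.h>

/* Recursive glob match supporting '*' and '?' only. */
static bool glob_match(const char *pattern, const char *str) {
    if (*pattern == '\0') return *str == '\0';
    if (*pattern == '*') {
        /* Collapse consecutive stars, then try every possible split. */
        while (*pattern == '*') pattern++;
        if (*pattern == '\0') return true;
        for (; *str != '\0'; str++)
            if (glob_match(pattern, str)) return true;
        return false;
    }
    if (*str == '\0') return false;
    if (*pattern == '?' || *pattern == *str)
        return glob_match(pattern + 1, str + 1);
    return false;
}
~~~

In PSUBSCRIBE terms, a client subscribed to news.* would receive a message published on news.tech but not one published on sport.tech. Note that every publish runs the matcher against every subscribed pattern, so the cost grows with the number of pattern subscriptions.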
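This is where patterns come into play. If a pattern matches the channel, the client subscribed to that pattern receives the message as well. Note that server.pubsub_patterns is a list, and each entry in it pairs one pattern with exactly one client (pat->client in pubsubPublishMessage). That is fundamentally different from channels, where each channel maps to a list of clients. Having covered the basic pub/sub operations, let's also briefly look at the related notify module. It has only 3 functions: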
~~~
/* ----------------- API ------------------- */
int keyspaceEventsStringToFlags(char *classes) /* Convert a string of class characters into flags */
sds keyspaceEventsFlagsToString(int flags) /* Convert flags back into a string of class characters */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* Publish a notification; there are 2 kinds: keyspace and keyevent */
~~~
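Two of these functions convert between a class string and flag bits. They back the notify-keyspace-events configuration option, whose value is written as a string of class characters such as "KEA":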
~~~
/* Turn a string representing notification classes into an integer
* representing notification classes flags xored.
*
* The function returns -1 if the input contains characters not mapping to
* any class. */
/* Convert a string of class characters into flags */
int keyspaceEventsStringToFlags(char *classes) {
char *p = classes;
int c, flags = 0;
while((c = *p++) != '\0') {
switch(c) {
case 'A': flags |= REDIS_NOTIFY_ALL; break;
case 'g': flags |= REDIS_NOTIFY_GENERIC; break;
case '$': flags |= REDIS_NOTIFY_STRING; break;
case 'l': flags |= REDIS_NOTIFY_LIST; break;
case 's': flags |= REDIS_NOTIFY_SET; break;
case 'h': flags |= REDIS_NOTIFY_HASH; break;
case 'z': flags |= REDIS_NOTIFY_ZSET; break;
case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;
case 'e': flags |= REDIS_NOTIFY_EVICTED; break;
case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;
case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;
default: return -1;
}
}
return flags;
}
~~~
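Each character selects one class of events (for example 'K' for keyspace, 'E' for keyevent, 'x' for expired), and any unknown character makes the function return -1. The remaining notify function is the one that actually publishes event notifications: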
~~~
/* The API provided to the rest of the Redis core is a simple function:
*
* notifyKeyspaceEvent(char *event, robj *key, int dbid);
*
* 'event' is a C string representing the event name.
* 'key' is a Redis object representing the key name.
* 'dbid' is the database ID where the key lives. */
/* Publish a notification; there are 2 kinds: keyspace and keyevent */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;
eventobj = createStringObject(event,strlen(event));
//The two notification forms differ slightly
/* __keyspace@__: notifications. */
if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(REDIS_STRING, chan);
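//The steps above build the channel name __keyspace@<db>__:<key>, and the event name is then published on it; the keyevent branch below works the same way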
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* __keyevent@__: notifications. */
if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(REDIS_STRING, chan);
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}
~~~
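So there are two kinds of notification, keyspace and keyevent:
- **Keyspace:** published on __keyspace@<db>__:<key>, with the event name as the message.
- **Keyevent:** published on __keyevent@<db>__:<event>, with the key name as the message.

I'll look at how they are used when I come across them later.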
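Both halves of the notify module can be sketched together, with a few assumptions:
- **Flag values.** They are modeled on the REDIS_NOTIFY_* bits of Redis 2.8/3.0; verify them against redis.h for your version.
- **Classes handled.** Only a subset of class characters is parsed (no 'A' alias, lists, sets and so on).
- **Names.** classes_to_flags() and build_channels() are hypothetical.
- **Channel format.** The names follow the __keyspace@<db>__:<key> and __keyevent@<db>__:<event> format built by notifyKeyspaceEvent().

~~~
#include <stdio.h>

/* Assumed bit layout; verify against redis.h. */
#define NOTIFY_KEYSPACE (1 << 0) /* K */
#define NOTIFY_KEYEVENT (1 << 1) /* E */
#define NOTIFY_GENERIC  (1 << 2) /* g */
#define NOTIFY_STRING   (1 << 3) /* $ */
#define NOTIFY_EXPIRED  (1 << 8) /* x */

/* Parse a class string such as "Ex"; -1 on an unknown character
 * (a subset of keyspaceEventsStringToFlags()). */
static int classes_to_flags(const char *classes) {
    int flags = 0;
    for (const char *p = classes; *p; p++) {
        switch (*p) {
        case 'K': flags |= NOTIFY_KEYSPACE; break;
        case 'E': flags |= NOTIFY_KEYEVENT; break;
        case 'g': flags |= NOTIFY_GENERIC; break;
        case '$': flags |= NOTIFY_STRING; break;
        case 'x': flags |= NOTIFY_EXPIRED; break;
        default: return -1;
        }
    }
    return flags;
}

/* Build both channel names for an event on key in database dbid. */
static void build_channels(int dbid, const char *key, const char *event,
                           char *space, char *evt, size_t len) {
    snprintf(space, len, "__keyspace@%d__:%s", dbid, key);
    snprintf(evt, len, "__keyevent@%d__:%s", dbid, event);
}
~~~

For example, CONFIG SET notify-keyspace-events Ex turns on keyevent notifications for expired keys. A client can then subscribe to __keyevent@0__:expired to learn which keys expired in database 0.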
(29)— bio: implementation of the background I/O service
Last updated: 2022-04-01 20:21:17
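Redis also has the notion of background services. In Redis these take the form of a background I/O service: slow operations are handed to background threads so that they do not block the main thread. The file that implements this is bio.c, and it is also a good opportunity to see how multithreaded programming with POSIX threads is done in C. First, here is how Redis describes its background jobs: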
~~~
/* Background I/O service for Redis.
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
*
* DESIGN
* ------
*
* The design is trivial, we have a structure representing a job to perform
* and a different thread and job queue for every job type.
* Every thread wait for new jobs in its queue, and process every job
* sequentially.
*
* Jobs of the same type are guaranteed to be processed from the least
* recently inserted to the most recently inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
*
* Summary: each job is a struct; each thread waits for jobs on the queue of
* its own job type, and jobs are processed in the order they were queued.
* ----------------------------------------------------------------------------
~~~
There are 2 background I/O job types in total:
~~~
/* Background job opcodes */
/* The 2 kinds of background jobs */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
/* Total number of BIO job types */
#define REDIS_BIO_NUM_OPS 2
~~~
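One type is the fsync of the AOF file. AOF stands for "Append Only File", a log of every write operation that is used to recover the data. The other type, CLOSE FILE, is indeed an asynchronous close. As the header comment above explains, when the process holds the last reference to a file, closing it unlinks the file, and deleting a large file is slow enough to block the server. The close(2) call is therefore handed to a background thread.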
~~~
static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; /* One background thread per job type */
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; /* Per-type mutex protecting the job queue */
static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; /* Per-type condition variable used to wake the thread */
static list *bio_jobs[REDIS_BIO_NUM_OPS]; /* One job list (queue) per job type */
/* The following array is used to hold the number of pending jobs for every
* OP type. This allows us to export the bioPendingJobsOfType() API that is
* useful when the main thread wants to perform some operation that may involve
* objects shared with the background thread. The main thread will just wait
* that there are no longer jobs of this type to be executed before performing
* the sensible operation. This data is also useful for reporting. */
static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; /* Number of pending jobs of each type */
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
/* A background job */
struct bio_job {
//Time at which the job was created
time_t time; /* Time at which the job was created. */
/* Job specific arguments pointers. If we need to pass more than three
* arguments we can just pass a pointer to a structure or alike. */
/* Job-specific argument pointers */
void *arg1, *arg2, *arg3;
};
~~~
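The code above declares the following state:
- the bio_threads array (2 threads in total);
- the bio_jobs array, which holds one job list per type;
- the per-type mutexes and condition variables.

Now for the main functions: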
~~~
/* Exported API */
void bioInit(void); /* Initialize background I/O */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); /* Create a background job initialized with the 3 given arguments */
unsigned long long bioPendingJobsOfType(int type); /* Number of pending jobs of the given type */
void bioWaitPendingJobsLE(int type, unsigned long long num); /* Block until the number of pending jobs of the given type is <= num */
time_t bioOlderJobOfType(int type); /* Creation time of the oldest pending job of the given type */
void bioKillThreads(void); /* Kill all background threads */
~~~
First, initialization:
~~~
/* Initialize the background system, spawning the thread. */
/* Initialize background I/O */
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
//Create the job list for each type
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
//Set the thread stack size, doubling it until it reaches REDIS_THREAD_STACK_SIZE
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//Spawn one thread per job type
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
//Store the thread handle
bio_threads[j] = thread;
}
}
~~~
After this runs, bio_threads holds 2 running threads, each taking pending jobs from its own job list. New jobs are queued with bioCreateBackgroundJob:
~~~
/* Create a background job initialized with the 3 given arguments */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
//Append to the queue of this job type
listAddNodeTail(bio_jobs[type],job);
//One more pending job
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
~~~
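Creating a job is simple. The mutex makes access to the queue thread-safe, and pthread_cond_signal wakes the worker thread waiting on that type's condition variable. Next comes the most important part, the function that executes background jobs (partially elided):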
~~~
/* Thread body: execute background jobs; arg carries the job type */
void *bioProcessBackgroundJobs(void *arg) {
......
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
//Take the first job in the queue
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
//Perform the actual work
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
~~~
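The while loop takes one job at a time from the queue and executes it. When the queue is empty, pthread_cond_wait releases the mutex and sleeps until a new job is signaled. If all background threads must be stopped immediately, bioKillThreads() does it with pthread_cancel: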
~~~
/* Kill the running bio threads in an unclean way. This function should be
* used only when it's critical to stop the threads for some reason.
* Currently Redis does this only on crash (for instance on SIGSEGV) in order
* to perform a fast memory check without other threads messing with memory. */
/* Kill all background threads */
void bioKillThreads(void) {
int err, j;
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//Cancel the thread with pthread_cancel, then wait for it with pthread_join
if (pthread_cancel(bio_threads[j]) == 0) {
if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
redisLog(REDIS_WARNING,
"Bio thread for job type #%d can not be joined: %s",
j, strerror(err));
} else {
redisLog(REDIS_WARNING,
"Bio thread for job type #%d terminated",j);
}
}
}
}
~~~
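The producer/consumer pattern used by bio.c fits in a small self-contained program. The pattern has four parts:
- a mutex-protected FIFO queue;
- a condition variable that wakes the worker;
- the lock being dropped while a job runs;
- the job being dequeued only after it finishes.

This is a simplified sketch, not bio.c. There is a single job type, and a job merely adds its argument to a counter. Instead of pthread_cancel, the worker exits once it sees a hypothetical q_stop flag and the queue is empty, which is a cleaner shutdown than bioKillThreads(). On older glibc versions it must be compiled with -pthread.

~~~
#include <pthread.h>
#include <stdlib.h>

struct job { long arg; struct job *next; };

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;
static struct job *q_head = NULL, *q_tail = NULL;
static unsigned long long q_pending = 0; /* like bio_pending[type] */
static int q_stop = 0;                   /* hypothetical shutdown flag */
static long processed_sum = 0;           /* result of the "work" */

/* Producer: append a job and wake the worker (cf. bioCreateBackgroundJob). */
static void create_job(long arg) {
    struct job *j = malloc(sizeof(*j));
    if (j == NULL) abort();
    j->arg = arg;
    j->next = NULL;
    pthread_mutex_lock(&q_mutex);
    if (q_tail) q_tail->next = j; else q_head = j;
    q_tail = j;
    q_pending++;
    pthread_cond_signal(&q_cond);
    pthread_mutex_unlock(&q_mutex);
}

/* Consumer: process jobs in FIFO order (cf. bioProcessBackgroundJobs). */
static void *worker(void *unused) {
    (void)unused;
    pthread_mutex_lock(&q_mutex);
    for (;;) {
        if (q_head == NULL) {
            if (q_stop) break;
            pthread_cond_wait(&q_cond, &q_mutex); /* releases the lock while waiting */
            continue;
        }
        struct job *j = q_head;
        pthread_mutex_unlock(&q_mutex); /* run the job without holding the lock */
        processed_sum += j->arg;
        pthread_mutex_lock(&q_mutex);
        q_head = j->next; /* dequeue only after the job is done */
        if (q_head == NULL) q_tail = NULL;
        q_pending--;
        free(j);
    }
    pthread_mutex_unlock(&q_mutex);
    return NULL;
}

/* Ask the worker to exit once the queue is drained, then join it. */
static void stop_worker(pthread_t t) {
    pthread_mutex_lock(&q_mutex);
    q_stop = 1;
    pthread_cond_signal(&q_cond);
    pthread_mutex_unlock(&q_mutex);
    pthread_join(t, NULL);
}
~~~

Keeping the job in the queue until it completes means the pending counter includes the running job. That is what lets a function like bioWaitPendingJobsLE() wait for work that is still in progress.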
(28)— object: creating and freeing redisObject objects
Last updated: 2022-04-01 20:21:14
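I was fairly productive today. After finishing rio, I also went through the redisObject file, which is mainly about creating and converting redisObjects. Most of its functions are very similar. Here is the long API list: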
~~~
/* ------------ API --------------------- */
robj *createObject(int type, void *ptr) /* Basic robj constructor; the other create functions follow the same pattern */
robj *createStringObject(char *ptr, size_t len)
robj *createStringObjectFromLongLong(long long value)
robj *createStringObjectFromLongDouble(long double value)
robj *dupStringObject(robj *o)
robj *createListObject(void)
robj *createZiplistObject(void)
robj *createSetObject(void)
robj *createIntsetObject(void)
robj *createHashObject(void)
robj *createZsetObject(void)
robj *createZsetZiplistObject(void)
void freeStringObject(robj *o) /* Free the type-specific payload of an object, i.e. o->ptr */
void freeListObject(robj *o)
void freeSetObject(robj *o)
void freeZsetObject(robj *o)
void freeHashObject(robj *o) /* o->ptr is either a dict or a ziplist, so a hash object is freed in one of 2 ways */
void incrRefCount(robj *o) /* Increment the robj's reference count */
void decrRefCount(robj *o) /* Decrement the reference count; free the object when it reaches 0 */
void decrRefCountVoid(void *o)
robj *resetRefCount(robj *obj)
int checkType(redisClient *c, robj *o, int type) /* Check whether the robj has the given type */
int isObjectRepresentableAsLongLong(robj *o, long long *llval)
robj *tryObjectEncoding(robj *o) /* Try to encode a string object to save space */
robj *getDecodedObject(robj *o) /* Get a decoded version of the robj */
int compareStringObjectsWithFlags(robj *a, robj *b, int flags)
int compareStringObjects(robj *a, robj *b)
int collateStringObjects(robj *a, robj *b)
int equalStringObjects(robj *a, robj *b)
size_t stringObjectLen(robj *o)
int getDoubleFromObject(robj *o, double *target) /* Read a double value from the robj */
int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const char *msg)
int getLongDoubleFromObject(robj *o, long double *target)
int getLongDoubleFromObjectOrReply(redisClient *c, robj *o, long double *target, const char *msg)
int getLongLongFromObject(robj *o, long long *target)
int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg)
int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg)
char *strEncoding(int encoding)
unsigned long estimateObjectIdleTime(robj *o)
robj *objectCommandLookup(redisClient *c, robj *key) /* Key lookup used by the OBJECT command */
robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply)
void objectCommand(redisClient *c)
~~~
Going through them in order, first the basic constructor:
~~~
/* Basic robj constructor */
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = REDIS_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution). */
o->lru = server.lruclock;
return o;
}
~~~
Where there is creation there must also be freeing. Here is the free function for strings:
~~~
/* Free the payload of a string object (only raw SDS strings own one) */
void freeStringObject(robj *o) {
if (o->encoding == REDIS_ENCODING_RAW) {
sdsfree(o->ptr);
}
}
~~~
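There is one free function per object type, because each payload (a dict, a ziplist, a set and so on) has to be freed differently. The dispatch looks like this: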
~~~
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
~~~
Now the most important part, the reference-counting functions, which are driven by robj->refcount:
~~~
/* Increment the robj's reference count */
void incrRefCount(robj *o) {
//Increment the reference count
o->refcount++;
}
~~~
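Incrementing takes a single line. Decrementing does what you would guess: when the count drops to 0, nobody uses the object any more, so its memory can be freed: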
~~~
/* Decrement the robj's reference count; free the object when it drops to 0 */
void decrRefCount(robj *o) {
//A refcount that is already <= 0 means something went wrong
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
if (o->refcount == 1) {
//The refcount was 1: after this decrement nothing references the object, so free it
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
zfree(o);
} else {
//Refcount > 1: a plain decrement is enough
o->refcount--;
}
}
~~~
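The increment and decrement rules fit in a few lines. This is a hypothetical miniature: obj, obj_new, obj_incr, obj_decr and the frees counter are made up for illustration. It has a single string payload instead of Redis's per-type switch, and it calls abort() where Redis calls redisPanic().

~~~
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct obj {
    int refcount;
    char *ptr; /* payload, like robj->ptr */
};

static int frees = 0; /* counts freed objects, for demonstration */

static struct obj *obj_new(const char *s) {
    struct obj *o = malloc(sizeof(*o));
    if (o == NULL) abort();
    o->ptr = malloc(strlen(s) + 1);
    if (o->ptr == NULL) abort();
    strcpy(o->ptr, s);
    o->refcount = 1; /* the creator owns the first reference */
    return o;
}

static void obj_incr(struct obj *o) { o->refcount++; }

static void obj_decr(struct obj *o) {
    if (o->refcount <= 0) {
        fprintf(stderr, "decrRefCount against refcount <= 0\n");
        abort();
    }
    if (o->refcount == 1) {
        free(o->ptr); /* free the payload first ... */
        free(o);      /* ... then the object itself */
        frees++;
    } else {
        o->refcount--;
    }
}
~~~

Note that the last reference never decrements the count to 0. It frees the object directly, so a count of 0 is never observed on a live object.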
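This is textbook reference counting. (As an aside, the JVM manages object lifetimes by tracing reachability from GC roots, not by reference counting.) The same file also implements the encoding of robj values: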
~~~
/* Try to encode a string object in order to save space */
/* Try to encode a string object to save space */
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;
if (o->encoding != REDIS_ENCODING_RAW)
//Already encoded: return it unchanged
return o; /* Already encoded */
/* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis. Encoded objects can only
* appear as "values" (and not, for instance, as keys) */
/* A refcount > 1 means the object is shared, so encoding it in place is not safe */
if (o->refcount > 1) return o;
/* Currently we try to encode only strings */
redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
/* Check if we can represent this string as a long integer */
len = sdslen(s);
if (len > 21 || !string2l(s,len,&value)) {
/* We can't encode the object...
*
* Do the last try, and at least optimize the SDS string inside
* the string object to require little space, in case there
* is more than 10% of free space at the end of the SDS string.
*
* We do that for larger strings, using the arbitrary value
* of 32 bytes. This code was backported from the unstable branch
* where this is performed when the object is too large to be
* encoded as EMBSTR. */
if (len > 32 &&
o->encoding == REDIS_ENCODING_RAW &&
sdsavail(s) > len/10)
{
//Reallocate o->ptr without the unused free space at the end of the SDS buffer
o->ptr = sdsRemoveFreeSpace(o->ptr);
}
/* Return the original object. */
return o;
}
.....
~~~
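When the string cannot be encoded, this fallback only trims the unused free space at the end of the SDS buffer; it does not remove space characters from the string. The counterpart for decoding is getDecodedObject():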
~~~
/* Get a decoded version of an encoded object (returned as a new object).
* If the object is already raw-encoded just increment the ref count. */
/* Get a decoded version of the robj */
robj *getDecodedObject(robj *o) {
robj *dec;
if (o->encoding == REDIS_ENCODING_RAW) {
//Already raw: just increment the reference count and return it
incrRefCount(o);
return o;
}
if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
char buf[32];
//Int-encoded string: turn the number back into a string
ll2string(buf,32,(long)o->ptr);
dec = createStringObject(buf,strlen(buf));
return dec;
} else {
redisPanic("Unknown encoding type");
}
}
~~~
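Behind REDIS_ENCODING_INT is a simple idea: store a string that is a valid integer as a number, and turn it back into text on demand. It can be sketched with standard library calls. try_encode_long and decode_long are hypothetical helpers. strtol plus a round-trip check approximates the strictness of string2l(), rejecting leading spaces, a "+" sign and leading zeros. This is not the Redis implementation.

~~~
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Return 1 and store the value if s is exactly the canonical text of a long. */
static int try_encode_long(const char *s, long *value) {
    size_t len = strlen(s);
    char *end;
    char buf[32];
    if (len == 0 || len > 21) return 0; /* same length cut-off as tryObjectEncoding() */
    errno = 0;
    long v = strtol(s, &end, 10);
    if (errno != 0 || *end != '\0') return 0;
    /* Reject non-canonical forms such as " 12", "+12" or "012". */
    snprintf(buf, sizeof(buf), "%ld", v);
    if (strcmp(buf, s) != 0) return 0;
    *value = v;
    return 1;
}

/* Decode: turn the number back into a string, like getDecodedObject(). */
static void decode_long(long value, char *buf, size_t len) {
    snprintf(buf, len, "%ld", value);
}
~~~

The round-trip check matters. Encoding is only safe if decoding gives back exactly the original bytes, which is why "012" must stay a plain string.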
That concludes this brief analysis of redisObject.
(27)— rio: Redis's wrapper around system I/O
Last updated: 2022-04-01 20:21:12
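I/O is an indispensable part of every system, and how well it is done affects the system's efficiency to some degree. Today I looked at how Redis handles I/O. Redis wraps its own I/O layer, called rio. Let's see what rio contains: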
~~~
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
/* Read from the stream */
size_t (*read)(struct _rio *, void *buf, size_t len);
/* Write to the stream */
size_t (*write)(struct _rio *, const void *buf, size_t len);
/* Return the current read/write offset */
off_t (*tell)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
/* Update the checksum with each new block of data read or written */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
uint64_t cksum;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */
union {
// in-memory buffer backend
struct {
// the buffer contents
sds ptr;
// current read/write offset into ptr
off_t pos;
} buffer;
// stdio FILE backend
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};
~~~
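Besides the three backend methods (read, write, and tell, which returns the current offset) and an optional checksum hook, the struct holds a union of two backend structs: buffer and file. The author handles each kind of I/O separately: in-memory I/O works on rio.io.buffer, while file I/O works on rio.io.file. Here are rio's generic read and write functions: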
~~~
/* The following functions are our interface with the stream. They'll call the
* actual implementation of read / write / tell, and will update the checksum
* if needed. */
/* rio's generic write: split into chunks, checksum, then write */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
// cap each write at max_processing_chunk bytes (0 means no limit)
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
// fold the new data into the checksum before writing it
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
// call the backend's write
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
// count the bytes processed
r->processed_bytes += bytes_to_write;
}
return 1;
}
/* rio's generic read: read in chunks, then update the checksum */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
while (len) {
// cap each read at max_processing_chunk bytes (0 means no limit)
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
// call the backend's read
if (r->read(r,buf,bytes_to_read) == 0)
return 0;
// fold the data just read into the checksum
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
}
return 1;
}
~~~
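To make the chunking loop concrete, here is a minimal sketch of the same pattern under simplifying assumptions: `mini_rio`, `mem_write`, `sum_cksum` and `mini_rio_write` are hypothetical names, the in-memory backend is a fixed array rather than an sds, and a plain byte sum stands in for CRC64.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, cut-down analogue of rio: a backend write function plus an
 * optional checksum hook, driven by the same chunking loop as rioWrite. */
typedef struct mini_rio {
    size_t (*write)(struct mini_rio *r, const void *buf, size_t len); /* 1 = ok, 0 = error */
    void (*update_cksum)(struct mini_rio *r, const void *buf, size_t len); /* may be NULL */
    uint64_t cksum;
    size_t processed_bytes;
    size_t max_chunk;             /* like max_processing_chunk; 0 = no limit */
    char *mem; size_t cap, pos;   /* in-memory backend state */
    size_t write_calls;           /* for illustration: number of backend calls */
} mini_rio;

/* In-memory backend: fail (return 0) instead of doing a short write. */
static size_t mem_write(mini_rio *r, const void *buf, size_t len) {
    if (r->cap - r->pos < len) return 0;
    memcpy(r->mem + r->pos, buf, len);
    r->pos += len;
    r->write_calls++;
    return 1;
}

/* Toy checksum (sum of bytes); Redis plugs CRC64 in at this point. */
static void sum_cksum(mini_rio *r, const void *buf, size_t len) {
    const unsigned char *p = buf;
    for (size_t i = 0; i < len; i++) r->cksum += p[i];
}

/* Same control flow as rioWrite: chunk, checksum, write, count. */
size_t mini_rio_write(mini_rio *r, const void *buf, size_t len) {
    while (len) {
        size_t n = (r->max_chunk && r->max_chunk < len) ? r->max_chunk : len;
        if (r->update_cksum) r->update_cksum(r, buf, n);
        if (r->write(r, buf, n) == 0) return 0;
        buf = (const char *)buf + n;
        len -= n;
        r->processed_bytes += n;
    }
    return 1;
}
```

With `max_chunk = 4`, a 10-byte write reaches the backend as 4 + 4 + 2 bytes. Because the backend is only reached through function pointers, swapping in a file backend would not change `mini_rio_write` at all, which is the point of rio's design.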
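One nice touch: whenever data is read or written through rio and update_cksum is set, Redis folds those bytes into a running checksum, using the CRC64 algorithm introduced earlier. For the two kinds of I/O, buffer and file, Redis defines two rio templates: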
~~~
/* rio template for the in-memory buffer backend */
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
/* rio template for the stdio FILE backend */
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
~~~
Each template wires in its own read and write functions. For example, the buffer read and the file read:
~~~
/* Returns 1 or 0 for success/failure. */
/* Copy len bytes from the rio buffer into buf */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
return 0; /* not enough buffer to return len bytes. */
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
r->io.buffer.pos += len;
return 1;
}
~~~
~~~
/* Returns 1 or 0 for success/failure. */
/* Read len bytes from the rio's FILE into buf */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
return fread(buf,len,1,r->io.file.fp);
}
~~~
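The two differ only in which backend state of the rio they operate on. Finally, rio.h declares four helpers for writing different kinds of data: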
~~~
/* Helpers for writing different data types; all of them end up calling rioWrite */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
~~~
Here is the implementation of one of them:
~~~
/* Write multi bulk count in the format: "*<count>\r\n". */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
char cbuf[128];
int clen;
cbuf[0] = prefix;
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (rioWrite(r,cbuf,clen) == 0) return 0;
return clen;
}
~~~
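rioWriteBulkCount emits protocol headers such as "*3\r\n". The sketch below produces the same header format, plus the "$<len>\r\n<data>\r\n" layout used for bulk strings. `resp_header` and `resp_bulk_string` are hypothetical helpers, and they use snprintf where Redis uses its faster ll2string.

```c
#include <stdio.h>
#include <string.h>

/* Format a header line such as "*3\r\n" (prefix '*', multi-bulk count)
 * or "$5\r\n" (prefix '$', bulk length). Returns bytes written, 0 if cap is too small. */
size_t resp_header(char *out, size_t cap, char prefix, long long n) {
    int len = snprintf(out, cap, "%c%lld\r\n", prefix, n);
    return (len < 0 || (size_t)len >= cap) ? 0 : (size_t)len;
}

/* Format a bulk string "$<len>\r\n<data>\r\n". Returns bytes written, 0 if cap is too small. */
size_t resp_bulk_string(char *out, size_t cap, const char *data, size_t dlen) {
    size_t h = resp_header(out, cap, '$', (long long)dlen);
    if (h == 0 || h + dlen + 2 >= cap) return 0;   /* leave room for the NUL */
    memcpy(out + h, data, dlen);                   /* payload may contain any bytes */
    memcpy(out + h + dlen, "\r\n", 2);
    out[h + dlen + 2] = '\0';
    return h + dlen + 2;
}
```

For example, `resp_bulk_string(buf, sizeof buf, "hello", 5)` yields the 11 bytes `$5\r\nhello\r\n`. The length prefix is what lets a reader consume binary data without scanning for a terminator.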
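rioWriteBulkCount still delegates to rioWrite, which dispatches to whichever backend (buffer I/O or file I/O) the rio was created with; each has its own implementation. The file backend's write has one detail worth noting: rio.io.file.buffered counts the bytes written since the last sync, and once it reaches rio.io.file.autosync, the data must be flushed and fsync'ed to the file.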
~~~
/* Returns 1 or 0 for success/failure. */
/* Write buf to the rio's FILE */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len;
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
// autosync threshold reached: flush stdio's buffer and fsync to disk
fflush(r->io.file.fp);
aof_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0;
}
return retval;
}
~~~
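The autosync bookkeeping can be reproduced in isolation. In this minimal sketch, `synced_file` and `synced_write` are hypothetical names, the `syncs` counter exists only for illustration, and POSIX `fsync` is called directly where Redis goes through its `aof_fsync` macro.

```c
#include <stdio.h>
#include <unistd.h>

/* Hypothetical mirror of rioFileWrite's autosync state. */
typedef struct {
    FILE *fp;
    long buffered;  /* bytes written since the last sync */
    long autosync;  /* sync threshold in bytes; 0 disables autosync */
    int syncs;      /* for illustration: number of syncs performed */
} synced_file;

size_t synced_write(synced_file *f, const void *buf, size_t len) {
    size_t ok = fwrite(buf, len, 1, f->fp);  /* 1 on success, like rioFileWrite */
    f->buffered += (long)len;
    if (f->autosync && f->buffered >= f->autosync) {
        fflush(f->fp);            /* move stdio's user-space buffer into the kernel */
        fsync(fileno(f->fp));     /* ask the kernel to write it to stable storage */
        f->buffered = 0;
        f->syncs++;
    }
    return ok;
}
```

Writing ten 100-byte chunks with `autosync = 250` syncs after the 3rd, 6th and 9th chunks, leaving 100 bytes pending. The threshold bounds how much unsynced data can accumulate without paying for an fsync on every write.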
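(26)— slowLog and hyperloglog
Last updated: 2022-04-01 20:21:10
Today's two files both have "log" in their names, and both turned out to do something other than what I expected. I first assumed they recorded different kinds of log messages. Eventually it clicked: slowLog records queries that exceeded a time threshold, while hyperloglog has nothing to do with logging at all. It is a cardinality-estimation algorithm, and the name should be read as hyper + loglog. Let's see how the Redis code implements them.
The official description of the slowlog: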
~~~
/* Slowlog implements a system that is able to remember the latest N
* queries that took more than M microseconds to execute.
*
* The execution time to reach to be logged in the slow log is set
* using the 'slowlog-log-slower-than' config directive, that is also
* readable and writable using the CONFIG SET/GET command.
*
* The slow queries log is actually not "logged" in the Redis log file
* but is accessible thanks to the SLOWLOG command.
*
* In short: the slowlog remembers the latest N queries that took longer than a configured time, i.e. the expensive ones.
* ----------------------------------------------------------------------------
~~~
It defines a struct for slowlog entries:
~~~
/* This structure defines an entry inside the slow log list */
/* A slowlog entry; entries are stored in the server's slowlog list */
typedef struct slowlogEntry {
robj **argv;
int argc;
long long id; /* Unique entry identifier. */
// note: the caller measures this with ustime(), so the value is in microseconds despite the comment
long long duration; /* Time spent by the query, in nanoseconds. */
time_t time; /* Unix time at which the query was executed. */
} slowlogEntry;
/* Exported API */
void slowlogInit(void); /* initialize the slowlog */
void slowlogPushEntryIfNeeded(robj **argv, int argc, long long duration); /* push an entry if the query was slow enough */
/* Exported commands */
/* implementation of the SLOWLOG command */
void slowlogCommand(redisClient *c);
~~~
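The functions are just as simple: an init function and a push function. The server maintains a slowlog list; queries that exceed the threshold are inserted into it as slowlogEntry records, newest at the head: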
~~~
/* Initialize the slow log. This function should be called a single time
* at server startup. */
void slowlogInit(void) {
// create the slowlog list
server.slowlog = listCreate();
// entry ids start from 0
server.slowlog_entry_id = 0;
listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
~~~
The push function:
~~~
/* Push a new entry into the slow log.
* This function will make sure to trim the slow log accordingly to the
* configured max length. */
void slowlogPushEntryIfNeeded(robj **argv, int argc, long long duration) {
if (server.slowlog_log_slower_than < 0) return; /* Slowlog disabled */
if (duration >= server.slowlog_log_slower_than)
// slow enough: add the entry at the head of the list (newest first)
listAddNodeHead(server.slowlog,slowlogCreateEntry(argv,argc,duration));
/* Remove old entries if needed. */
while (listLength(server.slowlog) > server.slowlog_max_len)
// longer than slowlog-max-len: drop the oldest entry from the tail
listDelNode(server.slowlog,listLast(server.slowlog));
}
~~~
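The same policy (ignore fast queries, insert slow ones newest-first, trim the oldest when full) can be sketched with a fixed-size array instead of Redis's linked list. `slow_log`, `slow_entry` and `slowlog_push_if_needed` are hypothetical names, the command is stored as a short string rather than an argv array, and unlike Redis this sketch only trims when it inserts.

```c
#include <string.h>

#define SLOWLOG_MAX_LEN 3   /* hypothetical stand-in for slowlog-max-len */

typedef struct { long long id; long long duration_us; char cmd[32]; } slow_entry;

/* entries[0] is always the newest entry, like listAddNodeHead. */
typedef struct {
    slow_entry entries[SLOWLOG_MAX_LEN];
    int len;
    long long next_id;
    long long slower_than_us;   /* < 0 disables the log */
} slow_log;

void slowlog_push_if_needed(slow_log *sl, const char *cmd, long long duration_us) {
    if (sl->slower_than_us < 0) return;            /* slowlog disabled */
    if (duration_us < sl->slower_than_us) return;  /* fast enough: ignore */
    /* Shift older entries right; when full, the oldest (last slot) falls off. */
    int keep = sl->len < SLOWLOG_MAX_LEN ? sl->len : SLOWLOG_MAX_LEN - 1;
    memmove(&sl->entries[1], &sl->entries[0], (size_t)keep * sizeof(slow_entry));
    slow_entry *e = &sl->entries[0];
    e->id = sl->next_id++;
    e->duration_us = duration_us;
    strncpy(e->cmd, cmd, sizeof e->cmd - 1);
    e->cmd[sizeof e->cmd - 1] = '\0';
    if (sl->len < SLOWLOG_MAX_LEN) sl->len++;
}
```

With a 10 ms threshold and room for three entries, pushing four slow commands keeps the three most recent ones and discards the first.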
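That is all there is to the slowlog: simple and clear. The real subject is hyperloglog, a cardinality-estimation algorithm. Suppose we want to count the distinct words in a Shakespeare text. The obvious approach is to put every word into a hash set and take its size, but for massive data sets that takes an enormous amount of memory. That led to the bitmap approach, which can compute the cardinality of an input quickly and exactly. The idea is to use a hash function to map each element of the data set to one bit, with a one-to-one correspondence between input elements and bits, so there are no hash collisions and each element costs only a single bit of storage. Although a bitmap saves a great deal of space, it still runs into trouble with very high cardinalities or very large distinct data sets.
Fortunately, cardinality estimation is an active field with many open-source implementations. The core idea of these algorithms is to trade accuracy for space: accept a slightly less precise answer in exchange for a dramatic reduction in memory. Three typical techniques I found online are a Java HashSet, a Linear Probabilistic Counter and a HyperLogLog Counter; I will discuss the second and third.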
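A Linear Probabilistic Counter is space-efficient and lets the implementer specify the desired level of accuracy. It is useful when space efficiency matters but you still need to control the error of the result. The algorithm runs in two steps. First, allocate a bitmap in memory with every bit initialized to 0, then hash each entry of the input; the hash maps each record (element) to one bit of the bitmap, which is set to 1. Second, count the number of empty bits and plug it into the following formula to get the estimate: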
$$n = -m \ln V_n$$
Note: $\ln V_n = \log_e V_n$ is the natural logarithm.
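In the formula, m is the size of the bitmap and V_n is the ratio of empty bits to the size of the map. The key point is that the bitmap can be much smaller than the expected maximum cardinality; how much smaller depends on the error you can tolerate. A bitmap of size m smaller than the number of distinct elements will produce collisions, and while collisions save space, they also introduce error into the estimate. By controlling the size of the map we can estimate the number of collisions, and therefore how large the error in the final result will be.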
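A worked example of the estimator with made-up numbers: suppose the bitmap has m = 1000 bits and, after hashing the whole input, 600 bits are still zero, so V_n = 600/1000 = 0.6. Then

$$n = -m \ln V_n = -1000 \ln 0.6 \approx -1000 \times (-0.5108) \approx 511$$

So about 511 distinct elements were seen, even though 400 bits are set: some elements collided on the same bit, and the logarithm corrects for those expected collisions.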
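HyperLogLog is even more efficient than the linear counter. As the name suggests, a HyperLogLog counter can estimate the cardinality of a data set whose maximum cardinality is Nmax using registers of only loglog(Nmax) + O(1) bits each. Like the linear counter, it lets the designer specify the desired accuracy; for HyperLogLog this is done by choosing the desired relative standard deviation and the maximum cardinality expected to be counted. Most counters of this kind take an input data stream M and apply a hash function h(M), which yields an observable result $S = h(M) \in \{0,1\}^\infty$, a string of bits. HyperLogLog splits the hashed input stream into m substreams and keeps one observable per substream, so each substream acts as a small HyperLogLog counter of its own. Averaging these additional observations produces a counter whose accuracy improves as m grows, while each input element still needs only a few operations. As a result, this counter can count a billion distinct elements to about 2% accuracy with only 1.5 KB of space; compared with the 120 MB a HashSet would need, the efficiency is obvious. This is the famous "how to count a billion objects with only 1.5 KB of memory".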
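A minimal HyperLogLog sketch makes the register idea concrete. Assumptions to note: `hll`, `hll_add` and `hll_count` are hypothetical names; it uses 1024 registers and the splitmix64 mixer for brevity, whereas Redis's hyperloglog.c uses 16384 registers and MurmurHash64A; and it returns only the raw estimate, without the small- and large-range corrections, so it is meant for cardinalities well above the register count.

```c
#include <stdint.h>

#define HLL_P 10                 /* 2^10 = 1024 registers (Redis uses 2^14) */
#define HLL_M (1u << HLL_P)

typedef struct { uint8_t reg[HLL_M]; } hll;   /* zero-initialize before use */

/* splitmix64 finalizer: a fast 64-bit mixer standing in for MurmurHash64A. */
static uint64_t mix64(uint64_t x) {
    x += 0x9E3779B97F4A7C15ULL;
    x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9ULL;
    x = (x ^ (x >> 27)) * 0x94D049BB133111EBULL;
    return x ^ (x >> 31);
}

/* The low P hash bits pick a register; the remaining bits give the rank. */
void hll_add(hll *h, uint64_t item) {
    uint64_t x = mix64(item);
    uint32_t idx = (uint32_t)(x & (HLL_M - 1));
    uint64_t rest = x >> HLL_P;
    uint8_t rank = 1;                     /* 1 + number of trailing zero bits */
    while (rank <= 64 - HLL_P && (rest & 1) == 0) {
        rest >>= 1;
        rank++;
    }
    if (rank > h->reg[idx]) h->reg[idx] = rank;   /* keep the maximum rank seen */
}

/* Raw estimate: alpha_m * m^2 / sum_j 2^(-reg[j]), a harmonic mean over registers. */
double hll_count(const hll *h) {
    double m = (double)HLL_M, sum = 0.0;
    for (uint32_t j = 0; j < HLL_M; j++)
        sum += 1.0 / (double)(1ULL << h->reg[j]);
    double alpha = 0.7213 / (1.0 + 1.079 / m);
    return alpha * m * m / sum;
}
```

The typical relative error is about 1.04/√m, roughly 3% for 1024 registers. Adding the same items again leaves every register unchanged, which is why duplicates never inflate the count.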
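(25)— zmalloc: the memory allocation wrapper
Last updated: 2022-04-01 20:21:08
Time flies: after nearly a month of study I have analyzed more than half of the Redis source. In the previous posts I mostly covered Redis's utility code. Over the next few days I will look at some of the wrapper modules that are used throughout Redis, starting with zmalloc, its memory-allocation wrapper. Setting Redis aside, plain C offers three functions for this, malloc, free and realloc, which anyone who has written C knows well. Redis wraps them in a thin layer of its own; it really is a thin wrapper, since the core calls are still those three C functions. First, the API declared in zmalloc.h: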
~~~
void *zmalloc(size_t size); /* allocate size bytes */
void *zcalloc(size_t size); /* allocate size zeroed bytes via calloc */
void *zrealloc(void *ptr, size_t size); /* resize an existing allocation to size bytes */
void zfree(void *ptr); /* free memory and decrease used_memory */
char *zstrdup(const char *s); /* duplicate a string */
size_t zmalloc_used_memory(void); /* return the amount of memory currently in use */
void zmalloc_enable_thread_safeness(void); /* switch to thread-safe accounting */
void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); /* install a custom out-of-memory handler */
float zmalloc_get_fragmentation_ratio(size_t rss); /* ratio of the given RSS to used memory */
size_t zmalloc_get_rss(void);
size_t zmalloc_get_private_dirty(void); /* size of private dirty memory */
void zlibc_free(void *ptr); /* the original libc free() */
~~~
A few concepts and variables also need introducing:
~~~
static size_t used_memory = 0;
static int zmalloc_thread_safe = 0;
pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
~~~
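The first, used_memory, is the total amount of memory Redis has allocated so far. Only this one global counter is maintained, which shows that the author intends to derive current memory usage from it. The second, zmalloc_thread_safe, is the thread-safety flag, and the mutex below exists to serve it; this is familiar from operating-systems courses. From this we can already guess that every malloc-style call in Redis updates used_memory by the allocated size, and that the update runs in either a thread-safe or a non-thread-safe mode.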
~~~
/* Thread-safe update of used_memory: serialize it with the mutex */
#define update_zmalloc_stat_add(__n) do { \
pthread_mutex_lock(&used_memory_mutex); \
used_memory += (__n); \
pthread_mutex_unlock(&used_memory_mutex); \
} while(0)
~~~
This is the thread-safe update: taking the lock around the change to used_memory ensures that concurrent updates cannot leave the counter with a corrupted value.
~~~
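/* Allocate size bytes */
void *zmalloc(size_t size) {
// still plain malloc underneath; PREFIX_SIZE reserves room for a size header (0 when the allocator can report block sizes)
void *ptr = malloc(size+PREFIX_SIZE);
// NULL means out of memory: call the OOM handler
if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
// the allocator reports the real block size, so account for that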
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}
~~~
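The `#else` branch is the interesting one: without an allocator that can report block sizes, zmalloc hides the requested size in a header just before the pointer it returns, so the free side can find it again. A minimal sketch of that trick, assuming hypothetical names `tracked_malloc`, `tracked_free` and `tracked_used_memory`, with no OOM handler, no locking and no rounding:

```c
#include <stdlib.h>
#include <string.h>

#define PREFIX_SIZE sizeof(size_t)   /* hidden header holding the requested size */

static size_t used_memory = 0;       /* hypothetical counter; not thread-safe */

/* Allocate size bytes plus a header, record the size, return the payload. */
void *tracked_malloc(size_t size) {
    void *ptr = malloc(size + PREFIX_SIZE);
    if (!ptr) return NULL;               /* zmalloc would call its OOM handler */
    *((size_t *)ptr) = size;
    used_memory += size + PREFIX_SIZE;
    return (char *)ptr + PREFIX_SIZE;    /* the caller never sees the header */
}

/* Step back to the header to learn how much to subtract, then free it all. */
void tracked_free(void *p) {
    if (p == NULL) return;
    char *real = (char *)p - PREFIX_SIZE;
    used_memory -= *((size_t *)real) + PREFIX_SIZE;
    free(real);
}

size_t tracked_used_memory(void) { return used_memory; }
```

One trade-off of this layout: the returned pointer is only guaranteed to be aligned to `sizeof(size_t)`, not to the stricter alignment malloc itself provides.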
zmalloc still calls C's malloc underneath; it adds out-of-memory handling and then updates the usage counter. Inside update_zmalloc_stat_alloc:
~~~
/* Account for _n newly allocated bytes (rounded up to a multiple of sizeof(long)), with or without locking */
#define update_zmalloc_stat_alloc(__n) do { \
size_t _n = (__n); \
if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); \
if (zmalloc_thread_safe) { \
update_zmalloc_stat_add(_n); \
} else { \
used_memory += _n; \
} \
} while(0)
~~~
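The second line of the macro rounds the byte count up to a multiple of `sizeof(long)`, presumably because the allocator never hands out less than word-aligned chunks. The bit trick works because `sizeof(long)` is a power of two, so `n & (sizeof(long)-1)` is simply `n % sizeof(long)`. Isolated as a hypothetical helper:

```c
#include <stddef.h>

/* Round n up to the next multiple of sizeof(long), exactly as in
 * update_zmalloc_stat_alloc: if any low bits are set, add what is missing. */
size_t round_up_to_long(size_t n) {
    size_t mask = sizeof(long) - 1;         /* e.g. 7 when sizeof(long) == 8 */
    if (n & mask) n += sizeof(long) - (n & mask);
    return n;
}
```

On a typical 64-bit Linux system, where `sizeof(long)` is 8, 13 becomes 16 and 16 stays 16.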
zfree() does the reverse of zmalloc: it calls free and decreases used_memory. The API also contains zcalloc; the code below shows how it differs from zmalloc:
~~~
/* Allocate zero-initialized memory via calloc */
void *zcalloc(size_t size) {
// calloc differs from malloc in two ways: it takes (count, element size), and it zero-fills the memory
// void *calloc(size_t nmemb, size_t size) allocates nmemb * size bytes
// so this requests size+PREFIX_SIZE zeroed bytes
void *ptr = calloc(1, size+PREFIX_SIZE);
if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}
~~~
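The author also thoughtfully exposes a few of these settings to callers. Thread-safe accounting, for instance, is off by default for efficiency and is switched on only when you call: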
~~~
/* Switch used_memory accounting to thread-safe mode */
void zmalloc_enable_thread_safeness(void) {
zmalloc_thread_safe = 1;
}
~~~
Or you can install your own out-of-memory handler that better suits the system's needs:
~~~
/* Install a custom out-of-memory handler */
void zmalloc_set_oom_handler(void (*oom_handler)(size_t)) {
zmalloc_oom_handler = oom_handler;
}
~~~
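(24)— Utility code (2)
Last updated: 2022-04-01 20:21:05
The previous post took a first look at Redis's utility files, covering two random-number algorithms and the cyclic redundancy check. Today I continue with the other helpers: byte-order (endianness) conversion, Redis's SHA implementation, and the general-purpose utilities in util.c.
First, byte-order conversion. Anyone who has studied operating systems knows the term: depending on the CPU architecture, a multi-byte number is stored either most significant byte first (big-endian) or least significant byte first (little-endian), so data may need converting between the two. Redis exposes this set of APIs for it: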
~~~
/* Byte-order conversion for 16-, 32- and 64-bit values */
void memrev16(void *p);
void memrev32(void *p);
void memrev64(void *p);
uint16_t intrev16(uint16_t v);
uint32_t intrev32(uint32_t v);
uint64_t intrev64(uint64_t v);
~~~
Here is the implementation of one of them:
~~~
/* Toggle the 32 bit unsigned integer pointed by *p from little endian to
* big endian */
/* 32 bits are 4 bytes: swap bytes 0 and 3, then bytes 1 and 2 */
void memrev32(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[3];
x[3] = t;
t = x[1];
x[1] = x[2];
x[2] = t;
}
~~~
In short, it mirrors the bytes end to end.
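A small self-contained sketch shows the swap next to two related tools: a runtime check of the host's byte order, and a shift-based serializer that writes little-endian bytes on any host without needing to know the host's order. All three function names are hypothetical.

```c
#include <stdint.h>

/* Reverse the 4 bytes at p in place (same swap pattern as memrev32). */
void rev32(void *p) {
    unsigned char *x = p, t;
    t = x[0]; x[0] = x[3]; x[3] = t;
    t = x[1]; x[1] = x[2]; x[2] = t;
}

/* Runtime check: is the least significant byte stored first? */
int host_is_little_endian(void) {
    uint16_t one = 1;
    return *(unsigned char *)&one == 1;
}

/* Write v as little-endian bytes regardless of host order, using shifts. */
void store_le32(unsigned char out[4], uint32_t v) {
    out[0] = (unsigned char)(v);
    out[1] = (unsigned char)(v >> 8);
    out[2] = (unsigned char)(v >> 16);
    out[3] = (unsigned char)(v >> 24);
}
```

For example, reversing 0x11223344 in place gives 0x44332211, and `store_le32` always puts 0x44 in the first byte.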
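Next is Redis's hashing code, which uses SHA. SHA (Secure Hash Algorithm), like MD5, is a one-way hash function rather than a cipher, but it produces a longer digest and is more secure: SHA-1 turns any string or binary stream shorter than 2^64 bits into a 160-bit value. Using it from C in Redis: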
~~~
int
main(int argc, char **argv)
{
SHA1_CTX ctx;
unsigned char hash[20], buf[BUFSIZE];
int i;
for(i=0;i ...
...
} else if (pattern[0] == '-' && patternLen >= 3) {
int start = pattern[0];
int end = pattern[2];
int c = string[0];
if (start > end) {
int t = start;
start = end;
end = t;
}
if (nocase) {
start = tolower(start);
end = tolower(end);
c = tolower(c);
}
pattern += 2;
patternLen -= 2;
if (c >= start && c <= end)
match = 1;
} else {
if (!nocase) {
if (pattern[0] == string[0])
match = 1;
} else {
if (tolower((int)pattern[0]) == tolower((int)string[0]))
match = 1;
}
}
pattern++;
patternLen--;
}
if (not)
match = !match;
if (!match)
return 0; /* no match */
string++;
stringLen--;
break;
}
case '\\':
if (patternLen >= 2) {
pattern++;
patternLen--;
}
/* fall through */
default:
/* Not a glob metacharacter: compare the characters directly */
if (!nocase) {
if (pattern[0] != string[0])
// characters differ: no match
return 0; /* no match */
} else {
if (tolower((int)pattern[0]) != tolower((int)string[0]))
return 0; /* no match */
}
string++;
stringLen--;
break;
}
pattern++;
patternLen--;
if (stringLen == 0) {
while(*pattern == '*') {
pattern++;
patternLen--;
}
break;
}
}
if (patternLen == 0 && stringLen == 0)
// pattern and string both fully consumed: it is a match
return 1;
return 0;
}
~~~
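Remarkable code, isn't it? I had never thought about implementing glob-style pattern matching (the *, ?, [...] patterns used by commands such as KEYS) myself. Another helper is ll2string, which converts a number to a string. The usual approach is to repeatedly take the remainder modulo 10 and append the matching digit character, but a long long can have many digits, which means many divisions. Redis instead divides by 100 and emits two digits at a time, looking both characters up by index in a table of digit pairs, which halves the number of divisions and avoids handling each remainder digit separately: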
~~~
/* Convert a long long into a string. Returns the number of
* characters needed to represent the number.
* If the buffer is not big enough to store the string, 0 is returned.
*
* Based on the following article (that apparently does not provide a
* novel approach but only publicizes an already used technique):
*
* https://www.facebook.com/notes/facebook-engineering/three-optimization-tips-for-c/10151361643253920
*
* Modified in order to handle signed integers since the original code was
* designed for unsigned integers. */
int ll2string(char* dst, size_t dstlen, long long svalue) {
static const char digits[201] =
"0001020304050607080910111213141516171819"
"2021222324252627282930313233343536373839"
"4041424344454647484950515253545556575859"
"6061626364656667686970717273747576777879"
"8081828384858687888990919293949596979899";
int negative;
unsigned long long value;
/* The main loop works with 64bit unsigned integers for simplicity, so
* we convert the number here and remember if it is negative. */
/* Handle the sign here */
if (svalue < 0) {
if (svalue != LLONG_MIN) {
value = -svalue;
} else {
value = ((unsigned long long) LLONG_MAX)+1;
}
negative = 1;
} else {
value = svalue;
negative = 0;
}
/* Check length. */
uint32_t const length = digits10(value)+negative;
if (length >= dstlen) return 0;
/* Null term. */
uint32_t next = length;
dst[next] = '\0';
next--;
while (value >= 100) {
// peel off the lowest two digits
int const i = (value % 100) * 2;
value /= 100;
// i indexes the two characters for that remainder in the digits table
dst[next] = digits[i + 1];
dst[next - 1] = digits[i];
next -= 2;
}
/* Handle last 1-2 digits. */
if (value < 10) {
dst[next] = '0' + (uint32_t) value;
} else {
int i = (uint32_t) value * 2;
dst[next] = digits[i + 1];
dst[next - 1] = digits[i];
}
/* Add sign. */
if (negative) dst[0] = '-';
return length;
}
~~~
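digits[201] holds the two-character strings 00 through 99 (plus the terminating NUL). Each remainder is turned into characters with a table lookup, which is simple and noticeably faster. Another highlight of the Redis code.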
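The two-digits-per-division idea can be isolated in an unsigned-only sketch. `u64_to_str` and `count_digits` are hypothetical names; `count_digits` is a plain loop standing in for Redis's digits10, and the sign handling of ll2string is left out.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Two ASCII digits per entry: "00", "01", ..., "99". */
static const char kDigitPairs[201] =
    "0001020304050607080910111213141516171819"
    "2021222324252627282930313233343536373839"
    "4041424344454647484950515253545556575859"
    "6061626364656667686970717273747576777879"
    "8081828384858687888990919293949596979899";

/* Number of decimal digits in v (at least 1). */
static uint32_t count_digits(uint64_t v) {
    uint32_t n = 1;
    while (v >= 10) { v /= 10; n++; }
    return n;
}

/* Write v in decimal into dst; returns the length, or 0 if dstlen is too small. */
size_t u64_to_str(char *dst, size_t dstlen, uint64_t v) {
    uint32_t len = count_digits(v);
    if (len >= dstlen) return 0;            /* need room for the terminator */
    dst[len] = '\0';
    uint32_t next = len - 1;                /* fill from the right */
    while (v >= 100) {                      /* one division yields two digits */
        uint32_t i = (uint32_t)(v % 100) * 2;
        v /= 100;
        dst[next] = kDigitPairs[i + 1];
        dst[next - 1] = kDigitPairs[i];
        next -= 2;
    }
    if (v < 10) {                           /* one digit left */
        dst[next] = (char)('0' + v);
    } else {                                /* two digits left */
        uint32_t i = (uint32_t)v * 2;
        dst[next] = kDigitPairs[i + 1];
        dst[next - 1] = kDigitPairs[i];
    }
    return len;
}
```

Converting 1234567 takes three divisions by 100 (producing 67, 45 and 23) and finishes with the single digit 1, instead of six divisions by 10.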
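(23)— CRC and random-number algorithms
Last updated: 2022-04-01 20:21:03
Today I start on the utility code in the Redis source. Utility algorithms work the same way in any language, so this is a good chance to learn some classic ones; today, the CRC (cyclic redundancy check) algorithm and the rand random-number generator.
CRC stands for cyclic redundancy check. Its basic idea comes from linear coding theory: for a k-bit message to be sent, the sender derives an r-bit check value (the CRC) according to a fixed rule and appends it, forming a (k+r)-bit sequence that is then transmitted. The receiver checks the message against the CRC using the same rule to detect whether an error occurred in transit. For a 16-bit CRC, the message is shifted left by 16 bits (i.e. multiplied by $2^{16}$) and then divided (modulo 2) by the generator polynomial; the remainder is the CRC. Redis implements the byte-wise (table-driven) form of the algorithm.
In general terms, the byte-wise algorithm says: the CRC after the current byte equals the previous CRC's low 8 bits shifted left by 8, XORed with the table entry obtained from the previous CRC shifted right by 8 XORed with the current byte.
The byte-wise algorithm step by step:
1) Initialize the CRC register to all zeros (0x0000). (Note: if the register is initialized to all ones, the final CRC must be inverted.)
2) Shift the CRC register left by 8 bits and store the result back in the register.
3) XOR the register's original high 8 bits (obtained by shifting right by 8) with the data byte to get an index into the table.
4) XOR the table value at that index into the CRC register.
5) Advance the data pointer by one; if data remains, repeat from step 2).
6) The register now holds the CRC.
Redis's code follows this scheme in its bit-reflected form: it shifts the register right by 8 and indexes the table with the CRC's low byte XORed with the data byte: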
~~~
/* CRC64: crc is the starting value (0), s the input bytes, l the input length */
uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l) {
uint64_t j;
for (j = 0; j < l; j++) {
uint8_t byte = s[j];
crc = crc64_tab[(uint8_t)crc ^ byte] ^ (crc >> 8);
}
return crc;
}
~~~
Redis's built-in test:
~~~
/* Test main */
#ifdef TEST_MAIN
#include <stdio.h>
int main(void) {
printf("e9c6d914c4b8d9ca == %016llx\n",
(unsigned long long) crc64(0,(unsigned char*)"123456789",9));
return 0;
}
#endif
~~~
It computes the CRC of the string "123456789"; the expected result is e9c6d914c4b8d9ca.
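To see where the lookup table comes from, here is a sketch that computes the same reflected CRC-64 one bit at a time, builds the 256-entry table from that bitwise step, and runs the byte-wise loop with the same shape as Redis's crc64(). It assumes the Jones polynomial 0xad93d23594c935a9 in its bit-reversed form, 0x95ac9329ac4bc9b5; the function names are hypothetical, and the check value is the one from Redis's test above.

```c
#include <stddef.h>
#include <stdint.h>

/* Bit-reversed form of the Jones polynomial 0xad93d23594c935a9. */
#define CRC64_JONES_REFLECTED 0x95ac9329ac4bc9b5ULL

/* Bit-at-a-time reflected CRC-64: feed each byte in at the low end,
 * then shift right 8 times, XORing in the polynomial whenever a 1 falls out. */
uint64_t crc64_bitwise(uint64_t crc, const unsigned char *s, size_t len) {
    for (size_t i = 0; i < len; i++) {
        crc ^= s[i];
        for (int bit = 0; bit < 8; bit++)
            crc = (crc & 1) ? (crc >> 1) ^ CRC64_JONES_REFLECTED : crc >> 1;
    }
    return crc;
}

/* Precompute the effect of those 8 shift steps for every possible low byte. */
void crc64_build_table(uint64_t table[256]) {
    for (int b = 0; b < 256; b++) {
        uint64_t c = (uint64_t)b;
        for (int bit = 0; bit < 8; bit++)
            c = (c & 1) ? (c >> 1) ^ CRC64_JONES_REFLECTED : c >> 1;
        table[b] = c;
    }
}

/* Byte-wise loop, same shape as Redis's crc64(). */
uint64_t crc64_table(const uint64_t table[256], uint64_t crc,
                     const unsigned char *s, size_t len) {
    for (size_t i = 0; i < len; i++)
        crc = table[(uint8_t)crc ^ s[i]] ^ (crc >> 8);
    return crc;
}
```

Both versions give the same result; the table simply trades 2 KB of memory for doing eight shift steps at once. Because there is no final XOR, a CRC can also be continued across calls by passing the previous result back in as `crc`.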
Next, how Redis's random-number functions work. At first I assumed they simply called math.random(); I was quite wrong. The author explains why:
~~~
/* Pseudo random number generation functions derived from the drand48()
* function obtained from pysam source code.
*
* This functions are used in order to replace the default math.random()
* Lua implementation with something having exactly the same behavior
* across different systems (by default Lua uses libc's rand() that is not
* required to implement a specific PRNG generating the same sequence
* in different systems if seeded with the same integer).
*
* The original code appears to be under the public domain.
* I modified it removing the non needed functions and all the
* 1960-style C coding stuff...
*
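* In short: random functions may behave differently on different systems, so instead of the default math.random the author
* rewrote the generator on top of the drand48() algorithm, removing the functions that were not needed.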
* ----------------------------------------------------------------------------
~~~
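In other words, the author rewrote the random-number generator, basing it on drand48(), which is named for the 48-bit integer arithmetic it uses. srand48 and drand48 are Unix library functions. drand48 returns doubles uniformly distributed over [0.0, 1.0). It produces its pseudo-random sequence with a linear congruential generator over 48-bit integers, X(n+1) = (a*X(n) + c) mod 2^48 with a = 0x5DEECE66D and c = 0xB, and then scales the result into [0.0, 1.0); the simplified version below takes the high 32 bits and divides by 2^32. srand48 initializes the generator: POSIX specifies that the high 32 bits of X are set from the seed and the low 16 bits to the constant 0x330E, whereas the version below fills the low 16 bits from rand(). This generator has fairly good statistical properties. A simple C version of the two functions: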
~~~
#ifndef DRAND48_H
#define DRAND48_H
#include <stdlib.h>
#define m 0x100000000LL
#define c 0xB
#define a 0x5DEECE66DLL
static unsigned long long seed = 1;
double drand48(void)
{
seed = (a * seed + c) & 0xFFFFFFFFFFFFLL;
unsigned int x = seed >> 16;
return ((double)x / (double)m);
}
void srand48(long i)
{
/* high 32 bits from i; low 16 bits from rand(), masked so they cannot spill into the high bits */
seed = (((unsigned long long)(unsigned int)i) << 16) | (rand() & 0xFFFF);
}
#endif
~~~
This version still relies on the system's rand(), while the author avoids the system generator entirely, so Redis's implementation is a little different:
~~~
int32_t redisLrand48() {
next();
return (((int32_t)x[2] << (N - 1)) + (x[1] >> 1));
}
/* Set the seed */
void redisSrand48(int32_t seedval) {
SEED(X0, LOW(seedval), HIGH(seedval));
}
static void next(void) {
uint32_t p[2], q[2], r[2], carry0, carry1;
MUL(a[0], x[0], p);
ADDEQU(p[0], c, carry0);
ADDEQU(p[1], carry0, carry1);
MUL(a[0], x[1], q);
ADDEQU(p[1], q[0], carry0);
MUL(a[1], x[0], r);
x[2] = LOW(carry0 + carry1 + CARRY(p[1], r[0]) + q[1] + r[1] +
a[0] * x[2] + a[1] * x[1] + a[2] * x[0]);
x[1] = LOW(p[1] + r[0]);
x[0] = LOW(p[0]);
}
~~~
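For the details of next(), see the source: it computes X = a*X + c over 48 bits using three 16-bit limbs (x[0], x[1], x[2]) with explicit carry handling through the MUL, ADDEQU and CARRY macros.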
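On a machine with 64-bit integers, the same 48-bit generator fits in a few lines. The sketch below assumes the POSIX drand48 constants and seeding rule; `lcg48_seed`, `lcg48_next` and `lcg48_next_double` are hypothetical names. Like redisLrand48, `lcg48_next` returns the top 31 of the 48 state bits.

```c
#include <stdint.h>

#define LCG48_A    0x5DEECE66DULL
#define LCG48_C    0xBULL
#define LCG48_MASK 0xFFFFFFFFFFFFULL   /* keep the low 48 bits, i.e. mod 2^48 */

static uint64_t lcg48_state = 0x330EULL;   /* as if lcg48_seed(0) had been called */

/* Like srand48: high 32 bits from the seed, low 16 bits fixed at 0x330E. */
void lcg48_seed(int32_t seed) {
    lcg48_state = ((uint64_t)(uint32_t)seed << 16) | 0x330EULL;
}

/* X = (a*X + c) mod 2^48. The 64-bit product may wrap, which is harmless
 * because 2^48 divides 2^64. Returns bits 47..17, like lrand48. */
int32_t lcg48_next(void) {
    lcg48_state = (LCG48_A * lcg48_state + LCG48_C) & LCG48_MASK;
    return (int32_t)(lcg48_state >> 17);
}

/* Like drand48: use all 48 bits to build a double in [0.0, 1.0). */
double lcg48_next_double(void) {
    lcg48_state = (LCG48_A * lcg48_state + LCG48_C) & LCG48_MASK;
    return (double)lcg48_state / 281474976710656.0;   /* 2^48 */
}
```

Because every step is plain integer arithmetic with fixed constants, the same seed produces the same sequence on every platform. That reproducibility is exactly why Redis does not rely on libc's rand().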
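(22)— networking: the client protocol layer
Last updated: 2022-04-01 20:21:00
Last time I analyzed only part of Redis's networking code; today I went through networking.c as well. Its code is mostly about operating on clients. The addReply() family is the main part: those functions alone make up about half of the API. I split the API into three groups: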
~~~
/* ------------ API ---------------------- */
void *dupClientReplyValue(void *o) /* duplicate a reply value */
int listMatchObjects(void *a, void *b) /* compare two objects for equality */
robj *dupLastObjectIfNeeded(list *reply) /* return the last object in the reply list, duplicating it first if it is shared */
void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* copy src's output buffer to dst */
static void acceptCommonHandler(int fd, int flags) /* common setup after a connection is accepted */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void disconnectSlaves(void) /* disconnect all of the server's slaves */
void replicationHandleMasterDisconnection(void)
void flushSlavesOutputBuffers(void) /* called from freeMemoryIfNeeded() to flush slave output buffers to their sockets */
int processEventsWhileBlocked(void)
/* ------------- addReply API ----------------- */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* append to the client's static output buffer */
void _addReplyObjectToList(redisClient *c, robj *o) /* append an robj to the reply list */
void _addReplySdsToList(redisClient *c, sds s) /* append an sds string to the reply list */
void _addReplyStringToList(redisClient *c, char *s, size_t len) /* append a C string of known length to the reply list */
void addReply(redisClient *c, robj *obj) /* queue the data in obj->ptr as output for the client */
void addReplySds(redisClient *c, sds s) /* append an sds string to the reply; the other addReply*() functions below work in much the same way */
void addReplyString(redisClient *c, char *s, size_t len)
void addReplyErrorLength(redisClient *c, char *s, size_t len)
void addReplyError(redisClient *c, char *err) /* add an error reply */
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
void addReplyStatusLength(redisClient *c, char *s, size_t len)
void addReplyStatus(redisClient *c, char *status)
void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
void *addDeferredMultiBulkLength(redisClient *c) /* append a placeholder object to the reply list; its length is filled in later */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length)
void addReplyDouble(redisClient *c, double d) /* add a double as a bulk reply; in the Redis protocol "bulk" means a length-prefixed, binary-safe string */
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)
void addReplyLongLong(redisClient *c, long long ll)
void addReplyMultiBulkLen(redisClient *c, long length)
void addReplyBulkLen(redisClient *c, robj *obj) /* add the length header of a bulk reply */
void addReplyBulk(redisClient *c, robj *obj) /* add an object as a bulk reply */
void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)
void addReplyBulkCString(redisClient *c, char *s)
void addReplyBulkLongLong(redisClient *c, long long ll)
/* ------------- Client API ----------------- */
redisClient *createClient(int fd) /* create a client: 1. set up the connection, 2. select the database, 3. initialize fields */
int prepareClientToWrite(redisClient *c) /* called before new reply data is queued; installs the client's writable-event handler */
static void freeClientArgv(redisClient *c)
void freeClient(redisClient *c) /* free a client; masters and slaves need different handling */
void freeClientAsync(redisClient *c)
void freeClientsInAsyncFreeQueue(void) /* free the clients queued by freeClientAsync() */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* write the client's pending reply data to its socket */
void resetClient(redisClient *c)
int processInlineBuffer(redisClient *c) /* parse an inline command from the query buffer, c->querybuf */
static void setProtocolError(redisClient *c, int pos)
int processMultibulkBuffer(redisClient *c) /* parse a multi-bulk request from the query buffer */
void processInputBuffer(redisClient *c) /* process the client's query buffer */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* read query data from the client */
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer) /* find the longest output list and the largest input buffer among clients */
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* format ip and port as ip:port */
int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* build the client's ip:port address string */
char *getClientPeerId(redisClient *c) /* return the client's address (c->peerid) */
sds catClientInfoString(sds s, redisClient *client) /* append a formatted description of the client's fields to s and return it */
sds getAllClientsInfoString(void) /* concatenate the info strings of all clients */
void clientCommand(redisClient *c) /* implementation of the CLIENT command */
void rewriteClientCommandVector(redisClient *c, int argc, ...) /* replace the client's argv; old arguments lose a reference, new ones gain one */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* replace the client's i-th argument */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* memory used by the client's output buffer */
int getClientType(redisClient *c)
int getClientTypeByName(char *name) /* map a class name to one of the three client types: normal, slave, pubsub */
char *getClientTypeName(int class)
int checkClientOutputBufferLimits(redisClient *c) /* check whether the client's output buffer exceeds the soft or hard limit */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* once the soft or hard limit is reached, schedule an asynchronous close, since the client cannot be freed safely in this context */
~~~
Let's start with the simplest one, _addReplyToBuffer, which appends reply data to the output buffer, since all the later addReply functions call it directly or indirectly.
~~~
/* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
/* Append s to the client's static output buffer */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
// the reply list is already in use: refuse, so the output stays in order
if (listLength(c->reply) > 0) return REDIS_ERR;
/* Check that the buffer has enough space available for this string. */
if (len > available) return REDIS_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return REDIS_OK;
}
~~~
The key line is memcpy(c->buf+c->bufpos,s,len);, so the content goes into c->buf, the client's static output buffer. The other way to add output is to append an object to the reply list:
~~~
/* Append an robj to the reply list */
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
incrRefCount(o);
// empty list: append the object itself
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
/* Append to this object when possible. */
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
~~~
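This appends the robj to the reply list (merging it into the tail object when the combined size fits within REDIS_REPLY_CHUNK_BYTES) and updates reply_bytes. At the end it calls asyncCloseClientOnOutputBufferLimitReached(c), which I found at the very bottom of the file and did not understand at first: after data has been added, a client whose output buffer has grown past the limits is scheduled to be closed asynchronously: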
~~~
/* Asynchronously close a client if soft or hard limit is reached on the
* output buffer size. The caller can check if the client will be closed
* checking if the client REDIS_CLOSE_ASAP flag is set.
*
* Note: we need to close the client asynchronously because this function is
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
sdsfree(client);
}
}
~~~
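The interplay between `_addReplyToBuffer` and the reply list is worth restating: the static buffer is used first, and once anything has gone to the list, all later output must go to the list too, or replies would be sent out of order. A minimal sketch of that rule, assuming tiny hypothetical sizes and names (`reply_out`, `add_reply`) and a fixed array standing in for the linked list:

```c
#include <stddef.h>
#include <string.h>

#define OUT_BUF_SIZE  16   /* hypothetical size of the static buffer (c->buf) */
#define OUT_CHUNK     32   /* hypothetical max bytes per list node */
#define OUT_MAX_NODES 8    /* stand-in for the output-buffer limits */

typedef struct {
    char buf[OUT_BUF_SIZE];
    size_t bufpos;
    char nodes[OUT_MAX_NODES][OUT_CHUNK];   /* stand-in for the c->reply list */
    size_t node_len[OUT_MAX_NODES];
    int nnodes;
} reply_out;

/* Like _addReplyToBuffer: refuse once the list is in use, or if s does not fit. */
static int add_to_buffer(reply_out *c, const char *s, size_t len) {
    if (c->nnodes > 0) return -1;                    /* preserve output order */
    if (len > sizeof(c->buf) - c->bufpos) return -1;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return 0;
}

/* Like the list path: grow the tail node if there is room, else add a node. */
static int add_to_list(reply_out *c, const char *s, size_t len) {
    if (len > OUT_CHUNK) return -1;                  /* sketch limitation */
    if (c->nnodes > 0 && c->node_len[c->nnodes - 1] + len <= OUT_CHUNK) {
        int t = c->nnodes - 1;
        memcpy(c->nodes[t] + c->node_len[t], s, len);
        c->node_len[t] += len;
        return 0;
    }
    if (c->nnodes == OUT_MAX_NODES) return -1;       /* Redis would schedule a close */
    memcpy(c->nodes[c->nnodes], s, len);
    c->node_len[c->nnodes++] = len;
    return 0;
}

/* Static buffer first; the list is the overflow path. */
int add_reply(reply_out *c, const char *s, size_t len) {
    if (add_to_buffer(c, s, len) == 0) return 0;
    return add_to_list(c, s, len);
}
```

In the test below, the final 4-byte reply would still fit in the static buffer, but it goes to the list because the list is already in use.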
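Before addReply data can be queued there is a precondition: a writable-file event must be registered for the client's socket, so that the data actually gets written out: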
~~~
/* Append an sds string to the reply */
void addReplySds(redisClient *c, sds s) {
// every add operation first calls prepareClientToWrite(c), which installs the writable-event handler
if (prepareClientToWrite(c) != REDIS_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
}
if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
sdsfree(s);
} else {
/* This method free's the sds when it is no longer needed. */
_addReplySdsToList(c,s);
}
}
~~~
So what does prepareClientToWrite() do?
~~~
/* This function is called every time we are going to transmit new data
* to the client. The behavior is the following:
*
* If the client should receive new data (normal clients will) the function
* returns REDIS_OK, and make sure to install the write handler in our event
* loop so that when the socket is writable new data gets written.
*
* If the client should not receive new data, because it is a fake client,
* a master, a slave not yet online, or because the setup of the write handler
* failed, the function returns REDIS_ERR.
*
* Typically gets called every time a reply is built, before adding more
* data to the clients output buffers. If the function returns REDIS_ERR no
* data should be appended to the output buffers. */
int prepareClientToWrite(redisClient *c) {
if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
if ((c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
if (c->fd <= 0) return REDIS_ERR; /* Fake client */
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
// register the writable-event handler here
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
~~~
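Among the addReply functions there is also the addReplyBulk family. "Bulk" is a Redis protocol term: a bulk reply is a length-prefixed, binary-safe string, not necessarily a large one. Let's look at one of them: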
~~~
/* Add a Redis Object as a bulk reply */
void addReplyBulk(redisClient *c, robj *obj) {
// length header: "$<len>\r\n"
addReplyBulkLen(c,obj);
// the payload itself
addReply(c,obj);
// the trailing "\r\n"
addReply(c,shared.crlf);
}
~~~
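So a single bulk reply becomes three ordinary addReply calls: the length header, the data, and the terminating CRLF. The length prefix is what makes bulk strings binary-safe, because the reader knows exactly how many bytes to consume. networking.c also provides freeClient():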
~~~
/* Free a client; masters and slaves need different handling */
void freeClient(redisClient *c) {
listNode *ln;
/* If this is marked as current client unset it */
if (server.current_client == c) server.current_client = NULL;
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & REDIS_MASTER) {
redisLog(REDIS_WARNING,"Connection with master lost.");
if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
REDIS_CLOSE_ASAP|
REDIS_BLOCKED|
REDIS_UNBLOCKED)))
{
// master client: cache it so that a partial resynchronization can reuse it later
replicationCacheMaster(c);
return;
}
}
~~~
... (rest of the function omitted)
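While freeMemoryIfNeeded() is evicting keys, the commands propagated to the slaves pile up in their output buffers. To push that data out without returning to the event loop, Redis calls the following helper: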
~~~
/* Helper function used by freeMemoryIfNeeded() in order to flush slave
* output buffers without returning control to the event loop. */
void flushSlavesOutputBuffers(void) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
int events;
events = aeGetFileEvents(server.el,slave->fd);
if (events & AE_WRITABLE &&
slave->replstate == REDIS_REPL_ONLINE &&
listLength(slave->reply))
{
// write the pending replies to the slave's socket right away
sendReplyToClient(server.el,slave->fd,slave,0);
}
}
}
~~~
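The core work is again done by sendReplyToClient(), which writes the client's buf and reply list to its socket. That is my understanding so far; there is a lot of code here, and reading it really did make my head spin.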
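(21)— anet: the networking wrapper
Last updated: 2022-04-01 20:20:58
Yesterday I breezed through Redis's event-driven model; today I look at anet, Redis's thin wrapper around client/server network operations. The file describes itself as follows: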
~~~
/* anet.c -- Basic TCP socket stuff made a bit less boring
~~~
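"Made a bit less boring" puzzled me at first; it simply means the wrappers make raw socket programming less tedious. The important part is "Basic TCP socket": plain socket connections over TCP. The API in anet.h: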
~~~
int anetTcpConnect(char *err, char *addr, int port); /* blocking TCP connect */
int anetTcpNonBlockConnect(char *err, char *addr, int port); /* non-blocking TCP connect */
int anetUnixConnect(char *err, char *path); /* blocking Unix domain socket connect */
int anetUnixNonBlockConnect(char *err, char *path); /* non-blocking Unix domain socket connect */
int anetRead(int fd, char *buf, int count); /* read count bytes from fd into buf */
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len); /* resolve a host name or address to an IP string */
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len); /* accept numeric IP addresses only, no DNS lookup */
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);
int anetWrite(int fd, char *buf, int count); /* write count bytes from buf to fd */
int anetNonBlock(char *err, int fd); /* put fd into non-blocking mode */
int anetEnableTcpNoDelay(char *err, int fd); /* enable TCP_NODELAY */
int anetDisableTcpNoDelay(char *err, int fd); /* disable TCP_NODELAY */
int anetTcpKeepAlive(char *err, int fd); /* enable SO_KEEPALIVE; works on all systems */
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval); /* keep the connection alive to detect dead peers; the interval option only takes effect on Linux */
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
~~~
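Many of these are familiar functions: read, write, accept, and connect appear in practically every language's networking library. My most immediate impression after reading anet is that it is a thin wrapper over the C system networking calls. The function bodies call the library functions directly, with a little extra packaging that the author needed for Redis. Setting non-blocking mode is one example: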
~~~
/* put fd into non-blocking mode */
int anetNonBlock(char *err, int fd)
{
int flags;
/* Set the socket non-blocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(fd, F_GETFL)) == -1) {
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
return ANET_ERR;
}
//set O_NONBLOCK with fcntl
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
~~~
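To see what O_NONBLOCK changes in practice, here is a small sketch. It uses a pipe instead of a socket, which is an assumption made only so the example runs without a network. `setNonBlock` repeats the anetNonBlock pattern. `readWouldBlock` is a hypothetical helper that shows a read on an empty non-blocking fd failing with EAGAIN instead of putting the process to sleep.

~~~
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Same pattern as anetNonBlock: read the current flags, then OR in O_NONBLOCK. */
static int setNonBlock(int fd) {
    assert(fd >= 0);
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Hypothetical helper: returns 1 if a read on fd fails with EAGAIN right now.
 * Note that it consumes one byte if data happens to be available. */
static int readWouldBlock(int fd) {
    char c;
    ssize_t n = read(fd, &c, 1);
    return n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
}
~~~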
fcntl() is the call that does the actual work. Across the whole networking file, a few things did stand out to me:
(1). connect can be done in either blocking or non-blocking mode:
~~~
/* TCP connect (blocking) */
int anetTcpConnect(char *err, char *addr, int port)
{
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
}
/* non-blocking TCP connect */
int anetTcpNonBlockConnect(char *err, char *addr, int port)
{
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
}
~~~
(2). TCP_NODELAY can be switched on or off:
~~~
/* set or clear TCP_NODELAY on the socket */
static int anetSetTcpNoDelay(char *err, int fd, int val)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1)
{
anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
/* enable TCP_NODELAY */
int anetEnableTcpNoDelay(char *err, int fd)
{
return anetSetTcpNoDelay(err, fd, 1);
}
/* disable TCP_NODELAY */
int anetDisableTcpNoDelay(char *err, int fd)
{
return anetSetTcpNoDelay(err, fd, 0);
}
~~~
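TCP_NODELAY disables Nagle's algorithm, so small writes go out immediately instead of being held back and coalesced. This matters when latency is more important than packet count, as it is for a request/response server.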
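A minimal sketch of the same setsockopt call, with a getsockopt read-back so the effect can be checked. `setTcpNoDelay` and `getTcpNoDelay` are illustrative names, not anet functions. The option can be set before the socket is connected.

~~~
#include <assert.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: val = 1 disables Nagle's algorithm, val = 0 re-enables it. */
static int setTcpNoDelay(int fd, int val) {
    assert(fd >= 0);
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
}

/* Hypothetical helper: returns 1 if TCP_NODELAY is on, 0 if off, -1 on error. */
static int getTcpNoDelay(int fd) {
    int val = 0;
    socklen_t len = sizeof(val);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, &len) == -1) return -1;
    return val != 0;
}
~~~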
(3). IPv4 and IPv6 addresses are handled separately. The author was thorough here, and the distinction is already made when an address is resolved:
~~~
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".
*
* If flags is set to ANET_IP_ONLY the function only resolves hostnames
* that are actually already IPv4 or IPv6 addresses. This turns the function
* into a validating / normalizing function. */
/* Generic resolver: resolves a hostname or a numeric IP address, depending on flags */
int anetGenericResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len,
int flags)
{
struct addrinfo hints, *info;
int rv;
memset(&hints,0,sizeof(hints));
if (flags & ANET_IP_ONLY) hints.ai_flags = AI_NUMERICHOST;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; /* specify socktype to avoid dups */
//resolve the host name
if ((rv = getaddrinfo(host, NULL, &hints, &info)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
//format the result as an IPv4 or IPv6 string depending on the address family
if (info->ai_family == AF_INET) {
struct sockaddr_in *sa = (struct sockaddr_in *)info->ai_addr;
inet_ntop(AF_INET, &(sa->sin_addr), ipbuf, ipbuf_len);
} else {
struct sockaddr_in6 *sa = (struct sockaddr_in6 *)info->ai_addr;
inet_ntop(AF_INET6, &(sa->sin6_addr), ipbuf, ipbuf_len);
}
freeaddrinfo(info);
return ANET_OK;
}
~~~
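As the comment says, the ANET_IP_ONLY mode (AI_NUMERICHOST) turns the resolver into a validator and normalizer. Here is a minimal sketch of just that mode, assuming a POSIX system. `normalizeIp` is a hypothetical name. Because AI_NUMERICHOST forbids DNS lookups, it works offline and rejects plain hostnames.

~~~
#define _POSIX_C_SOURCE 200112L
#include <arpa/inet.h>
#include <assert.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

/* Hypothetical helper: writes the canonical text form of an IPv4/IPv6 literal
 * into out and returns 0, or returns -1 if host is not a numeric address. */
static int normalizeIp(const char *host, char *out, size_t outlen) {
    struct addrinfo hints, *info;
    const char *ok;
    assert(out != NULL && outlen > 0);
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_NUMERICHOST;   /* literals only, never a DNS query */
    hints.ai_family = AF_UNSPEC;       /* accept both IPv4 and IPv6 */
    hints.ai_socktype = SOCK_STREAM;   /* avoid duplicate results */
    if (getaddrinfo(host, NULL, &hints, &info) != 0) return -1;
    if (info->ai_family == AF_INET) {
        struct sockaddr_in *sa = (struct sockaddr_in *)info->ai_addr;
        ok = inet_ntop(AF_INET, &sa->sin_addr, out, (socklen_t)outlen);
    } else {
        struct sockaddr_in6 *sa = (struct sockaddr_in6 *)info->ai_addr;
        ok = inet_ntop(AF_INET6, &sa->sin6_addr, out, (socklen_t)outlen);
    }
    freeaddrinfo(info);
    return ok ? 0 : -1;
}
~~~

For instance, `0:0:0:0:0:0:0:1` comes back as `::1`. `localhost` is rejected because it is a name rather than an address literal.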
The remaining functions follow the same patterns we use in everyday code. accept() is one example:
~~~
/* accept() wrapper */
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
//retry accept() if a signal interrupted it (EINTR); any other error is returned
fd = accept(s,sa,len);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}
~~~
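(20)— ae: the event-driven library
Last updated: 2022-04-01 20:20:56
"Event-driven" is a term that comes up more and more, and it sounds very impressive. Today I studied Redis's internal event-driven model and learned a lot from it. One main file, ae.c, plus four backend files is all it takes to see exactly how Redis handles these events. Redis's event handling can use epoll, select, kqueue, or evport, and evport is probably the least familiar of these to most readers. The first three are very common I/O multiplexing mechanisms, and all of them also appear in the libevent networking library. The author notes that this is just a small event-handling library, written so it could be reused easily: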
~~~
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* (ae is a simple event-driven library the author wrote, later turned into a standalone library for easy reuse)
~~~
So it is not very complicated. Before looking at the whole model, we first need the event structs. There are two kinds of events, file events and time events:
~~~
/* File event structure */
typedef struct aeFileEvent {
//which events are registered: AE_READABLE, AE_WRITABLE, or both
int mask; /* one of AE_(READABLE|WRITABLE) */
//handler for readable events
aeFileProc *rfileProc;
//handler for writable events
aeFileProc *wfileProc;
//private data passed to the handlers
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
//time event id
long long id; /* time event identifier. */
//due time, seconds part
long when_sec; /* seconds */
//due time, milliseconds part
long when_ms; /* milliseconds */
//handler called when the event is due
aeTimeProc *timeProc;
//called when the event is deleted
aeEventFinalizerProc *finalizerProc;
//private data passed to the handlers
void *clientData;
//next time event in the list
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
/* a file event that is ready and waiting to be processed */
typedef struct aeFiredEvent {
//file descriptor
int fd;
int mask;
} aeFiredEvent;
~~~
aeFiredEvent only records a file event that is ready to be processed.
All of these events live inside an aeEventLoop struct:
~~~
/* State of an event based program */
typedef struct aeEventLoop {
//highest file descriptor currently registered
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
//id to assign to the next time event
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
//the three event collections: registered file events, fired events, time-event list
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
//stop flag for the main loop
int stop;
//backend-specific state (epoll, select, etc.)
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
~~~
Each event type stores its handler functions inside the struct, which treats functions as values. Here is what the ae.c API looks like:
~~~
/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize); /* create an aeEventLoop whose file-event and fired-event arrays hold setsize entries */
void aeDeleteEventLoop(aeEventLoop *eventLoop); /* delete the eventLoop and free the memory its events use */
void aeStop(aeEventLoop *eventLoop); /* set the eventLoop's stop flag to 1 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData); /* register a file event in the eventLoop */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); /* remove a file event */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //return the mask registered for fd: readable, writable, or both
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc); /* add a time event due at the current time plus the given milliseconds */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); //delete a time event by id (a linked-list removal)
int aeProcessEvents(aeEventLoop *eventLoop, int flags); /* process pending events of the types selected by flags */
int aeWait(int fd, int mask, long long milliseconds); /* wait up to milliseconds for fd to become readable/writable */
void aeMain(aeEventLoop *eventLoop); /* main loop of the ae library */
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); /* set the hook run at the start of every loop iteration, before waiting for events */
int aeGetSetSize(aeEventLoop *eventLoop); /* return the eventLoop's setsize */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); /* resize the eventLoop */
~~~
The rest is just adding, modifying, and deleting file and time events inside the eventLoop. Let's look at the most important, central function:
~~~
/* main loop of the ae library */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//keep looping until the stop flag is set
while (!eventLoop->stop) {
//run the beforesleep hook at the start of every iteration, before waiting for events
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//process every kind of event in the eventLoop
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
~~~
The idea is simple: a while loop keeps processing every kind of event in the eventLoop. Here is an excerpt from aeProcessEvents():
~~~
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
//the mask says the fd is readable: call the event's read handler
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
//don't call the same handler twice if it is registered for both reads and writes
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
~~~
In ae, time events are always created relative to the current time:
~~~
/* add a time event to the eventLoop, due at the current time plus the given milliseconds */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
//the new event becomes the head of the time-event list
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
//return the id of the newly created time event
return id;
}
~~~
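Two details of aeCreateTimeEvent are worth isolating. First, the due time is split into seconds and milliseconds, with a carry between them. Second, each new event is pushed onto the head of the list. This is a sketch under one stated assumption: the real aeAddMillisecondsToNow() reads the clock itself, while the hypothetical `addMilliseconds` below takes the current time as arguments so the result is deterministic. `timeEvent` and `createTimeEvent` are likewise simplified stand-ins.

~~~
#include <assert.h>
#include <stdlib.h>

typedef struct timeEvent {
    long long id;
    long when_sec, when_ms;        /* due time, split like aeTimeEvent */
    struct timeEvent *next;
} timeEvent;

/* Hypothetical pure variant of aeAddMillisecondsToNow: base time is passed in. */
static void addMilliseconds(long now_sec, long now_ms, long long ms,
                            long *sec, long *msec) {
    long when_sec = now_sec + (long)(ms / 1000);
    long when_ms = now_ms + (long)(ms % 1000);
    if (when_ms >= 1000) {         /* carry milliseconds into seconds */
        when_sec++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *msec = when_ms;
}

static long long nextId = 0;       /* like eventLoop->timeEventNextId */

/* Like aeCreateTimeEvent: allocate, stamp the due time, push at the head. */
static long long createTimeEvent(timeEvent **head, long now_sec, long now_ms,
                                 long long ms) {
    assert(head != NULL);
    timeEvent *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = nextId++;
    addMilliseconds(now_sec, now_ms, ms, &te->when_sec, &te->when_ms);
    te->next = *head;
    *head = te;
    return te->id;
}
~~~

Because insertion happens at the head, the list is not ordered by due time. Finding the nearest timer therefore means scanning the whole list.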
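Now, how does ae call into the event backend libraries? First, let me introduce epoll, poll, select, kqueue, and evport. All of them are I/O event notification (multiplexing) mechanisms.
The select model:
(1) Build the sets of descriptors (fd_set) you care about. For each descriptor you can watch for read, write, and exception events, so you usually create three fd_sets: one collects the descriptors watched for reads, one those watched for writes, and one those watched for exceptions.
(2) Check every fd in the fd_sets for pending events, and handle any events that occurred.
poll differs from select in that it takes a single array of pollfd structs, each naming one descriptor and the events wanted on it, instead of three separate fd_sets. It also has no FD_SETSIZE limit on descriptor values. However, the kernel still scans the whole array on every call.
epoll is an upgrade over poll. Descriptors are registered with the kernel once, and when events occur the kernel returns only the descriptors that are ready. The process no longer scans the whole list, which is much more efficient with many connections.
evport is less common. It is the Solaris event-port mechanism: specific events on an object are associated with an event port, and the process retrieves them from that port.
With the principles of these models in mind, let's see how ae.c uses them in Redis: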
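To make the readiness idea concrete, here is a minimal sketch with poll(): one pollfd per descriptor, with the kernel filling in `revents`. It uses a pipe so it runs without a network. `waitReadable` is a hypothetical helper, though ae's own aeWait() is also built on poll().

~~~
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: 1 if fd becomes readable within timeout_ms,
 * 0 on timeout, -1 on error. A timeout of -1 would block indefinitely. */
static int waitReadable(int fd, int timeout_ms) {
    assert(fd >= 0);
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    int n = poll(&pfd, 1, timeout_ms);   /* one entry, checked by the kernel */
    if (n <= 0) return n;
    return (pfd.revents & POLLIN) ? 1 : 0;
}
~~~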
~~~
//backend-specific state (epoll, select, etc.)
void *apidata; /* This is used for polling API specific data */
~~~
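It is this field. Each of the four backends lives in its own file (ae_epoll.c, ae_select.c, ae_kqueue.c, and ae_evport.c), but their APIs share the same structure. Taking epoll as an example, each backend first defines a state struct of its own: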
~~~
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
~~~
Each backend also implements the same set of template functions:
~~~
static int aeApiCreate(aeEventLoop *eventLoop);
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
static void aeApiFree(aeEventLoop *eventLoop);
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
static char *aeApiName(void);
~~~
When the backend is created, its state is stored in the eventLoop's apidata field:
~~~
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
//finally, store the state in eventLoop->apidata
eventLoop->apidata = state;
return 0;
~~~
The poll function, which fetches the ready events, is where the backends differ:
~~~
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
.....
~~~
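The `tvp` expression in the epoll excerpt maps ae's timeval onto epoll_wait's millisecond argument. A NULL timeval means "no timeout", which becomes -1 ("block indefinitely") for epoll_wait. Here is the expression extracted into a hypothetical helper for illustration:

~~~
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper mirroring the expression passed to epoll_wait. */
static int timevalToEpollMs(const struct timeval *tvp) {
    assert(tvp == NULL || tvp->tv_usec < 1000000);
    return tvp ? (int)(tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1;
}
~~~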
In the select backend, by contrast, the poll function looks like this:
~~~
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
......
~~~
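The memcpy calls are needed because select() overwrites the fd_sets it is given with the subset that is ready. For that reason ae_select.c keeps its registered sets (`rfds`, `wfds`) untouched and hands select() scratch copies (`_rfds`, `_wfds`). Here is a minimal sketch of that pattern for reads only, using pipes. `selectState`, `selectInit`, `watchRead`, and `pollReadable` are hypothetical names.

~~~
#include <assert.h>
#include <string.h>
#include <sys/select.h>
#include <unistd.h>

typedef struct {
    fd_set rfds;    /* registered interest; never passed to select() */
    fd_set _rfds;   /* scratch copy that select() overwrites */
    int maxfd;
} selectState;

static void selectInit(selectState *st) {
    FD_ZERO(&st->rfds);
    FD_ZERO(&st->_rfds);
    st->maxfd = -1;
}

static void watchRead(selectState *st, int fd) {
    assert(fd >= 0 && fd < FD_SETSIZE);   /* select's hard limit */
    FD_SET(fd, &st->rfds);
    if (fd > st->maxfd) st->maxfd = fd;
}

/* Returns how many watched fds are readable; results land in st->_rfds. */
static int pollReadable(selectState *st, struct timeval *tvp) {
    memcpy(&st->_rfds, &st->rfds, sizeof(fd_set));
    return select(st->maxfd + 1, &st->_rfds, NULL, NULL, tvp);
}
~~~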
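In the end, every backend works by translating between its own state and the eventLoop. It takes the registrations held in the eventLoop and in its state, processes them internally, and produces the final list of fired events. That is all there is to calling them.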
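(19)— replication: implementing master/slave data replication
Last updated: 2022-04-01 20:20:54
The word "replication" means copying. replication.c is the last file I analyzed under the Data directory, which says something about its importance, and with 1800+ lines of code it is genuinely hard going. I can only describe my overall impression after reading it. Drawing a proper diagram of how all these APIs relate to each other is beyond me for now. Master/slave replication is one of the best ways to separate reads from writes, and it is very common. Once traffic grows to the point where a single server cannot handle tens of millions of page views, a master/slave database setup is one of the first options an architect considers. In this article I call the Redis master and slaves the "master client" and "slave clients", because in the code the connection to the master is itself represented as a client, with its own selected db, and replication runs through these client connections. In other words, there is one master and multiple slaves, and each slave replicates from the master. There are many APIs, so I grouped them a little: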
~~~
/* ---------------------------------- MASTER -------------------------------- */
void createReplicationBacklog(void); /* create the backlog buffer */
void resizeReplicationBacklog(long long newsize); /* resize the replication backlog when its configured size changes */
void freeReplicationBacklog(void); /* free the backlog */
void feedReplicationBacklog(void *ptr, size_t len); /* append data to the backlog; increases master_repl_offset */
void feedReplicationBacklogWithObject(robj *o); /* append to the backlog, taking a Redis string object as the argument */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); /* propagate a command from the master to its slaves */
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc); /* send the command to MONITOR clients */
long long addReplyReplicationBacklog(redisClient *c, long long offset); /* send a slave the backlog contents starting at offset */
int masterTryPartialResynchronization(redisClient *c); /* master side: try a partial resynchronization (PSYNC) */
void syncCommand(redisClient *c); /* SYNC/PSYNC command handler */
void replconfCommand(redisClient *c); /* lets a slave set parameters for the replication process */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask); /* send the bulk RDB payload to a slave */
void updateSlavesWaitingBgsave(int bgsaveerr); /* called when a background save finishes; updates the slaves waiting for it */
/* ----------------------------------- SLAVE -------------------------------- */
void replicationAbortSyncTransfer(void); /* abort the ongoing sync transfer from the master */
void replicationSendNewlineToMaster(void); /* send an empty line to the master so it does not time out this slave */
void replicationEmptyDbCallback(void *privdata); /* callback used while the old data is flushed before the new data is loaded */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask); /* read the bulk payload of a SYNC */
char *sendSynchronousCommand(int fd, ...); /* send a command (e.g. AUTH, REPLCONF) to the master synchronously and return its reply */
int slaveTryPartialResynchronization(int fd); /* slave side: try a partial resynchronization */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask); /* handshake with the master: socket check, PING, AUTH, port announcement, then sync */
int connectWithMaster(void); /* connect to the master */
void undoConnectWithMaster(void); /* undo a connection attempt to the master */
int cancelReplicationHandshake(void); /* abort a non-blocking replication attempt that is in progress */
void replicationSetMaster(char *ip, int port); /* set the master's ip address and port */
void replicationUnsetMaster(void);
void slaveofCommand(redisClient *c);
void roleCommand(redisClient *c);
void replicationSendAck(void); /* send REPLCONF ACK to the master, reporting the processed replication offset */
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
void replicationCacheMaster(redisClient *c); /* cache the disconnected master client */
void replicationDiscardCachedMaster(void); /* free the cached master once a partial resync is no longer possible */
void replicationResurrectCachedMaster(int newfd); /* turn the cached master back into the current master */
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
void refreshGoodSlavesCount(void); /* recount the slaves whose lag is acceptable */
void replicationScriptCacheInit(void);
void replicationScriptCacheFlush(void);
void replicationScriptCacheAdd(sds sha1);
int replicationScriptCacheExists(sds sha1);
void replicationCron(void);
~~~
Here is the standard path by which a slave synchronizes with the master:
~~~
/* handshake with the master: socket check, PING, AUTH, port announcement, then sync */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
/* If this event fired after the user turned the instance into a master
* with SLAVEOF NO ONE we must just return ASAP. */
if (server.repl_state == REDIS_REPL_NONE) {
close(fd);
return;
}
/* Check for errors in the socket. */
/* did the socket connect without error? */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}
/* If we were connecting, it's time to send a non blocking PING, we want to
* make sure the master is able to reply before going into the actual
* replication process where we have long timeouts in the order of
* seconds (in the meantime the slave would block). */
/* connection test: the slave sends PING to the master and waits for a reply within the timeout */
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REDIS_REPL_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
//send PING
syncWrite(fd,"PING\r\n",6,100);
return;
}
/* Receive the PONG command. */
//the reply to PING has arrived
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
/* Delete the readable event, we no longer need it now that there is
* the PING reply to read. */
aeDeleteFileEvent(server.el,fd,AE_READABLE);
/* Read the reply with explicit timeout. */
buf[0] = '\0';
if (syncReadLine(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == -1)
{
redisLog(REDIS_WARNING,
"I/O error reading PING reply from master: %s",
strerror(errno));
goto error;
}
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (buf[0] != '+' &&
strncmp(buf,"-NOAUTH",7) != 0 &&
strncmp(buf,"-ERR operation not permitted",28) != 0)
{
redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
goto error;
} else {
redisLog(REDIS_NOTICE,
"Master replied to PING, replication can continue...");
}
}
/* AUTH with the master if required. */
//AUTH authentication
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
if (err[0] == '-') {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
}
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
/* announce the slave's listening port */
{
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
NULL);
sdsfree(port);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
sdsfree(err);
}
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
return;
}
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.repl_master_runid and repl_master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
goto error;
}
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
redisLog(REDIS_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
server.repl_state = REDIS_REPL_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REDIS_REPL_CONNECT;
return;
}
~~~
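The PONG check above accepts three kinds of reply. Pulled out into a standalone predicate, the rule looks like the sketch below. `pingReplyAcceptable` is a hypothetical name, not a Redis function.

~~~
#include <assert.h>
#include <string.h>

/* Hypothetical predicate with the same acceptance rule as syncWithMaster:
 * any "+" reply, an authentication error, or the older "not permitted" error. */
static int pingReplyAcceptable(const char *buf) {
    assert(buf != NULL);
    return buf[0] == '+' ||
           strncmp(buf, "-NOAUTH", 7) == 0 ||
           strncmp(buf, "-ERR operation not permitted", 28) == 0;
}
~~~

Accepting an authentication error here is presumably fine because the AUTH step comes right after the PING check, so a real password problem surfaces there instead.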
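replication also has the concept of a cached master. The master client's state is kept temporarily, typically when the master/slave connection drops unexpectedly, so that the next synchronization can resume quickly with a partial resync: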
~~~
/* cache the disconnected master client */
void replicationCacheMaster(redisClient *c) {
listNode *ln;
redisAssert(server.master != NULL && server.cached_master == NULL);
redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
/* Remove from the list of clients, we don't want this client to be
* listed by CLIENT LIST or processed in any way by batch operations. */
//first remove this client from the client list
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
//keep it as the cached master
server.cached_master = server.master;
/* Remove the event handlers and close the socket. We'll later reuse
* the socket of the new connection with the master during PSYNC. */
//remove the read and write handlers registered for this client
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
/* Set fd to -1 so that we can safely call freeClient(c) later. */
c->fd = -1;
/* Invalidate the Peer ID cache. */
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}
/* Caching the master happens instead of the actual freeClient() call,
* so make sure to adjust the replication state. This function will
* also set server.master to NULL. */
replicationHandleMasterDisconnection();
}
~~~
To bring the cached master back to life, this function is called:
~~~
/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master.
*
* This funciton is called when successfully setup a partial resynchronization
* so the stream of data that we'll receive will start from were this
* master left. */
/* turn the cached master back into the current master */
void replicationResurrectCachedMaster(int newfd) {
//promote cached_master to master
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
server.repl_state = REDIS_REPL_CONNECTED;
/* Re-add to the list of clients. */
//add it back to the client list
listAddNodeTail(server.clients,server.master);
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
/* We may also need to install the write handler as well if there is
* pending data in the write buffers. */
if (server.master->bufpos || listLength(server.master->reply)) {
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
sendReplyToClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}
~~~
If the cached master will definitely not be used again, it can be destroyed completely:
~~~
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
/* free the cached master once a partial resync on reconnection is no longer possible */
void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL) return;
redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
server.cached_master->flags &= ~REDIS_MASTER;
//free the client
freeClient(server.cached_master);
//clear the server's cached master pointer
server.cached_master = NULL;
}
~~~
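Stripped of logging, event handlers, and client-list bookkeeping, the three functions above amount to moving one pointer between `server.master` and `server.cached_master`. Here is a minimal sketch of just those moves. The types `client` and `replServer` are hypothetical stand-ins for redisClient and the server struct.

~~~
#include <assert.h>
#include <stdlib.h>

typedef struct client { int fd; } client;   /* stand-in for redisClient */

typedef struct {
    client *master;
    client *cached_master;
} replServer;                                /* stand-in for the server struct */

/* Like replicationCacheMaster: keep the client, drop its socket. */
static void cacheMaster(replServer *s) {
    assert(s->master != NULL && s->cached_master == NULL);
    s->cached_master = s->master;
    s->cached_master->fd = -1;  /* socket closed; freeing later is safe */
    s->master = NULL;           /* Redis does this in replicationHandleMasterDisconnection() */
}

/* Like replicationResurrectCachedMaster: reuse the client on a new socket. */
static void resurrectCachedMaster(replServer *s, int newfd) {
    assert(s->cached_master != NULL);
    s->master = s->cached_master;
    s->cached_master = NULL;
    s->master->fd = newfd;
}

/* Like replicationDiscardCachedMaster: no partial resync possible any more. */
static void discardCachedMaster(replServer *s) {
    if (s->cached_master == NULL) return;
    free(s->cached_master);
    s->cached_master = NULL;
}
~~~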
All of this hinges on the server.cached_master field. When a slave attaches to a master, it first records the master's ip address and port:
~~~
/* Set replication to the specified master address and port. */
/* set the master's ip address and port */
void replicationSetMaster(char *ip, int port) {
sdsfree(server.masterhost);
server.masterhost = sdsdup(ip);
server.masterport = port;
//then drop the current master, disconnect our slaves, and abort any replication in progress
if (server.master) freeClient(server.master);
disconnectSlaves(); /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
}
~~~
Master/slave replication involves many more details and steps. This was only a light analysis, and I hope to study it in more depth later.
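(18)— db.c: in-memory database operations
Last updated: 2022-04-01 20:20:51
Redis is an in-memory database, somewhat like memcached. All operations work on data held in memory, and persistence to disk happens separately, through RDB snapshots or the AOF log. Today I mainly studied how Redis operates on its in-memory database. Compared with ordinary data operations, there is nothing especially unusual here. These are the APIs, grouped the way I see them: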
~~~
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
robj *lookupKey(redisDb *db, robj *key); /* look up the value of key in db */
robj *lookupKeyRead(redisDb *db, robj *key); /* like lookupKey, but checks expiry first and updates the hit/miss counters */
robj *lookupKeyWrite(redisDb *db, robj *key); /* like lookupKeyRead, but without the hit/miss statistics */
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply); /* read lookup that sends reply to the client if the key is missing */
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply); /* write lookup that sends reply to the client if the key is missing */
void dbAdd(redisDb *db, robj *key, robj *val); /* add key to the db; the key must not already exist (Redis asserts on this) */
void dbOverwrite(redisDb *db, robj *key, robj *val); /* overwrite the value of an existing key; the key must already exist */
void setKey(redisDb *db, robj *key, robj *val); /* high-level set: add the key if missing, overwrite it otherwise */
int dbExists(redisDb *db, robj *key); /* does key exist in db */
robj *dbRandomKey(redisDb *db); /* return a random key that has not expired */
int dbDelete(redisDb *db, robj *key); /* delete key from db */
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); /* give key its own unshared copy of the value so it can be modified */
long long emptyDb(void(callback)(void*)); /* empty every database on the server; a callback is passed as a parameter */
int selectDb(redisClient *c, int id); /* switch the client to db number id */
void signalModifiedKey(redisDb *db, robj *key); /* called whenever key is modified; via touchWatchedKey() it invalidates the transactions of clients WATCHing the key */
void signalFlushedDb(int dbid); /* touch every watched key in db dbid */
void flushdbCommand(redisClient *c); /* FLUSHDB: empty the client's current db */
void flushallCommand(redisClient *c); /* FLUSHALL: empty every db on the server */
void delCommand(redisClient *c); /* DEL: delete the keys given as arguments */
void existsCommand(redisClient *c); /* EXISTS: does a key exist */
void selectCommand(redisClient *c); /* SELECT: the client chooses a db */
void randomkeyCommand(redisClient *c); /* RANDOMKEY: return a random key */
void keysCommand(redisClient *c); /* KEYS: reply with the keys matching a pattern */
void scanCallback(void *privdata, const dictEntry *de); /* SCAN callback that collects the scanned keys (and values) */
int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor); /* parse the SCAN cursor, replying with an error if it is invalid */
void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor); /* shared SCAN implementation; its final steps are 3: filter elements, 4: reply to the client */
void scanCommand(redisClient *c); /* SCAN */
void dbsizeCommand(redisClient *c); /* DBSIZE: number of keys in the client's db */
void lastsaveCommand(redisClient *c); /* LASTSAVE: time of the last successful save */
void typeCommand(redisClient *c); /* TYPE: type of the value stored at key */
void shutdownCommand(redisClient *c); /* SHUTDOWN: the server performs a final save (if configured) before exiting */
void renameGenericCommand(redisClient *c, int nx); /* shared implementation of key renaming */
void renameCommand(redisClient *c); /* RENAME: may overwrite the destination key */
void renamenxCommand(redisClient *c); /* RENAMENX: rename only if the destination does not exist */
void moveCommand(redisClient *c); /* MOVE: move a key from the source db to the target db */
int removeExpire(redisDb *db, robj *key); /* remove the expire from key, making it persistent */
void setExpire(redisDb *db, robj *key, long long when); /* set the key's expire time by adding it to the db->expires dict (the key stays in the main dict) */
long long getExpire(redisDb *db, robj *key); /* get the key's expire time */
void propagateExpire(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key); /* check whether key has expired, deleting it if so. Two checks: 1. if the key has no entry in expires, it never expires;
2. if it does, it has expired only when the current time is past its expire time */
void expireGenericCommand(redisClient *c, long long basetime, int unit);
void expireCommand(redisClient *c);
void expireatCommand(redisClient *c);
void pexpireCommand(redisClient *c);
void pexpireatCommand(redisClient *c);
void ttlGenericCommand(redisClient *c, int output_ms); /* reply with the key's TTL (time to live); the commands below differ only in time unit, seconds by default */
void ttlCommand(redisClient *c);
void pttlCommand(redisClient *c);
void persistCommand(redisClient *c); /* PERSIST: remove the key's expire */
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys);
int *getKeysFromCommand(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
void getKeysFreeResult(int *result);
int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
~~~
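The second half of these APIs consists of wrappers that implement commands exposed to clients. Among the first half, the most representative are the read and write lookups: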
~~~
/* look up the value of key in db */
robj *lookupKey(redisDb *db, robj *key) {
//look it up in the db's dict
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
val->lru = server.lruclock;
return val;
} else {
return NULL;
}
}
~~~
In practice callers do not use it directly. They go through wrappers that add checks, which handle expired keys and count keyspace hits and misses:
~~~
/* look up the value of key; unlike lookupKey, it checks expiry first */
robj *lookupKeyRead(redisDb *db, robj *key) {
robj *val;
expireIfNeeded(db,key);
val = lookupKey(db,key);
if (val == NULL)
//key not found: count a miss
server.stat_keyspace_misses++;
else
//key found: count a hit
server.stat_keyspace_hits++;
return val;
}
~~~
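Here is a toy version of the split between lookupKey() and lookupKeyRead(): the raw lookup has no side effects, and the read-path wrapper counts hits and misses. The keyspace is a hypothetical fixed-size array rather than Redis's dict, the names (`miniDb`, `miniAdd`, `miniLookup`, `miniLookupRead`) are illustrative, and the expiry check is left out.

~~~
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MINI_MAX_KEYS 16

/* Hypothetical toy keyspace: parallel arrays instead of a dict. */
typedef struct {
    const char *keys[MINI_MAX_KEYS];
    const char *vals[MINI_MAX_KEYS];
    int used;
    long long hits, misses;    /* like server.stat_keyspace_hits/misses */
} miniDb;

/* Like dbAdd: the caller must not add a key that already exists. */
static void miniAdd(miniDb *db, const char *key, const char *val) {
    assert(db->used < MINI_MAX_KEYS);
    db->keys[db->used] = key;
    db->vals[db->used] = val;
    db->used++;
}

/* Raw lookup, like lookupKey(): no statistics. */
static const char *miniLookup(const miniDb *db, const char *key) {
    for (int i = 0; i < db->used; i++)
        if (strcmp(db->keys[i], key) == 0) return db->vals[i];
    return NULL;
}

/* Read-path lookup, like lookupKeyRead() minus the expiry check. */
static const char *miniLookupRead(miniDb *db, const char *key) {
    const char *val = miniLookup(db, key);
    if (val == NULL) db->misses++;
    else db->hits++;
    return val;
}
~~~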
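These counters are what INFO reports as keyspace_hits and keyspace_misses. Next, here is an operation that modifies the in-memory database: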
~~~
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
* 1) The ref count of the value object is incremented.
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent). */
/* high-level set: add the key if missing, overwrite it otherwise */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
//increment the reference count of the value object
incrRefCount(val);
removeExpire(db,key);
signalModifiedKey(db,key);
}
~~~
Notice that every operation that modifies the database calls signalModifiedKey(db,key), which signals that the value of key has changed. What does it actually do? Its implementation is in db.c:
~~~
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
/* called whenever a key is modified; touchWatchedKey() flags every client WATCHing the key */
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
}
~~~
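The effect of touchWatchedKey() can be sketched with a flat table of (key, client) pairs. Redis actually keeps a dict that maps each watched key to the list of clients watching it, so the layout and names here (`watchTable`, `watcher`, `watchKey`, `touchKey`, `DIRTY_CAS`) are hypothetical stand-ins. Touching a key flags every client watching it.

~~~
#include <assert.h>
#include <string.h>

#define MAX_WATCH 8
#define DIRTY_CAS 1                /* stand-in for REDIS_DIRTY_CAS */

typedef struct { int flags; } watcher;

/* Hypothetical flat table: entry i means who[i] is WATCHing key[i]. */
typedef struct {
    const char *key[MAX_WATCH];
    watcher *who[MAX_WATCH];
    int n;
} watchTable;

static void watchKey(watchTable *t, watcher *c, const char *key) {
    assert(t->n < MAX_WATCH);
    t->key[t->n] = key;
    t->who[t->n] = c;
    t->n++;
}

/* Like touchWatchedKey: flag every client watching key, so its EXEC aborts. */
static void touchKey(watchTable *t, const char *key) {
    for (int i = 0; i < t->n; i++)
        if (strcmp(t->key[i], key) == 0) t->who[i]->flags |= DIRTY_CAS;
}
~~~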
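It simply calls touchWatchedKey(). That function walks the list of clients WATCHing this key and marks each of them dirty, so that their next EXEC fails. This is optimistic locking: among clients racing on the same watched key, a transaction only goes through if nobody modified the key before its EXEC. FLUSHALL, shown next, empties every database and then, if save points are configured, rewrites the RDB file immediately: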
~~~
/* FLUSHALL: empty every db on the server */
void flushallCommand(redisClient *c) {
signalFlushedDb(-1);
server.dirty += emptyDb(NULL);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) {
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
//save the (now empty) dataset to the RDB file right away
rdbSave(server.rdb_filename);
server.dirty = saved_dirty;
}
server.dirty++;
}
~~~
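rdbSave is the key step here. db.c also introduces the concept of expiry: a key can be given a time to live. Each db keeps a separate dict, db->expires, which maps every key that has an expire set to its expire time. Expired keys are removed both lazily, when they are accessed, and by a periodic active sweep. This is why the lookup functions above check for expiry before reading a value: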
~~~
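/* Check whether key has expired, deleting it if so. Two checks: 1. if the key has no entry in expires, it never expires;
2. if it does, it has expired only when the current time is past its expire time */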
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
/* If we are in the context of a Lua script, we claim that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;
/* Return when this key has not expired */
if (now <= when) return 0;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",key,db->id);
return dbDelete(db,key);
}
~~~
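The lazy half of expiration fits in a few lines. This minimal toy uses a fixed-size array in place of Redis's dict. exp_db stores each key next to an absolute expire time in ms, with -1 meaning none; this stands in for db->dict plus db->expires. Before answering, exp_exists() applies the same rule as expireIfNeeded(): keep the key if it has no expire or is not yet due, otherwise delete it. The loading, Lua and slave branches, propagation, and the periodic (active) expiry are left out. All names are hypothetical.

~~~c
#include <assert.h>
#include <string.h>

#define EXP_SLOTS 16

/* Hypothetical toy db: key names plus a parallel table of absolute expire
 * times in ms, standing in for db->dict and db->expires (-1 = no expire).
 * Keys are stored by pointer, so they must outlive the db (e.g. literals). */
typedef struct {
    const char *keys[EXP_SLOTS];
    long long expire_at_ms[EXP_SLOTS];
} exp_db;

/* Returns the slot of key, or -1 if it is not present. */
static int exp_find(const exp_db *db, const char *key) {
    for (int i = 0; i < EXP_SLOTS; i++)
        if (db->keys[i] != NULL && strcmp(db->keys[i], key) == 0) return i;
    return -1;
}

/* Sets key with an absolute expire time (or -1). Returns 0, or -1 if full. */
static int exp_set(exp_db *db, const char *key, long long expire_at_ms) {
    int i = exp_find(db, key);
    for (int j = 0; i < 0 && j < EXP_SLOTS; j++)
        if (db->keys[j] == NULL) i = j;   /* first free slot */
    if (i < 0) return -1;
    db->keys[i] = key;
    db->expire_at_ms[i] = expire_at_ms;
    return 0;
}

/* Same decision as expireIfNeeded(): no expire or not yet due -> keep;
 * otherwise delete the key. Returns 1 if the key was deleted. */
static int exp_expire_if_needed(exp_db *db, int i, long long now_ms) {
    long long when = db->expire_at_ms[i];
    if (when < 0) return 0;          /* no expire for this key */
    if (now_ms <= when) return 0;    /* not expired yet */
    db->keys[i] = NULL;              /* delete on access */
    db->expire_at_ms[i] = -1;
    return 1;
}

/* Read-path lookup: expire lazily, then report whether the key exists. */
static int exp_exists(exp_db *db, const char *key, long long now_ms) {
    int i = exp_find(db, key);
    if (i < 0) return 0;
    return !exp_expire_if_needed(db, i, now_ms);
}
~~~

As in expireIfNeeded(), a key whose expire time equals the current time is still valid. It is deleted on the first access after that moment.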
Every key with an expire time has a TTL ("Time To Live"), the time it has left to live:
~~~
/* Reply with the key's TTL (in ms if output_ms is set, otherwise in seconds) */
void ttlGenericCommand(redisClient *c, int output_ms) {
long long expire, ttl = -1;
/* If the key does not exist at all, return -2 */
if (lookupKeyRead(c->db,c->argv[1]) == NULL) {
addReplyLongLong(c,-2);
return;
}
/* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */
expire = getExpire(c->db,c->argv[1]);
if (expire != -1) {
// The key has an expire time: the TTL is how far that time is from now, in ms
ttl = expire-mstime();
if (ttl < 0) ttl = 0;
}
if (ttl == -1) {
addReplyLongLong(c,-1);
} else {
addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000));
}
}
~~~
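TTL (and PTTL, which calls this function with output_ms set to 1) reports how long a key has left to live. It is computed from the same expire time that expireIfNeeded() compares against when deciding whether a key has expired.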
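The reply rules of ttlGenericCommand() fit in a pure function, which makes the -2 / -1 conventions and the rounding easy to check. This sketch uses a hypothetical helper, toy_ttl_reply(), that takes the current time as a parameter instead of calling mstime().

~~~c
#include <assert.h>

/* Hypothetical helper mirroring ttlGenericCommand()'s reply rules:
 *   -2  the key does not exist
 *   -1  the key exists but has no expire (expire_at_ms == -1)
 *   otherwise the remaining time, clamped at 0, in ms or in seconds. */
static long long toy_ttl_reply(int key_exists, long long expire_at_ms,
                               long long now_ms, int output_ms) {
    if (!key_exists) return -2;
    if (expire_at_ms == -1) return -1;
    long long ttl = expire_at_ms - now_ms;
    if (ttl < 0) ttl = 0;
    /* Adding 500 before dividing rounds to the nearest second. */
    return output_ms ? ttl : (ttl + 500) / 1000;
}
~~~

For example, with 10499 ms left TTL reports 10 seconds, while with 10500 ms left it reports 11.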
(17)— MULTI Transactions
Last updated: 2022-04-01 20:20:49
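Redis is a non-relational database, yet it supports transactions somewhat like an RDBMS, which surprised me a little. Unlike an RDBMS, though, it does not roll back. Redis has a dedicated file, multi.c, for transaction handling, so we can see how transactions are implemented in the Redis code. First, here are the APIs in multi.c: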
~~~
/* ================================ MULTI/EXEC ============================== */
void initClientMultiState(redisClient *c); /* Initialize the client's MULTI state */
void freeClientMultiState(redisClient *c); /* Free all MULTI/EXEC-related resources of the client */
void queueMultiCommand(redisClient *c); /* Append a new command to the client's MULTI command queue */
void discardTransaction(redisClient *c); /* Discard the transaction */
void flagTransaction(redisClient *c); /* Mark the transaction as DIRTY_EXEC so that EXEC will fail; called while commands are being queued */
void multiCommand(redisClient *c); /* MULTI command */
void discardCommand(redisClient *c); /* DISCARD command */
void execCommandPropagateMulti(redisClient *c); /* Propagate MULTI to all slaves and the AOF file */
void execCommand(redisClient *c); /* EXEC command: run the queued commands */
void watchForKey(redisClient *c, robj *key); /* Add a key to the keys the client is watching */
void unwatchAllKeys(redisClient *c); /* Stop watching all keys of the client */
void touchWatchedKey(redisDb *db, robj *key); /* "Touch" a key: clients WATCHing it will fail their next EXEC */
void touchWatchedKeysOnFlush(int dbid); /* Touch every watched key in the db being flushed */
void watchCommand(redisClient *c); /* WATCH command; keys come from the client's arguments */
void unwatchCommand(redisClient *c); /* UNWATCH command */
~~~
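There are not many functions, but one word shows up again and again: "key", and keys really do play the central role here. The multi code mainly covers queuing commands, executing them, and discarding them. Discarding a transaction, for example, looks like this: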
~~~
/* Discard the transaction */
void discardTransaction(redisClient *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
// The client stops watching all of its keys
unwatchAllKeys(c);
}
~~~
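discardTransaction() clears only the transaction bits with `c->flags &= ~(...)`, leaving any other client flags alone. The sketch below models a client's MULTI state with a hypothetical tx_client. MULTI turns on queuing, commands are appended instead of executed, and discarding empties the queue and clears the three bits. Several details are assumptions of this sketch: the flag values, the fixed-size queue, and flagging the transaction when the queue is full. Redis itself flags a transaction when a command is rejected while queuing. Unwatching is left out.

~~~c
#include <assert.h>

/* Illustrative bit values modelled on REDIS_MULTI, REDIS_DIRTY_CAS and
 * REDIS_DIRTY_EXEC; they are not the values Redis uses. */
#define TX_MULTI      (1 << 0)
#define TX_DIRTY_CAS  (1 << 1)
#define TX_DIRTY_EXEC (1 << 2)
#define TX_OTHER_FLAG (1 << 3)   /* stands for any unrelated client flag */
#define TX_MAX_QUEUED 8

/* Hypothetical client: flags plus a queue, like c->flags and c->mstate. */
typedef struct {
    int flags;
    int count;
    const char *queued[TX_MAX_QUEUED];
} tx_client;

static void tx_multi(tx_client *c) { c->flags |= TX_MULTI; }

/* Inside MULTI, commands are queued instead of executed. In this sketch a
 * full queue flags the transaction so that EXEC will fail. */
static void tx_queue(tx_client *c, const char *cmd) {
    if (!(c->flags & TX_MULTI)) return;
    if (c->count == TX_MAX_QUEUED) {
        c->flags |= TX_DIRTY_EXEC;
        return;
    }
    c->queued[c->count++] = cmd;
}

/* Same shape as discardTransaction(): reset the queue, then clear only
 * the transaction bits; &= ~mask leaves every other flag untouched. */
static void tx_discard(tx_client *c) {
    c->count = 0;
    c->flags &= ~(TX_MULTI | TX_DIRTY_CAS | TX_DIRTY_EXEC);
}
~~~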
Note the call to unwatchAllKeys(). Next comes the core idea behind transactions:
~~~
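/* Transactions rely on two mappings. The first is key --> client list: every client
in the list is watching this key, so when the key's value changes those clients can be
marked DIRTY. Each client also keeps a list of keys, namely all the keys it is watching,
so when the client is freed (or stops watching), the client lists stored under those keys
can be updated. */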
~~~
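The two mappings can be modelled with plain arrays. This minimal sketch makes several simplifications: hypothetical types watch_db and watch_client, a single db, and fixed-size arrays instead of dict and adlist. watch_key() updates both sides. unwatch_all() walks the client's own list to find each key, then removes the client from that key's watcher list. That is the bookkeeping unwatchAllKeys() has to do. For brevity, a key entry whose watcher list becomes empty is kept rather than deleted.

~~~c
#include <assert.h>
#include <string.h>

#define MAX_KEYS    8
#define MAX_CLIENTS 8

/* Hypothetical toy model of the two mappings (single db, fixed arrays):
 *   db side:     key -> ids of clients WATCHing it (like db->watched_keys)
 *   client side: client -> keys it WATCHes          (like c->watched_keys) */
typedef struct {
    const char *key;
    int watchers[MAX_CLIENTS];
    int nwatchers;
} key_entry;

typedef struct {
    key_entry keys[MAX_KEYS];
    int nkeys;
} watch_db;

typedef struct {
    int id;
    const char *watching[MAX_KEYS];
    int nwatching;
} watch_client;

static key_entry *find_key(watch_db *db, const char *key) {
    for (int i = 0; i < db->nkeys; i++)
        if (strcmp(db->keys[i].key, key) == 0) return &db->keys[i];
    return NULL;
}

/* Record the watch on both sides; watching a key twice is a no-op. */
static void watch_key(watch_db *db, watch_client *c, const char *key) {
    for (int i = 0; i < c->nwatching; i++)
        if (strcmp(c->watching[i], key) == 0) return;
    key_entry *e = find_key(db, key);
    if (e == NULL) {
        assert(db->nkeys < MAX_KEYS);
        e = &db->keys[db->nkeys++];
        e->key = key;
        e->nwatchers = 0;
    }
    assert(e->nwatchers < MAX_CLIENTS && c->nwatching < MAX_KEYS);
    e->watchers[e->nwatchers++] = c->id;   /* key -> client */
    c->watching[c->nwatching++] = key;     /* client -> key */
}

/* Use the client's own list to find every key it watches and remove the
 * client from that key's watcher list (swap with the last element). */
static void unwatch_all(watch_db *db, watch_client *c) {
    for (int i = 0; i < c->nwatching; i++) {
        key_entry *e = find_key(db, c->watching[i]);
        assert(e != NULL);
        for (int j = 0; j < e->nwatchers; j++) {
            if (e->watchers[j] == c->id) {
                e->watchers[j] = e->watchers[--e->nwatchers];
                break;
            }
        }
    }
    c->nwatching = 0;
}
~~~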
~~~
/* "Touch" a key: if the key is being watched, the watching clients' next EXEC will fail */
~~~
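In other words, once a key that a client is watching is modified, that client's next EXEC fails. This is how transactions are kept consistent: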
~~~
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
// Walk the clients watching this key and set their REDIS_DIRTY_CAS flag
c->flags |= REDIS_DIRTY_CAS;
}
}
~~~
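Touching and the EXEC check together give the check-and-set behaviour. This minimal sketch assumes hypothetical cas_client and watcher_list types and illustrative flag values. touch() only sets a bit on each watcher. exec_transaction() refuses to run when either dirty bit is set, and clears the flags as discardTransaction() would. In Redis the aborted EXEC gets a null reply (or an error for DIRTY_EXEC), and the client decides whether to retry.

~~~c
#include <assert.h>

/* Illustrative flag values modelled on REDIS_DIRTY_CAS / REDIS_DIRTY_EXEC. */
#define CAS_DIRTY_CAS  (1 << 1)
#define CAS_DIRTY_EXEC (1 << 2)

/* Hypothetical client: its flags and how many transactions it ran. */
typedef struct {
    int flags;
    int executed;
} cas_client;

/* Hypothetical: the clients WATCHing one key (one db->watched_keys entry). */
typedef struct {
    cas_client *watchers[4];
    int n;
} watcher_list;

/* Like touchWatchedKey(): no locking or spinning, just one bit per watcher. */
static void touch(watcher_list *wl) {
    for (int i = 0; i < wl->n; i++)
        wl->watchers[i]->flags |= CAS_DIRTY_CAS;
}

/* Like the check at the top of execCommand(): a dirty transaction is
 * aborted and its flags cleared; otherwise the queued commands would run.
 * Returns 1 if the transaction ran, 0 if it was aborted. */
static int exec_transaction(cas_client *c) {
    if (c->flags & (CAS_DIRTY_CAS | CAS_DIRTY_EXEC)) {
        c->flags &= ~(CAS_DIRTY_CAS | CAS_DIRTY_EXEC);
        return 0;
    }
    c->executed++;
    return 1;
}
~~~

A client that was not watching the touched key is unaffected. After the abort, a retry goes through; in Redis it would start again with WATCH and MULTI.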
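A key is touched whenever it is modified; signalModifiedKey() calls touchWatchedKey(). At that point, every client watching the key has its flags set to REDIS_DIRTY_CAS. CAS here stands for check-and-set: WATCH gives transactions optimistic locking rather than a lock. Touching a key only walks that key's client list and sets a flag, so the server never spins. Under heavy contention, the cost shows up as failed EXECs that clients have to retry. To sum up, each key maintains a list of the clients watching it, and each client keeps a list of the keys it watches. The struct stored in that per-client list is simple: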
~~~
/* The watchedKey struct */
typedef struct watchedKey {
robj *key;     /* the watched key */
redisDb *db;   /* the database the key belongs to */
} watchedKey;
~~~
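A watchedKey records which database the key belongs to, since the same key name can exist in several databases. When a client stops watching, the db pointer tells Redis which db->watched_keys dictionary to remove the client from. That is why discardTransaction() above ends by calling unwatchAllKeys() to drop every key the client is watching.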
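A lookup over watched entries shows why the db pointer matters. This sketch uses hypothetical mini_db and watched_key types, with a C string and an int id standing in for robj and redisDb. An entry matches only if both the database and the key name match. This is in the spirit of the duplicate check watchForKey() performs before adding a key.

~~~c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-ins: mini_db for redisDb, a C string for robj. */
typedef struct { int id; } mini_db;
typedef struct {
    const char *key;
    mini_db *db;          /* which database the key lives in */
} watched_key;

/* An entry matches only if both the database and the key name match:
 * "user:1" in db 0 and "user:1" in db 1 are different keys. */
static int watched_key_matches(const watched_key *wk, const mini_db *db,
                               const char *key) {
    return wk->db == db && strcmp(wk->key, key) == 0;
}

/* Returns 1 if the client's list already holds this (db, key) pair. */
static int is_watching(const watched_key *list, int n, const mini_db *db,
                       const char *key) {
    for (int i = 0; i < n; i++)
        if (watched_key_matches(&list[i], db, key)) return 1;
    return 0;
}
~~~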