(二十七)— rio系统I/O的封装
最后更新于:2022-04-01 20:21:12
I/O操作对于每个系统来说都是必不可少的一部分。而且I/O操作的好坏,在一定程度上也会影响着系统的效率问题。今天我学习了一下在Redis中的I/O是怎么处理的,同样的,Redis在他自己的系统中,也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽:
~~~
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
/* 数据流的读方法 */
size_t (*read)(struct _rio *, void *buf, size_t len);
/* 数据流的写方法 */
size_t (*write)(struct _rio *, const void *buf, size_t len);
/* 获取当前的读写偏移量 */
off_t (*tell)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
/* 当读入新的数据块的时候,会更新当前的校验和 */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
/* 当前的校验和 */
uint64_t cksum;
/* number of bytes read or written */
/* 当前读取的或写入的字节大小 */
size_t processed_bytes;
/* maximum single read or write chunk size */
/* 最大的单次读写的大小 */
size_t max_processing_chunk;
/* Backend-specific vars. */
/* rio中I/O变量 */
union {
//buffer结构体
struct {
//buffer具体内容
sds ptr;
//偏移量
off_t pos;
} buffer;
//文件结构体
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
//同步的最小大小
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};
~~~
里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法,还有2个结构体变量,一个buffer结构体,一个file结构体,作者针对不同的I/O情况,做了不同的处理,当执行临时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时,则执行与rio.file之间的操作。下面看看rio统一定义的读写方法:
~~~
/* The following functions are our interface with the stream. They'll call the
* actual implementation of read / write / tell, and will update the checksum
* if needed. */
/* rio的写方法 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
//判断当前操作字节长度是否超过最大长度
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//写入新的数据时,更新校验和
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
//执行写方法
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
//操作字节数增加
r->processed_bytes += bytes_to_write;
}
return 1;
}
/* rio的读方法 */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
while (len) {
//判断当前操作字节长度是否超过最大长度
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//读数据方法
if (r->read(r,buf,bytes_to_read) == 0)
return 0;
//读数据时,更新校验和
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
}
return 1;
}
~~~
这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:
~~~
/* 根据上面描述的方法,定义了BufferRio */
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
/* 根据上面描述的方法,定义了FileRio */
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
~~~
里面分别定义了相对应的读写方法,比如buffer的Read方法和File的Read方法:
~~~
/* Returns 1 or 0 for success/failure. */
/* 读取rio中的buffer内容到传入的参数 */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
return 0; /* not enough buffer to return len bytes. */
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
r->io.buffer.pos += len;
return 1;
}
~~~
~~~
/* Returns 1 or 0 for success/failure. */
/* 读取rio中的fp文件内容 */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
return fread(buf,len,1,r->io.file.fp);
}
~~~
作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法:
~~~
/* rio写入不同类型数据方法,最终调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
~~~
举其中的一个方法实现:
~~~
/* Write multi bulk count in the format: "*\r\n". */
/* rio写入不同类型数据方法,调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
char cbuf[128];
int clen;
cbuf[0] = prefix;
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (rioWrite(r,cbuf,clen) == 0) return 0;
return clen;
}
~~~
调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。
~~~
/* Returns 1 or 0 for success/failure. */
/* 将buf写入rio中的file文件中 */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len;
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
//判读是否需要同步
fflush(r->io.file.fp);
aof_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0;
}
return retval;
}
~~~
';