您现在的位置是:网站首页> 编程资料编程资料

Redis源码与设计剖析之网络连接库_Redis_

2023-05-27 532人已围观

简介 Redis源码与设计剖析之网络连接库_Redis_

Redis 网络连接库分析

1. Redis网络连接库介绍

Redis网络连接库对应的文件是networking.c,这个文件主要负责:

  • 客户端的创建与释放.
  • 命令接收与命令回复.
  • Redis通信协议分析.
  • CLIENT 命令的实现.

2. 客户端的创建与释放

2.1 客户端的创建

Redis服务器是一个同时与多个客户端建立连接的程序. 当客户端连接上服务器时,服务器会建立一个server.h/client结构来保存客户端的状态信息. server.h/client结构如下所示:

typedef struct client { // client独一无二的ID uint64_t id; /* Client incremental unique ID. */ // client的套接字 int fd; /* Client socket. */ // 指向当前的数据库 redisDb *db; /* Pointer to currently SELECTed DB. */ // 保存指向数据库的ID int dictid; /* ID of the currently SELECTed DB. */ // client的名字 robj *name; /* As set by CLIENT SETNAME. */ // 输入缓冲区 sds querybuf; /* Buffer we use to accumulate client queries. */ // 输入缓存的峰值 size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ // client输入命令时,参数的数量 int argc; /* Num of arguments of current command. */ // client输入命令的参数列表 robj **argv; /* Arguments of current command. */ // 保存客户端执行命令的历史记录 struct redisCommand *cmd, *lastcmd; /* Last command executed. */ // 请求协议类型,内联或者多条命令 int reqtype; /* Request protocol type: PROTO_REQ_* */ // 参数列表中未读取命令参数的数量,读取一个,该值减1 int multibulklen; /* Number of multi bulk arguments left to read. */ // 命令内容的长度 long bulklen; /* Length of bulk argument in multi bulk request. */ // 回复缓存列表,用于发送大于固定回复缓冲区的回复 list *reply; /* List of reply objects to send to the client. */ // 回复缓存列表对象的总字节数 unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ // 已发送的字节数或对象的字节数 size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ // client创建所需时间 time_t ctime; /* Client creation time. */ // 最后一次和服务器交互的时间 time_t lastinteraction; /* Time of the last interaction, used for timeout */ // 客户端的输出缓冲区超过软性限制的时间,记录输出缓冲区第一次到达软性限制的时间 time_t obuf_soft_limit_reached_time; // client状态的标志 int flags; /* Client flags: CLIENT_* macros. */ // 认证标志,0表示未认证,1表示已认证 int authenticated; /* When requirepass is non-NULL. */ // 从节点的复制状态 int replstate; /* Replication state if this is a slave. */ // 在ack上设置从节点的写处理器,是否在slave向master发送ack, int repl_put_online_on_ack; /* Install slave write handler on ACK. */ // 保存主服务器传来的RDB文件的文件描述符 int repldbfd; /* Replication DB file descriptor. */ // 读取主服务器传来的RDB文件的偏移量 off_t repldboff; /* Replication DB file offset. */ // 主服务器传来的RDB文件的大小 off_t repldbsize; /* Replication DB file size. */ // 主服务器传来的RDB文件的大小,符合协议的字符串形式 sds replpreamble; /* Replication DB preamble. */ // replication复制的偏移量 long long reploff; /* Replication offset if this is our master. */ // 通过ack命令接收到的偏移量 long long repl_ack_off; /* Replication ack offset, if this is a slave. */ // 通过ack命令接收到的偏移量所用的时间 long long repl_ack_time;/* Replication ack time, if this is a slave. */ // FULLRESYNC回复给从节点的offset long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */ // 从节点的端口号 int slave_listening_port; /* As configured with: REPLCONF listening-port */ // 从节点IP地址 char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */ // 从节点的功能 int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ // 事物状态 multiState mstate; /* MULTI/EXEC state */ // 阻塞类型 int btype; /* Type of blocking op if CLIENT_BLOCKED. */ // 阻塞的状态 blockingState bpop; /* blocking state */ // 最近一个写全局的复制偏移量 long long woff; /* Last write global replication offset. */ // 监控列表 list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ // 订阅频道 dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ // 订阅的模式 list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ // 被缓存的ID sds peerid; /* Cached peer ID. */ /* Response buffer */ // 回复固定缓冲区的偏移量 int bufpos; // 回复固定缓冲区 char buf[PROTO_REPLY_CHUNK_BYTES]; } client; 

创建客户端的源码:

// 创建一个新的client client *createClient(int fd) { client *c = zmalloc(sizeof(client)); //分配空间 // 如果fd为-1,表示创建的是一个无网络连接的伪客户端,用于执行lua脚本的时候 // 如果fd不等于-1,表示创建一个有网络连接的客户端 if (fd != -1) { // 设置fd为非阻塞模式 anetNonBlock(NULL,fd); // 禁止使用 Nagle 算法,client向内核递交的每个数据包都会立即发送给server出去,TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); // 如果开启了tcpkeepalive,则设置 SO_KEEPALIVE if (server.tcpkeepalive) // 设置tcp连接的keep alive选项 anetKeepAlive(NULL,fd,server.tcpkeepalive); // 创建一个文件事件状态el,且监听读事件,开始接受命令的输入 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // 默认选0号数据库 selectDb(c,0); // 设置client的ID c->id = server.next_client_id++; // client的套接字 c->fd = fd; // client的名字 c->name = NULL; // 回复固定(静态)缓冲区的偏移量 c->bufpos = 0; // 输入缓存区 c->querybuf = sdsempty(); // 输入缓存区的峰值 c->querybuf_peak = 0; // 请求协议类型,内联或者多条命令,初始化为0 c->reqtype = 0; // 参数个数 c->argc = 0; // 参数列表 c->argv = NULL; // 当前执行的命令和最近一次执行的命令 c->cmd = c->lastcmd = NULL; // 查询缓冲区剩余未读取命令的数量 c->multibulklen = 0; // 读入参数的长度 c->bulklen = -1; // 已发的字节数 c->sentlen = 0; // client的状态 c->flags = 0; // 设置创建client的时间和最后一次互动的时间 c->ctime = c->lastinteraction = server.unixtime; // 认证状态 c->authenticated = 0; // replication复制的状态,初始为无 c->replstate = REPL_STATE_NONE; // 设置从节点的写处理器为ack,是否在slave向master发送ack c->repl_put_online_on_ack = 0; // replication复制的偏移量 c->reploff = 0; // 通过ack命令接收到的偏移量 c->repl_ack_off = 0; // 通过ack命令接收到的偏移量所用的时间 c->repl_ack_time = 0; // 从节点的端口号 c->slave_listening_port = 0; // 从节点IP地址 c->slave_ip[0] = '\0'; // 从节点的功能 c->slave_capa = SLAVE_CAPA_NONE; // 回复链表 c->reply = listCreate(); // 回复链表的字节数 c->reply_bytes = 0; // 回复缓冲区的内存大小软限制 c->obuf_soft_limit_reached_time = 0; // 回复链表的释放和复制方法 listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); // 阻塞类型 c->btype = BLOCKED_NONE; // 阻塞超过时间 c->bpop.timeout = 0; // 造成阻塞的键字典 c->bpop.keys = dictCreate(&setDictType,NULL); // 存储解除阻塞的键,用于保存PUSH入元素的键,也就是dstkey c->bpop.target = NULL; // 阻塞状态 c->bpop.numreplicas = 0; // 要达到的复制偏移量 c->bpop.reploffset = 0; // 全局的复制偏移量 c->woff = 0; // 监控的键 c->watched_keys = listCreate(); // 订阅频道 c->pubsub_channels = dictCreate(&setDictType,NULL); // 订阅模式 c->pubsub_patterns = listCreate(); // 被缓存的peerid,peerid就是 ip:port c->peerid = NULL; // 订阅发布模式的释放和比较方法 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); // 将真正的client放在服务器的客户端链表中 if (fd != -1) listAddNodeTail(server.clients,c); // 初始化client的事物状态 initClientMultiState(c); return c; } 

根据创建的文件描述符fd,可以创建用于不同场景下的client. 这个fd就是服务器接收客户端connect后所返回的文件描述符.

  • fd == -1,表示创建一个无网络连接的客户端。主要用于执行 lua 脚本时.
  • fd != -1,表示接收到一个正常的客户端连接,则会创建一个有网络连接的客户端,也就是创建一个文件事件,来监听这个fd是否可读,当客户端发送数据,则事件被触发.

创建客户端的过程,会将server.h/client结构的所有成员初始化,接下里会介绍部分重点的成员.

int id:服务器对于每一个连接进来的都会创建一个ID,客户端的ID从1开始。每次重启服务器会刷新. int fd:当前客户端状态描述符。分为无网络连接的客户端和有网络连接的客户端. int flags:客户端状态的标志. robj *name:默认创建的客户端是没有名字的,可以通过CLIENT SETNAME命令设置名字. 后面会介绍该命令的实现. int reqtype:请求协议的类型. 因为Redis服务器支持Telnet的连接,因此Telnet命令请求协议类型是PROTO_REQ_INLINE,而redis-cli命令请求的协议类型是PROTO_REQ_MULTIBULK.

用于保存服务器接受客户端命令的成员:

sds querybuf:保存客户端发来命令请求的输入缓冲区. 以Redis通信协议的方式保存. size_t querybuf_peak:保存输入缓冲区的峰值. int argc:命令参数个数. robj *argv:命令参数列表.

用于保存服务器给客户端回复的成员:

char buf[16*1024]:保存执行完命令所得命令回复信息的静态缓冲区,它的大小是固定的,所以主要保存的是一些比较短的回复. 分配client结构空间时,就会分配一个16K的大小. int bufpos:记录静态缓冲区的偏移量,也就是buf数组已经使用的字节数. list *reply:保存命令回复的链表. 因为静态缓冲区大小固定,主要保存固定长度的命令回复,当处理一些返回大量回复的命令,则会将命令回复以链表的形式连接起来. unsigned long long reply_bytes:保存回复链表的字节数. size_t sentlen:已发送回复的字节数.

2.2 客户端的释放

客户端释放的函数是freeClient(),主要就是释放各种数据结构和清空一些缓冲区等操作,这里就不再列出源码.

我们可以重点关注一下异步释放客户端,源码如下:

// 异步释放client void freeClientAsync(client *c) { // 如果是已经即将关闭或者是lua脚本的伪client,则直接返回 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; // 将client加入到即将关闭的client链表中 // server.clients_to_close 中保存着服务器中所有待关闭的链表 listAddNodeTail(server.clients_to_close,c); } 

设置异步释放客户端的目的主要是:防止底层函数正在向客户端的输出缓冲区写数据的时候,关闭客户端,这样是不安全的. Redis会安排客户端在serverCron()函数的安全时间释放它.

当然也可以取消异步释放,那么就会调用freeClient()函数立即释放,源码如下:

// 取消设置异步释放的client void freeClientsInAsyncFreeQueue(void) { // 遍历所有即将关闭的client while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); // 取消立即关闭的标志 c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); // 从即将关闭的client链表中删除 listDelNode(server.clients_to_close,ln); } } 

3. 命令接收与命令回复

3.1 命令接收

当客户端连接上Redis服务器后,服务器会得到一个文件描述符fd提示: 本文由整理自网络,如有侵权请联系本站删除!
本站声明:
1、本站所有资源均来源于互联网,不保证100%完整、不提供任何技术支持;
2、本站所发布的文章以及附件仅限用于学习和研究目的;不得将用于商业或者非法用途;否则由此产生的法律后果,本站概不负责!

-六神源码网