当前位置:网站首页>redis 事件

redis 事件

2022-08-10 18:26:00 芒骁

https://zhuanlan.zhihu.com/p/97309908

Redis服务器是一个事件驱动程序,服务器需要处理以下两类事件:

*文件事件(file event):Redis服务器通过套接字与客户端(或者其他Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象。服务器与客户端(或者其他服务器)的通信会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作。

*时间事件(time event):Redis服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象。

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler):

*文件事件处理器使用I/O多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。

*当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。虽然文件事件处理器以单线程方式运行,但通过使用I/O多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型,又可以很好地与Redis服务器中其他同样以单线程方式运行的模块进行对接,这保持了Redis内部单线程

文件事件创建 aeCreateFileEvent

在这里插入图片描述

文件事件定义

在这里插入图片描述

  • mask: 可选值为 AE_READABLE 可读事件, AE_WRITABLE 可写事件, AE_BARRIER 字面意为 屏障事件, 其中可读可写事件比较好理解, AE_BARRIER 事件会影响事件的读写处理顺序为先写后读, 比如beforesleep回调中进行了fsync动作,然后需要把结果快速回复给client。这个情况下就需要用到AE_BARRIER事件,用来翻转处理事件顺序了。
  • rfileProc: 读事件处理器,例如在 server.c 中创建文件事件使用的: aeCreateFileEvent() 方法。
  • wfileProc: 写事件处理器, 如: redisAeWriteEvent() 方法。
  • clientData: 不同的多路复用方式特殊值。

作者:cooper_li
链接:https://juejin.cn/post/6986639765210660871
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

mask 参数值定义

在这里插入图片描述

I/O多路复用程序的实现

Redis在I/O多路复用程序的实现源码中用#include宏定义了相应的规则,程序会在编译时自动选择系统中性能最高的I/O多路复用函数库来作为Redis的I/O多路复用程序的底层实现:

事件的类型

I/O多路复用程序可以监听多个套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,这两类事件和套接字操作之间的对应关系如下:*当套接字变得可读时(客户端对套接字执行write操作,或者执行close操作),或者有新的可应答(acceptable)套接字出现时(客户端对服务器的监听套接字执行connect操作),套接字产生AE_READABLE事件。*当套接字变得可写时(客户端对套接字执行read操作),套接字产生AE_WRITABLE事件。I/O多路复用程序允许服务器同时监听套接字的AE_READABLE事件和AE_WRITABLE事件,如果一个套接字同时产生了这两种事件,那么文件事件分派器会优先处理AE_READABLE事件,等到AE_READABLE事件处理完之后,才处理AE_WRITABLE事件。这也就是说,如果一个套接字又可读又可写的话,那么服务器将先读套接字,后写套接字。

文件事件的处理器

Redis为文件事件编写了多个处理器,这些事件处理器分别用于实现不同的网络通信需求,比如说:

*为了对连接服务器的各个客户端进行应答,服务器要为监听套接字关联连接应答处理器。
*为了接收客户端传来的命令请求,服务器要为客户端套接字关联命令请求处理器。
*为了向客户端返回命令的执行结果,服务器要为客户端套接字关联命令回复处理器。
*当主服务器和从服务器进行复制操作时,主从服务器都需要关联特别为复制功能编写的复制处理器。

在这些事件处理器里面,服务器最常用的要数与客户端进行通信的连接应答处理器、命令请求处理器和命令回复处理器。

文件事件处理器的构成

图12-1展示了文件事件处理器的四个组成部分,它们分别是套接字、I/O多路复用程序、文件事件分派器(dispatcher),以及事件处理器。
在这里插入图片描述
文件事件是对套接字操作的抽象,每当一个套接字准备好执行连接应答(accept)、写入、读取、关闭等操作时,就会产生一个文件事件。因为一个服务器通常会连接多个套接字,所以多个文件事件有可能会并发地出现。

文件事件是对套接字操作的抽象,每当一个套接字准备好执行连接应答(accept)、写入、读取、关闭等操作时,就会产生一个文件事件。

因为一个服务器通常会连接多个套接字,所以多个文件事件有可能会并发地出现。I/O多路复用程序负责监听多个套接字,并向文件事件分派器传送那些产生了事件的套接字。尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生事件的套接字都放到一个队列里面,然后通过这个队列,以有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件事件分派器传送套接字。

当上一个套接字产生的事件被处理完毕之后(该套接字为事件所关联的事件处理器执行完毕),I/O多路复用程序才会继续向文件事件分派器传送下一个套接字,如图12-2所示
在这里插入图片描述
文件事件分派器接收I/O多路复用程序传来的套接字,并根据套接字产生的事件的类型,调用相应的事件处理器。服务器会为执行不同任务的套接字关联不同的事件处理器,这些处理器是一个个函数,它们定义了某个事件发生时,服务器应该执行的动作。

1.连接应答处理器 acceptTcpHandler

networking.c/acceptTcpHandler函数是Redis的连接应答处理器,这个处理器用于对连接服务器监听套接字的客户端进行应答,具体实现为sys/socket.h/accept函数的包装。当Redis服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来,当有客户端用sys/socket.h/connect函数连接服务器监听套接字的时候,套接字就会产生AE_READABLE事件,引发连接应答处理器执行,并执行相应的套接字应答操作,如图12-4所示。
在这里插入图片描述

通过anetTcpAccept函数获得事件,然后对事件类型进行处理(acceptCommonHandler)。
在这里插入图片描述

anetTcpAccept

在这里插入图片描述

acceptCommonHandler

过程源码
首先调用 createClient 创建 client,接着判断当前 client 的数量是否超出了配置的 maxclients,如果超过,则给客户端发送错误信息,并且释放 client。
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    
    client *c;
    if ((c = createClient(fd)) == NULL) {
    
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in non-blocking * mode and we can send an error for free using the Kernel I/O */
    if (listLength(server.clients) > server.maxclients) {
    
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
    
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }

    /* If the server is running in protected mode (the default) and there * is no password set, nor a specific interface is bound, we don't accept * requests from non loopback interfaces. Instead we try to explain the * user what to do to fix it if needed. */
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
    
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
    
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled, no bind address was specified, no "
                "authentication password is requested to clients. In this mode "
                "connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a bind address or an authentication password. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
    
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

    server.stat_numconnections++;
    c->flags |= flags;
}
createClient 针对每个链接创建一个客户端

createClient 方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储客户端通过 socket 发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client一共有三种类型,不同类型的对应缓冲区的大小都不同

普通客户端是除了复制和订阅的客户端之外的所有连接
从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制
订阅客户端用于发布订阅功能

createClient 方法除了创建 client 结构体并设置其属性值外,还会对 socket进行配置并注册读事件处理器

设置 socket 为 非阻塞 socket、设置 NO_DELAY 和 SO_KEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制。

设置读事件处理器,当客户端通过 socket 发送来数据后,Redis 会调用 readQueryFromClient 方法。

client *createClient(int fd) {
    
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
    
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
            //可读事件,
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
    
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}

readQueryFromClient

在这里插入图片描述

过程源码
readQueryFromClient 方法会调用 read 方法从 socket 中读取数据到输入缓冲区中nread = read(fd, c->querybuf+qblen, readlen);
然后判断其大小是否大于系统设置的 client_max_querybuf_len,如果大于,则向 Redis返回错误信息,并关闭 client。if (sdslen(c->querybuf) > server.client_max_querybuf_len)
处理输入缓冲区processInputBufferAndReplicate
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
    
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
    
        if (errno == EAGAIN) {
    
            return;
        } else {
    
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
    
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
    
        /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
    
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to * compute the difference between the applied offset before and after * processing the buffer, to understand how much of the replication stream * was actually applied to the master state: this quantity, and its * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}

processInputBufferAndReplicate

在这里插入图片描述

processInputBuffer 解析获取命令

processInputBuffer 主要是将输入缓冲区中的数据解析成对应的命令,根据命令类型是 PROTO_REQ_MULTIBULK 还是 PROTO_REQ_INLINE,来分别调用 processInlineBuffer 和 processMultibulkBuffer 方法来解析命令。

然后调用 processCommand 方法来执行命令。执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性,然后重置 client,让client可以接收一条命令。

当缓冲区中还有数据时就一直处理while(c->qb_pos < sdslen(c->querybuf))
从缓冲区解析命令在这里插入图片描述
执行命令在这里插入图片描述
/* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
    
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script * condition on the slave. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). * * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
    
            if (c->querybuf[c->qb_pos] == '*') {
    
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
    
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
    
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
    
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
    
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
    
            resetClient(c);
        } else {
    
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK) {
    
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
    
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a * module blocking command, so that the reply callback will * still be able to access the client argv and argc field. * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may * result into a slave, that may be the active client, to be * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
    
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}
处理命令 processCommand
过程图解
处理quiet命令在这里插入图片描述
根据命令字典查找在这里插入图片描述
处理内存达到最大时的情况在这里插入图片描述执行命令
call 方法是 Redis 中执行命令的通用方法,它会处理通用的执行命令的前置和后续操作
/* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
    
    /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
    
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
    
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
    
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
    
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here. * However we don't perform the redirection if: * 1) The sender of this command is our master. * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
    
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
    
            if (c->cmd->proc == execCommand) {
    
                discardTransaction(c);
            } else {
    
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout * condition, to avoid mixing the propagation of scripts with the propagation * of DELs due to eviction. */
    if (server.maxmemory && !server.lua_timedout) {
    
        int out_of_memory = freeMemoryIfNeeded() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
    
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
    
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
    
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
    
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
    
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, * when slave-serve-stale-data is no and we are a slave with a broken * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
    
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
    
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
    
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
    
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

call方法执行命令
处理命令,调用命令处理函数(可以看到对于持久化需要的信息进行了更新)在这里插入图片描述
CMD_CALL_SLOWLOG 表示要记录慢查询日志在这里插入图片描述

/* Call() is the core of Redis execution of a command. * * The following flags can be passed: * CMD_CALL_NONE No flags. * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. * * The exact propagation behavior depends on the client flags. * Specifically: * * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set * in the call flags, then the command is propagated even if the * dataset was not affected by the command. * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP * are set, the propagation into AOF or to slaves is not performed even * if the command modified the dataset. * * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or * slaves propagation will never occur. * * Client flags are modified by the implementation of a given command * using the following API: * * forceCommandPropagation(client *c, int flags); * preventCommandPropagation(client *c); * preventCommandAOF(client *c); * preventCommandReplication(client *c); * */
void call(client *c, int flags) {
    
    long long dirty, start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
    
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
    
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
    
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
    
        /* use the real command that was executed (cmd and lastamc) may be * different, in case of MULTI-EXEC or re-written commands such as * EXPIRE, GEOADD, etc. */
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
    
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command * implementations called preventCommandPropagation() or similar, * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication * propagation is needed. Note that modules commands handle replication * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */
    if (server.also_propagate.numops) {
    
        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {
    
            for (j = 0; j < server.also_propagate.numops; j++) {
    
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.also_propagate = prev_also_propagate;
    server.stat_numcommands++;
}
propagate() 将修改数据的指令添加到server.aof_buf中

feedAppendOnlyFile函数首先会判断Server是否开启了AOF,如果开启AOF,那么根据Redis通讯协议将修改数据的指令重现成请求的字符串,注意在超时设置的处理方式,接着将字符串append到server.aof_buf中即可。
在这里插入图片描述

feedAppendOnlyFile

https://www.superweb999.com/article/411515.html

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    
    sds buf = sdsempty();
    robj *tmpargv[3];
	
    /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */
     *  // 当前 db 不是指定的 aof db,通过创建 SELECT 命令来切换数据库
    if (dictid != server.aof_selected_db) {
    
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
	 // 将 EXPIRE / PEXPIRE / EXPIREAT 命令翻译为 PEXPIREAT 命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
    
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
    
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
        // 将 SETEX / PSETEX 命令翻译为 SET 和 PEXPIREAT 组合命令
    } else if (cmd->proc == setCommand && argc > 3) {
    
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
    
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
    
        /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */
     * 如果server.aof_child_pid != -1那么表明此时Server正在重写rewrite AOF文件,需要将被修改的数据追加到server.aof_rewrite_buf_blocks链表中,等待rewrite结束后,追加到AOF文件中。
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

2.命令请求处理器

networking.c/readQueryFromClient函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容,具体实现为unistd.h/read函数的包装。当一个客户端通过连接应答处理器成功连接到服务器之后,服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来,当客户端向服务器发送命令请求的时候,套接字就会产生AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作,如图12-5所示。

在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。

3.命令回复处理器

networking.c/sendReplyToClient函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write函数的包装。当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来,当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,并执行相应的套接字写入操作,如图12-6所示。

时间事件

*定时事件:让一段程序在指定的时间之后执行一次。比如说,让程序X在当前时间的30毫秒之后执行一次。*周期性事件:让一段程序每隔指定时间就执行一次。比如说,让程序Y每隔30毫秒就执行一次。一个时间事件主要由以下三个属性组成:*id:服务器为时间事件创建的全局唯一ID(标识号)。ID号按从小到大的顺序递增,新事件的ID号比旧事件的ID号要大。*when:毫秒精度的UNIX时间戳,记录了时间事件的到达(arrive)时间。*timeProc:时间事件处理器,一个函数。当时间事件到达时,服务器就会调用相应的处理器来处理事件。一个时间事件是定时事件还是周期性事件取决于时间事件处理器的返回值:*如果事件处理器返回ae.h/AE_NOMORE,那么这个事件为定时事件:该事件在达到一次之后就会被删除,之后不再到达。

时间事件定义

在这里插入图片描述

双端链表

服务器将所有时间事件都放在一个无序链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已到达的时间事件,并调用相应的事件处理器。

图12-8展示了一个保存时间事件的链表的例子,链表中包含了三个不同的时间事件:因为新的时间事件总是插入到链表的表头,所以三个时间事件分别按ID逆序排序,表头事件的ID为3,中间事件的ID为2,表尾事件的ID为1
在这里插入图片描述
注意,我们说保存时间事件的链表为无序链表,指的不是链表不按ID排序,而是说,该链表不按when属性的大小排序。正因为链表没有按when属性进行排序,所以当时间事件执行器运行的时候,它必须遍历链表中的所有时间事件,这样才能确保服务器中所有已到达的时间事件都会被处理

无序链表并不影响时间事件处理器的性能
在目前版本中,正常模式下的Redis服务器只使用serverCron一个时间事件,而在benchmark模式下,服务器也只使用两个时间事件。在这种情况下,服务器几乎是将无序链表退化成一个指针来使用,所以使用无序链表来保存时间事件,并不影响事件执行的性能。

创建时间事件 aeCreateTimeEvent

创建时间事件并添加到时间事件链表。aeCreateTimeEvent

在这里插入图片描述
eventLoop:输入参数指向事件循环结构体;
milliseconds:表示此时间事件触发时间,单位毫秒,注意这是一个相对时间,即从当前时间算起,milliseconds毫秒后此时间事件会被触发;
proc:指向时间事件的处理函数;
clientData:指向对应的结构体对象;
finalizerProc:同样是函数指针,删除时间事件节点之前会调用此函数。

源码实例

从当前时间一毫秒后执行ServerCron函数。
在这里插入图片描述

原网站

版权声明
本文为[芒骁]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_44587855/article/details/125216253