当前位置:网站首页>Redis源码分析之PSYNC同步
Redis源码分析之PSYNC同步
2022-04-23 14:11:00 【Mrpre】
Redis master-slave 同步源码分析
(1)slave 流程分析
(2)master 流程分析
Slave 分析
Redis 启动后,会每隔 1s 调用一次 replicationCron(由 serverCron 调用;serverCron 是 Redis 主事件循环中的定时事件,并不是单独的后台线程),即无论是单机、Master 还是 Slave 都会调用这个函数。
我们先来讨论下作为Slave的情况下,replicationCron 函数运行逻辑。
作为 slave,这个函数的功能应该能猜到:和 Master 建立并保持连接、完成握手、接收 Master 的存量数据以及增量数据。
/* Replication cron function, called 1 time per second. */
/* Periodic replication housekeeping. Excerpt from the article: the lines
 * "......" and "...." stand for code elided by the author, so this snippet
 * does not compile as shown.
 *
 * In the visible part, for a replica it:
 *  - aborts a connection/handshake that made no I/O progress for more than
 *    server.repl_timeout seconds;
 *  - starts a new connection when repl_state is REPL_STATE_CONNECT;
 *  - sends an ACK to masters that support PSYNC. */
void replicationCron(void) {
/* Cron iteration counter; not referenced in the visible excerpt. */
static long long replication_cron_loops = 0;
/* A series of timeout checks; the author skips them for now. */
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}
......
/* As a replica: according to the article, connectWithMaster() creates the
 * socket to the master, installs syncWithMaster as its callback and sets
 * repl_state to REPL_STATE_CONNECTING (its body is not shown here). */
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
/* Send ACK to master from time to time. * Note that we do not send periodic acks to masters that don't * support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
....
}
syncWithMaster 就是作为Slave和Master连接的的握手函数。
syncWithMaster 函数的代码这里不再详细贴出,直接说明其逻辑:
1、发送ping
2、读pong表示成功继续第3步
3、如果master配置了requirepass ,那么slave必须配置 masterauth,所以就需要发送auth信息。如果不需要发送auth就到第5步
4、接收 auth 的结果,认证成功则继续
5、发送slave的port信息,这个port用于告知master连slave时使用哪个端口
6、判断 5 的结果
7、同 5,只是发送的是 slave 的 IP 地址。5 和 7 上报的端口和 IP 并不参与复制数据的传输本身,Master 只是把它们记录下来,用于在 INFO replication 中展示 slave 信息(Sentinel 也据此发现 slave)。
8、判断 7 的返回结果
9、slave发送自己的同步能力到master,用以和Master对齐同步的方法,最新版本支持 eof 和 psync2。psync是 partial resynchronization 的意思
10、发送 "PSYNC <psync_replid> <psync_offset>" 到 Master(psync_offset 为 slave 已处理的复制偏移量 +1);如果需要全量同步,则发送 "PSYNC ? -1"。
11、如果 Master 回复 "+FULLRESYNC <replid> <offset>",表示全量复制;如果回复 "+CONTINUE <replid>",表示部分复制(增量复制)。
在全量复制的情况下,握手完成后,和 Master 建立的 fd 的 read event 回调会变成 readSyncBulkPayload 函数,用于接收 RDB 数据:
/* Event-loop callback for the connection to the master (same signature as
 * readSyncBulkPayload, which it registers below through aeCreateFileEvent).
 * Per the article it drives the replication handshake step by step; that
 * part of the body is elided here ("......"), so this excerpt does not
 * compile as shown.
 *
 * The visible tail installs readSyncBulkPayload() as the AE_READABLE handler
 * of 'fd' so the bulk payload is downloaded without blocking. If the event
 * cannot be registered it logs and jumps to an 'error' label that is not
 * visible in this excerpt. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
......
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
}
readSyncBulkPayload 负责读完 RDB 数据并加载到内存,然后调用 replicationCreateMasterClient,将当前和 Master 连接的 fd 的回调变成 readQueryFromClient。这样,这个连接后续就会接收 Master 发过来的指令。
/* Read handler for the master link while a full resynchronization payload
 * (the RDB file) is being received.
 *
 * The event loop calls it every time the link is readable:
 *  - While server.repl_transfer_size == -1 it reads the bulk header, which is
 *    either "$<count>" (payload size known in advance) or
 *    "$EOF:<40 bytes delimiter>" (the master streams the payload and ends it
 *    with the delimiter). A bare newline only refreshes
 *    server.repl_transfer_lastio; a line starting with '-' is a master error.
 *  - Afterwards it appends at most sizeof(buf) bytes per call to
 *    server.repl_transfer_fd, syncing the written range to disk roughly every
 *    REPL_MAX_WRITTEN_BEFORE_FSYNC bytes.
 *  - Once the whole payload is on disk it renames the temp file to
 *    server.rdb_filename, flushes the old dataset, loads the RDB and turns
 *    the link into the master client (replicationCreateMasterClient()).
 *
 * Any error cancels the handshake via cancelReplicationHandshake().
 */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen, nwritten;
    off_t left;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* Static vars used to hold the EOF mark, and the last bytes received
     * from the server: when they match, we reached the end of the transfer.
     * They are static because the payload arrives over many invocations of
     * this handler. */
    static char eofmark[CONFIG_RUN_ID_SIZE];
    static char lastbytes[CONFIG_RUN_ID_SIZE];
    static int usemark = 0;

    /* If repl_transfer_size == -1 we still have to read the bulk length
     * from the master reply. */
    if (server.repl_transfer_size == -1) {
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }

        if (buf[0] == '-') {
            serverLog(LL_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            goto error;
        } else if (buf[0] == '\0') {
            /* At this stage just a newline works as a PING in order to take
             * the connection live. So we refresh our last interaction
             * timestamp. */
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
            goto error;
        }

        /* There are two possible forms for the bulk payload. One is the
         * usual $<count> bulk format. The other is used for diskless transfers
         * when the master does not know beforehand the size of the file to
         * transfer. In the latter case, the following format is used:
         *
         * $EOF:<40 bytes delimiter>
         *
         * At the end of the file the announced delimiter is transmitted. The
         * delimiter is long and random enough that the probability of a
         * collision with the actual file content can be ignored. */
        if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
            usemark = 1;
            memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
            memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
            /* Set any repl_transfer_size to avoid entering this code path
             * at the next call. */
            server.repl_transfer_size = 0;
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving streamed RDB from master");
        } else {
            char *endptr;
            long bulklen;

            usemark = 0;
            /* The length comes from the network, so validate it. With a
             * negative size, 'left' and therefore 'readlen' below would be
             * negative, and read() would receive it as a huge size_t count
             * and overflow 'buf'. (A value of -1 would also make us re-enter
             * this header path on the next call.) */
            errno = 0;
            bulklen = strtol(buf+1,&endptr,10);
            if (endptr == buf+1 || errno == ERANGE || bulklen < 0) {
                serverLog(LL_WARNING,
                    "Bad protocol from MASTER, invalid bulk length '%s'",
                    buf);
                goto error;
            }
            server.repl_transfer_size = bulklen;
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving %lld bytes from master",
                (long long) server.repl_transfer_size);
        }
        return;
    }

    /* From here on we read the payload itself:
     *  usemark            - the end is detected through the EOF mark;
     *  repl_transfer_size - total payload size (when usemark is 0);
     *  repl_transfer_read - bytes read so far. */

    /* Read bulk data */
    if (usemark) {
        readlen = sizeof(buf);
    } else {
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    }

    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        cancelReplicationHandshake();
        return;
    }
    server.stat_net_input_bytes += nread;

    /* When a mark is used, we want to detect EOF asap in order to avoid
     * writing the EOF mark into the file... */
    int eof_reached = 0;

    if (usemark) {
        /* Update the last bytes array, and check if it matches our delimiter.*/
        if (nread >= CONFIG_RUN_ID_SIZE) {
            memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
        } else {
            int rem = CONFIG_RUN_ID_SIZE-nread;
            memmove(lastbytes,lastbytes+nread,rem);
            memcpy(lastbytes+rem,buf,nread);
        }
        if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
    }

    server.repl_transfer_lastio = server.unixtime;
    if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
        serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
            (nwritten == -1) ? strerror(errno) : "short write");
        goto error;
    }
    server.repl_transfer_read += nread;

    /* Delete the last 40 bytes from the file if we reached EOF. */
    if (usemark && eof_reached) {
        if (ftruncate(server.repl_transfer_fd,
            server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
        {
            serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
            goto error;
        }
    }

    /* Sync data on disk from time to time, otherwise at the end of the transfer
     * we may suffer a big delay as the memory buffers are copied into the
     * actual disk. */
    if (server.repl_transfer_read >=
        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
    {
        off_t sync_size = server.repl_transfer_read -
                          server.repl_transfer_last_fsync_off;
        rdb_fsync_range(server.repl_transfer_fd,
            server.repl_transfer_last_fsync_off, sync_size);
        server.repl_transfer_last_fsync_off += sync_size;
    }

    /* Check if the transfer is now complete */
    if (!usemark) {
        if (server.repl_transfer_read == server.repl_transfer_size)
            eof_reached = 1;
    }

    /* The whole payload has been received: load the RDB. */
    if (eof_reached) {
        int aof_is_enabled = server.aof_state != AOF_OFF;

        /* Ensure background save doesn't overwrite synced data */
        if (server.rdb_child_pid != -1) {
            serverLog(LL_NOTICE,
                "Replica is about to load the RDB file received from the "
                "master, but there is a pending RDB child running. "
                "Killing process %ld and removing its temp file to avoid "
                "any race",
                    (long) server.rdb_child_pid);
            kill(server.rdb_child_pid,SIGUSR1);
            rdbRemoveTempFile(server.rdb_child_pid);
        }

        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
            cancelReplicationHandshake();
            return;
        }
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
        /* We need to stop any AOFRW fork before flushing and parsing
         * RDB, otherwise we'll create a copy-on-write disaster. */
        if(aof_is_enabled) stopAppendOnly();
        signalFlushedDb(-1);
        emptyDb(
            -1,
            server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
            replicationEmptyDbCallback);
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
            serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            cancelReplicationHandshake();
            /* Re-enable the AOF if we disabled it earlier, in order to restore
             * the original configuration. */
            if (aof_is_enabled) restartAOFAfterSYNC();
            return;
        }

        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        /* Do not leave a dangling pointer or a stale (and possibly reused)
         * descriptor number behind in the global state. */
        server.repl_transfer_tmpfile = NULL;
        server.repl_transfer_fd = -1;
        replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
        server.repl_state = REPL_STATE_CONNECTED;
        server.repl_down_since = 0;
        /* After a full resynchronization we use the replication ID and
         * offset of the master. The secondary ID / offset are cleared since
         * we are starting a new history. */
        memcpy(server.replid,server.master->replid,sizeof(server.replid));
        server.master_repl_offset = server.master->reploff;
        clearReplicationId2();
        /* Let's create the replication backlog if needed. Slaves need to
         * accumulate the backlog regardless of the fact they have sub-slaves
         * or not, in order to behave correctly if they are promoted to
         * masters after a failover. */
        if (server.repl_backlog == NULL) createReplicationBacklog();

        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
        /* Restart the AOF subsystem now that we finished the sync. This
         * will trigger an AOF rewrite, and when done will start appending
         * to the new file. */
        if (aof_is_enabled) restartAOFAfterSYNC();
    }

    return;

error:
    cancelReplicationHandshake();
    return;
}
总结一下,Redis 的 psync 模式
(1)slave 往 master connect,然后握手。
(2)slave 向 Master 发送 SYNC/PSYNC 命令。
(3)Master 判断是全量还是增量推送,并回复给 slave:全量模式下,Master 生成并发送 RDB 文件;增量模式下,Master 直接发送指令。
(4)如果是全量模式,在推送 RDB 文件之后,Master 会把生成和推送 RDB 文件这段窗口期内产生的写入以及后续的写入,以指令的方式推送给 slave,这其实就是增量部分了。
(5)增量部分的发送调用了 call->propagate->replicationFeedSlaves->addReply 来发送至 slave。addReply 通常用于处理完 Client 的请求后发送响应,这里被用来同步数据给 Slave。但 addReply 只是写入 Redis 自己的输出缓冲区,然后等主循环的下一次循环,由 beforeSleep->handleClientsWithPendingWrites 将数据发送出去。
Master 流程分析
看完 Slave 流程,基本就能猜出 Master 的流程。
1、Master处理 psync指令,判断是否是全量同步还是增量同步
2、masterTryPartialResynchronization->addReplyReplicationBacklog
实际上,Master 是否开启 AOF 和 RDB 持久化并不影响同步(即使关闭了 RDB 持久化,全量同步时 Master 依然会生成 RDB 数据发送给 slave)。
Master 的 backlog 逻辑:Master 上的任意写操作都会触发 feedReplicationBacklog,目的是将数据写入自己的缓冲区,并记录一些偏移量。
缓冲区作用
(1)全量同步 RDB 比较耗时,在 RDB 文件处理完成前产生的这部分写入数据需要保留下来,作为增量数据传输。
(2)Slave 和 Master 完成全量同步后进入增量同步,此时如果发生断网,Slave 重连后如果无条件全量同步,代价是不能接受的。因此 Master 需要保存最近的一部分数据,如果这部分数据恰好包含了断网期间 Slave 未收到的数据,则执行增量同步。
Redis 的 backlog 是一个环形缓冲区,所以不存在溢出;但如果在一次断网期间,Master 写入的数据超过了一整圈(即超过 repl_backlog_size),那么就没救了:写满一圈必然会覆盖部分旧数据,相当于丢了 slave 需要的数据,所以不能再增量同步,需要全量同步。
几个offset说明一下意思
(1)repl_backlog_idx,下次写入数据在 backlog 缓冲区中的起始下标。下面代码可以看到,repl_backlog_idx 到达 repl_backlog_size(满一圈)后置为 0,然后从 0 开始再写。
(2)repl_backlog_size,buffer的总长度,配置文件配置的,repl_backlog 就是根据这个大小申请出来的buffer
(3)master_repl_offset,全局复制偏移量:从 0 开始累加,每向 backlog 写入 len 字节就增加 len。
(4)repl_backlog_histlen,backlog 中当前有效数据的长度,也是从 0 开始累加,最大为 repl_backlog_size。
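这几个值从字面或源码角度理解起来比较复杂,但核心目的很简单:让 Master 自己知道,从 Slave psync 时发送过来的 offset 开始的指令,Master 自己是否还保存着。具体来说,feedReplicationBacklog 末尾计算的 repl_backlog_off 是 backlog 中第一个字节对应的复制偏移量,只有当 repl_backlog_off <= offset <= repl_backlog_off + repl_backlog_histlen 时,Master 才能执行增量同步。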
/* Append 'len' bytes starting at 'ptr' to the replication backlog.
 *
 * server.repl_backlog is used as a circular buffer of repl_backlog_size
 * bytes: data is copied starting at repl_backlog_idx, which wraps back to 0
 * when it reaches the end of the buffer, so the oldest data is overwritten.
 * Besides copying, the function keeps the bookkeeping consistent:
 *  - master_repl_offset grows by 'len';
 *  - repl_backlog_histlen (amount of valid data) grows by 'len' and is
 *    capped at repl_backlog_size;
 *  - repl_backlog_off is recomputed as the replication offset of the first
 *    byte still held in the backlog.
 *
 * 'ptr' is a length-delimited buffer (not necessarily NUL-terminated); only
 * 'len' bytes of it are read. */
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* This is a circular buffer, so write as much data we can at every
     * iteration and rewind the "idx" index if we reach the limit. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    /* Once the buffer has wrapped, only the last repl_backlog_size bytes
     * are valid. */
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}
版权声明
本文为[Mrpre]所创,转载请带上原文链接,感谢
https://wonderful.blog.csdn.net/article/details/107250252
边栏推荐
- 在Clion中给主函数传入外部参数
- After entering the new company, the operation and maintenance engineer can understand the deployment of the system from the following items
- JDBC details
- JumpServer
- ie8 浏览器提示是否 阻止访问js脚本
- js 递归(1)
- KVM学习资源
- Introduction to the use of semaphore for inter thread control
- DP - [noip2000] grid access
- mysql 5.1升级到5.66
猜你喜欢
Visio画拓扑图随记
TLS/SSL 协议详解 (30) SSL中的RSA、DHE、ECDHE、ECDH流程与区别
VMware installation 64 bit XP Chinese tutorial
Operation instructions of star boundary text automatic translator
Recyclerview advanced use (I) - simple implementation of sideslip deletion
XX project structure notes
Tongxin UOS uninstall php7 2.24, install php7 4.27 ; Uninstall and then install PHP 7.2.34
About the configuration and use of json5 in nodejs
金融行业云迁移实践 平安金融云整合HyperMotion云迁移解决方案,为金融行业客户提供迁移服务
After entering the new company, the operation and maintenance engineer can understand the deployment of the system from the following items
随机推荐
Notes on Visio drawing topology
xx项目架构随记
Processing MKDIR: unable to create directory 'AAA': read only file system
OpenSSH的升级、版本号的修改
rsync+inotify远程同步
进入新公司,运维工程师从下面这几项了解系统的部署
Recyclerview advanced use (I) - simple implementation of sideslip deletion
Algorithem_ReverseLinkedList
倒计时1天~2022云容灾产品线上发布会即将开始
js 抛物线运动方法封装
Use the executors class to quickly create a thread pool
openstack理论知识
MySQL数据库讲解(七)
翻牌效果
GFS分布式文件系统(理论)
Quickly understand the three ways of thread implementation
TLS/SSL 协议详解 (30) SSL中的RSA、DHE、ECDHE、ECDH流程与区别
百度笔试2022.4.12+编程题目:简单整数问题
在电视屏幕上进行debug调试
Use cases of the arrays class