Redis source code analysis: PSYNC synchronization
2022-04-23 15:04:00 【Mrpre】
Redis master-slave synchronization: source code analysis
(1) Slave process analysis
(2) Master process analysis
Slave process analysis
After Redis starts, replicationCron is called once per second. It is driven by Redis's built-in serverCron timer, which runs in the main event loop rather than in a separate background thread. This means the function runs whether the instance is standalone, a master, or a slave.
First, let's look at how replicationCron behaves when the instance is a slave.
As a slave, its job is easy to guess. It keeps the connection to the master alive, performs the handshake, receives the master's existing (snapshot) data, and then receives the master's incremental data.
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* A series of timeout checks; not the focus here */
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}
......
/* As a slave, connectWithMaster creates the socket to the master, sets its callback to syncWithMaster, and sets the replication state to REPL_STATE_CONNECTING */
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
/* Send ACK to master from time to time. * Note that we do not send periodic acks to masters that don't * support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
....
}
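The timeout and reconnect parts of replicationCron can be reduced to a small state machine. The sketch below is hypothetical: sk_replica, the sk_repl_state values and sk_replication_cron are not Redis names, and the several real handshake states are folded into one. It keeps only the two steps shown above. First it cancels a connect or handshake that has been idle longer than the timeout. Then it starts a new connection whenever the state is CONNECT.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical, reduced set of slave-side states (Redis has several
 * separate handshake states; they are folded into one here). */
typedef enum {
    SK_REPL_NONE,        /* not configured as a slave */
    SK_REPL_CONNECT,     /* must (re)connect to the master */
    SK_REPL_CONNECTING,  /* non-blocking connect in progress */
    SK_REPL_HANDSHAKE,   /* PING / AUTH / REPLCONF / PSYNC exchange */
    SK_REPL_TRANSFER,    /* receiving the RDB payload */
    SK_REPL_CONNECTED    /* receiving the command stream */
} sk_repl_state;

typedef struct {
    bool has_master;     /* plays the role of server.masterhost != NULL */
    sk_repl_state state; /* plays the role of server.repl_state */
    time_t last_io;      /* plays the role of server.repl_transfer_lastio */
    time_t timeout;      /* plays the role of server.repl_timeout (seconds) */
} sk_replica;

/* One tick, meant to be called once per second.
 * Returns true when a new connection attempt is started. */
bool sk_replication_cron(sk_replica *r, time_t now) {
    assert(r != NULL);
    if (!r->has_master) return false;

    /* Stuck connecting or handshaking: cancel and go back to CONNECT
     * (the role of cancelReplicationHandshake()). */
    if ((r->state == SK_REPL_CONNECTING || r->state == SK_REPL_HANDSHAKE) &&
        now - r->last_io > r->timeout) {
        r->state = SK_REPL_CONNECT;
    }

    /* Need a connection: start one (the role of connectWithMaster()),
     * possibly in the same tick that cancelled the previous attempt. */
    if (r->state == SK_REPL_CONNECT) {
        r->state = SK_REPL_CONNECTING;
        r->last_io = now;
        return true;
    }
    return false;
}
```

As in the excerpt, the timeout check runs before the connect check. A cancelled attempt is therefore retried in the same tick.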
syncWithMaster is the function that performs the handshake between the slave and the master.
I won't paste its code in detail. Here is its logic:
1. Send PING.
2. Read the reply. A PONG means success, so continue to step 3.
3. If the master has requirepass configured, the slave must have masterauth configured, so it sends AUTH. If no AUTH is needed, skip to step 5.
4. Read the AUTH result and continue if it is OK.
5. Send the slave's port (REPLCONF listening-port). This tells the master which port the slave listens on.
6. Check the result of step 5.
7. Same as step 5, but this sends the slave's IP address (REPLCONF ip-address). In the Redis source, the master does not use the port and IP from steps 5 and 7 to open any connection to the slave. They are only reported, for example in INFO replication.
8. Check the result of step 7.
9. The slave sends its synchronization capabilities to the master (REPLCONF capa eof capa psync2) so that both sides agree on the sync method. Recent versions support eof and psync2. PSYNC stands for partial resynchronization.
10. Send "PSYNC <psync_replid> <psync_offset>" to the master. For a full sync this is "PSYNC ? -1".
11. If the master replies "+FULLRESYNC <psync_replid> <psync_offset>", a full copy follows. If it replies "+CONTINUE <psync_replid>", a partial copy follows.
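Steps 10 and 11 can be sketched in C as one function that builds the PSYNC line and another that classifies the master's first reply. This is a minimal illustration, not Redis code: sk_build_psync and sk_classify_psync_reply are hypothetical names. The "cached offset + 1" rule mirrors what slaveTryPartialResynchronization does when a cached master exists. That offset is the next byte the slave needs.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical classification of the master's reply to PSYNC. */
typedef enum { SK_PSYNC_FULL, SK_PSYNC_CONTINUE, SK_PSYNC_ERROR } sk_psync_result;

/* Build the PSYNC command line into 'out'.
 * No cached master state (cached_replid == NULL): ask for a full sync.
 * Otherwise: ask to continue from the next byte needed (offset + 1). */
int sk_build_psync(char *out, size_t outlen,
                   const char *cached_replid, long long cached_offset) {
    assert(out != NULL && outlen > 0);
    if (cached_replid == NULL)
        return snprintf(out, outlen, "PSYNC ? -1");
    return snprintf(out, outlen, "PSYNC %s %lld",
                    cached_replid, cached_offset + 1);
}

/* Classify the first reply line sent back by the master. */
sk_psync_result sk_classify_psync_reply(const char *reply) {
    if (strncmp(reply, "+FULLRESYNC", 11) == 0) return SK_PSYNC_FULL;
    if (strncmp(reply, "+CONTINUE", 9) == 0) return SK_PSYNC_CONTINUE;
    return SK_PSYNC_ERROR;  /* e.g. "-ERR ..." from an old master */
}
```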
For a full resynchronization, the read event of the fd connected to the master is switched to readSyncBulkPayload once the handshake is done. A partial resynchronization returns earlier and skips this step.
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
......
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
}
readSyncBulkPayload reads the RDB data. When the data is complete, it calls replicationCreateMasterClient, which changes the read callback of the fd connected to the master to readQueryFromClient.
From then on, this connection receives the commands sent by the master.
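The callback switch described above can be modeled with a function pointer. In this hypothetical sketch, sk_conn and the handler names are made up for illustration. The payload handler consumes exactly the announced number of bytes and then installs the command handler. Redis itself never reads past the payload length. The sketch instead forwards any leftover bytes from the same read to the new handler, so both paths are visible.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical connection whose read handler can be swapped at runtime,
 * the way aeCreateFileEvent() re-registers the callback for an fd. */
typedef struct sk_conn sk_conn;
typedef void (*sk_read_handler)(sk_conn *c, const char *data, size_t len);

struct sk_conn {
    sk_read_handler on_read; /* current callback for readable data */
    size_t payload_left;     /* RDB bytes still expected */
    size_t payload_read;     /* RDB bytes consumed so far */
    size_t commands_read;    /* bytes handed to the command handler */
};

/* Stand-in for readQueryFromClient: here it only counts bytes. */
static void sk_read_query(sk_conn *c, const char *data, size_t len) {
    (void)data;
    c->commands_read += len;
}

/* Stand-in for readSyncBulkPayload: consume the payload, then switch. */
static void sk_read_bulk_payload(sk_conn *c, const char *data, size_t len) {
    assert(c != NULL);
    size_t take = len < c->payload_left ? len : c->payload_left;
    c->payload_left -= take;
    c->payload_read += take;
    if (c->payload_left == 0) {
        /* Payload complete: the same fd now carries replicated commands
         * (the role of replicationCreateMasterClient()). */
        c->on_read = sk_read_query;
        if (len > take) c->on_read(c, data + take, len - take);
    }
}
```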
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
off_t left;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);
/* Static vars used to hold the EOF mark, and the last bytes received * form the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
/* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */
/* On the first call repl_transfer_size is -1. The first thing the master sends is the length of the RDB payload, in one of two forms: "$<len>", an explicit length that the slave then reads, or "$EOF:<40 bytes delimiter>", where the master sends the same 40-byte delimiter after the RDB data to mark its end. */
if (server.repl_transfer_size == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}
if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take * the connection live. So we refresh our last interaction * timestamp. */
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
/* There are two possible forms for the bulk payload. One is the * usual $<count> bulk format. The other is used for diskless transfers * when the master does not know beforehand the size of the file to * transfer. In the latter case, the following format is used: * * $EOF:<40 bytes delimiter> * * At the end of the file the announced delimiter is transmitted. The * delimiter is long and random enough that the probability of a * collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path * at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
}
return;
}
/* Reading the file body. usemark: the end is detected by the EOF mark; repl_transfer_size: total file size; repl_transfer_read: bytes read so far */
/* Read bulk data */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
nread = read(fd,buf,readlen);
if (nread <= 0) {
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;
/* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
}
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;
/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
goto error;
}
}
/* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
/* Finished reading: load the RDB file */
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing * RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore * the original configuration. */
if (aof_is_enabled) restartAOFAfterSYNC();
return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to * accumulate the backlog regardless of the fact they have sub-slaves * or not, in order to behave correctly if they are promoted to * masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */
if (aof_is_enabled) restartAOFAfterSYNC();
}
return;
error:
cancelReplicationHandshake();
return;
}
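The two header forms and the end-of-stream detection above can be isolated into a small, testable sketch. sk_bulk, sk_parse_bulk_header and sk_feed_chunk are hypothetical names. The sliding "last 40 bytes" window follows the same memmove/memcpy logic as the usemark branch of readSyncBulkPayload. Because of this, a delimiter split across two reads is still detected.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SK_MARK_SIZE 40  /* same value as CONFIG_RUN_ID_SIZE in Redis */

typedef struct {
    int usemark;                  /* 1: "$EOF:<mark>" form, 0: "$<len>" form */
    long long size;               /* payload length when usemark == 0 */
    char eofmark[SK_MARK_SIZE];   /* delimiter announced by the master */
    char lastbytes[SK_MARK_SIZE]; /* last SK_MARK_SIZE bytes received */
} sk_bulk;

/* Parse the first line of the transfer. Returns 0 on success,
 * -1 if it does not start with '$'. */
int sk_parse_bulk_header(sk_bulk *b, const char *line) {
    if (line[0] != '$') return -1;
    if (strncmp(line + 1, "EOF:", 4) == 0 && strlen(line + 5) >= SK_MARK_SIZE) {
        b->usemark = 1;
        memcpy(b->eofmark, line + 5, SK_MARK_SIZE);
        memset(b->lastbytes, 0, SK_MARK_SIZE);
        b->size = 0;
    } else {
        b->usemark = 0;
        b->size = strtoll(line + 1, NULL, 10);
    }
    return 0;
}

/* Slide the window over a newly read chunk; return 1 if the data
 * received so far ends with the EOF mark. */
int sk_feed_chunk(sk_bulk *b, const char *buf, size_t nread) {
    assert(b->usemark);
    if (nread >= SK_MARK_SIZE) {
        memcpy(b->lastbytes, buf + nread - SK_MARK_SIZE, SK_MARK_SIZE);
    } else {
        size_t rem = SK_MARK_SIZE - nread;
        memmove(b->lastbytes, b->lastbytes + nread, rem); /* keep the tail */
        memcpy(b->lastbytes + rem, buf, nread);           /* append new bytes */
    }
    return memcmp(b->lastbytes, b->eofmark, SK_MARK_SIZE) == 0;
}
```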
To sum up, Redis's PSYNC flow is:
(1) The slave connects to the master and performs the handshake.
(2) The slave sends SYNC/PSYNC to the master.
(3) The master decides between a full and an incremental push and replies to the slave accordingly. In full mode the master sends an RDB file. In incremental mode it sends only commands.
(4) In full mode, after pushing the RDB file, the master pushes the writes made during the RDB window plus all later writes, as commands. This is effectively the incremental part.
(5) The incremental part is sent through call->propagate->replicationFeedSlaves->addReply. addReply is normally used to send a response to a client after its command has been processed; here it delivers the same data to the slave. However, addReply only writes into Redis's own output buffer. The data actually goes out on the next iteration of the main event loop, via beforeSleep->handleClientsWithPendingWrites.
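The buffering described in (5) can be sketched as follows. Appending a reply only touches memory, and the bytes reach the wire later, when pending writes are flushed. sk_client, sk_add_reply and sk_flush_pending are hypothetical. Real Redis also has a reply list for large outputs and writes to a socket rather than to a caller-supplied array.

```c
#include <assert.h>
#include <string.h>

#define SK_OUTBUF 256

/* Hypothetical client with a fixed output buffer, similar in spirit
 * to client->buf in Redis. */
typedef struct {
    char buf[SK_OUTBUF];
    size_t bufpos;   /* bytes queued but not yet written */
    int pending;     /* 1 if on the "clients with pending writes" list */
} sk_client;

/* Like addReply(): append to the buffer only; no socket I/O here. */
int sk_add_reply(sk_client *c, const char *s) {
    size_t len = strlen(s);
    if (c->bufpos + len > SK_OUTBUF) return -1; /* Redis would use a reply list */
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    c->pending = 1;
    return 0;
}

/* Like handleClientsWithPendingWrites() from beforeSleep(): copy the
 * queued bytes into 'out' (standing in for the socket) and reset. */
size_t sk_flush_pending(sk_client *c, char *out, size_t outlen) {
    assert(outlen >= c->bufpos + 1);
    if (!c->pending) return 0;
    size_t n = c->bufpos;
    memcpy(out, c->buf, n);
    out[n] = '\0';
    c->bufpos = 0;
    c->pending = 0;
    return n;
}
```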
Master process analysis
Having seen the slave's flow, you can basically guess the master's:
1. The master handles the PSYNC command and decides whether to do a full or an incremental synchronization.
2. For an incremental synchronization, masterTryPartialResynchronization calls addReplyReplicationBacklog, which sends the data the slave is missing from the backlog.
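The decision in step 1 can be sketched as follows. This is a simplified, hypothetical version of the checks in masterTryPartialResynchronization. First, the replication ID must match the current ID, or the previous ID as long as the offset is no greater than second_replid_offset. Second, a backlog must exist. Third, the requested offset must lie inside it. sk_master and sk_can_partial_resync are made-up names. Redis compares IDs with strcasecmp, but the sketch uses strcmp for simplicity.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical snapshot of the master-side fields involved. */
typedef struct {
    const char *replid;             /* current replication ID */
    const char *replid2;            /* previous ID (kept after a failover) */
    long long second_replid_offset; /* last offset valid for replid2 */
    bool has_backlog;               /* repl_backlog != NULL */
    long long backlog_off;          /* repl_backlog_off */
    long long backlog_histlen;      /* repl_backlog_histlen */
} sk_master;

/* Returns true if "PSYNC <replid> <offset>" can be served incrementally. */
bool sk_can_partial_resync(const sk_master *m, const char *replid,
                           long long offset) {
    assert(m != NULL && replid != NULL);
    bool same_history =
        strcmp(replid, m->replid) == 0 ||
        (strcmp(replid, m->replid2) == 0 && offset <= m->second_replid_offset);
    if (!same_history) return false;   /* "PSYNC ? -1" also ends up here */
    if (!m->has_backlog) return false;
    /* The requested byte must still be in the backlog, or be the next one. */
    return offset >= m->backlog_off &&
           offset <= m->backlog_off + m->backlog_histlen;
}
```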
In fact, synchronization still works even if the master has neither AOF nor RDB persistence enabled.
The master's backlog logic is as follows. Every write operation on the master triggers feedReplicationBacklog, which writes the data into the master's own buffer and updates some offsets.
Purpose of the buffer
(1) A full synchronization via RDB takes a long time. Data written before the RDB file has been fully processed must be preserved and sent afterwards as incremental data.
(2) After the full synchronization, the slave and master continue with incremental synchronization. If the network then drops, an unconditional full synchronization after the slave reconnects is clearly unacceptable. So the master keeps some recent data. If that data happens to include everything the slave missed while disconnected, an incremental synchronization is performed.
Redis's backlog is a ring buffer, so it never overflows. However, a slave may stay disconnected while the master writes more than one full lap of data. In that case partial synchronization is hopeless, because wrapping around overwrites some of the old data the slave still needs. That data is effectively lost, so incremental synchronization is impossible and a full synchronization is required.
Meaning of the offsets
(1) repl_backlog_idx: the position where the next write starts. As the code below shows, once repl_backlog_idx completes a lap it is reset to 0 and writing starts again from 0.
(2) repl_backlog_size: the total length of the buffer, set in the configuration file (repl-backlog-size). repl_backlog is allocated with this size.
(3) master_repl_offset: a counter that starts at 0 and grows by the length of every write fed to the backlog.
(4) repl_backlog_histlen: also starts at 0 and grows, up to a maximum of repl_backlog_size. It is the amount of valid data in the backlog.
(5) repl_backlog_off: the replication offset of the first byte still held in the backlog, computed as master_repl_offset - repl_backlog_histlen + 1.
In any case, don't try to interpret these values literally or trace them too closely through the source, because that gets complicated. Their core purpose is simple. They let the master tell whether it still holds the data starting at the offset the slave sends in PSYNC.
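A small worked example makes this bookkeeping concrete. It assumes, purely for illustration, that repl_backlog_size = 16, that master_repl_offset is 0 and repl_backlog_idx is 0 when the backlog is created, and that 20 bytes are then fed through feedReplicationBacklog:

$$\text{master\_repl\_offset} = 0 + 20 = 20, \qquad \text{repl\_backlog\_idx} = 20 \bmod 16 = 4$$
$$\text{repl\_backlog\_histlen} = \min(20,\ 16) = 16$$
$$\text{repl\_backlog\_off} = \text{master\_repl\_offset} - \text{repl\_backlog\_histlen} + 1 = 20 - 16 + 1 = 5$$

The backlog therefore holds the bytes at replication offsets 5 through 20. The offset $o$ a slave sends in PSYNC is the next byte it needs, so the range check in masterTryPartialResynchronization allows an incremental sync only when

$$\text{repl\_backlog\_off} \le o \le \text{repl\_backlog\_off} + \text{repl\_backlog\_histlen} \quad\text{i.e.}\quad 5 \le o \le 21.$$

A slave asking for $o = 3$ needs bytes that were overwritten when the ring wrapped, so it must do a full sync.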
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
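The ring-buffer behavior can be exercised outside Redis with the sketch below. sk_backlog and its functions are hypothetical. sk_backlog_feed repeats the feedReplicationBacklog logic above, without the debug output. sk_backlog_read_from uses the same start-index arithmetic that addReplyReplicationBacklog uses to find the oldest byte, then skips to the requested offset. The initial off = 1 assumes the backlog is created when master_repl_offset is 0, as createReplicationBacklog sets repl_backlog_off to master_repl_offset + 1.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical standalone backlog mirroring the server fields. */
typedef struct {
    char *buf;
    long long size;        /* repl_backlog_size */
    long long idx;         /* repl_backlog_idx: next write position */
    long long histlen;     /* repl_backlog_histlen: valid bytes held */
    long long master_off;  /* master_repl_offset */
    long long off;         /* repl_backlog_off: offset of oldest byte held */
} sk_backlog;

void sk_backlog_init(sk_backlog *bl, long long size) {
    bl->buf = malloc((size_t)size);
    assert(bl->buf != NULL);
    bl->size = size;
    bl->idx = bl->histlen = bl->master_off = 0;
    bl->off = 1;  /* master_repl_offset + 1 at creation time */
}

/* Same logic as feedReplicationBacklog(). */
void sk_backlog_feed(sk_backlog *bl, const char *p, long long len) {
    bl->master_off += len;
    while (len) {
        long long thislen = bl->size - bl->idx;
        if (thislen > len) thislen = len;
        memcpy(bl->buf + bl->idx, p, (size_t)thislen);
        bl->idx += thislen;
        if (bl->idx == bl->size) bl->idx = 0;  /* wrap around */
        len -= thislen;
        p += thislen;
        bl->histlen += thislen;
    }
    if (bl->histlen > bl->size) bl->histlen = bl->size;
    bl->off = bl->master_off - bl->histlen + 1;
}

/* Copy all bytes from replication offset 'offset' onward into 'out'
 * (which must hold at least histlen bytes). Returns the byte count,
 * or -1 if that offset is no longer or not yet in the backlog. */
long long sk_backlog_read_from(const sk_backlog *bl, long long offset, char *out) {
    if (offset < bl->off || offset > bl->off + bl->histlen) return -1;
    long long skip = offset - bl->off;
    long long j = (bl->idx + (bl->size - bl->histlen)) % bl->size; /* oldest byte */
    j = (j + skip) % bl->size;
    long long n = bl->histlen - skip;
    for (long long i = 0; i < n; i++) {
        out[i] = bl->buf[j];
        j = (j + 1) % bl->size;
    }
    return n;
}
```

With a 16-byte backlog and 20 bytes written, the first four bytes have been overwritten. A request for offset 4 is refused, while offsets 5 through 21 can still be served.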
Copyright notice
This article was written by [Mrpre]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231409588023.html