Network thread pool diagram
MainThd
Main thread: initializes the modules, creates the related threads, and waits for the shutdown signal.
Call stack:
main
-- tendisplus::ServerEntry->startup
---- initialize rocksdb / WorkerPool / NetworkAsio
---- WorkerPool::startup
------ new thread(WorkerPool::consumeTasks()/_ioCtx->run()) x n
---- NetworkAsio::prepare
------ asio::ip::tcp::acceptor _acceptor(_acceptCtx, tcp::endpoint); // bind the port
---- ReplManager::startup
---- NetworkAsio::run
------ new thread(_acceptThd)
------ new thread(asio::io_context::work(_rwCtx); _rwCtx->run()) x n
------ NetworkAsio::doAccept
-------- _acceptor->async_accept(_rwCtx, std::move(cb))
-- setupSignals
-- waitForExit
_acceptThd
Listener thread: binds the port and listens for incoming connections.
NetworkAsio::doAccept->cb
-- ServerEntry::addSession(new NetSession);
---- NetSession::start
------ NetSession::stepState
-------- drainReqNet
---------- _sock.async_read_some(buffer, []{NetSession::drainReqCallback})
NetworkAsio
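Network I/O thread pool: responsible for command requests and responses, and for dispatching requests to the worker threads' task queue.
NetSession::drainReqCallback (read callback)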
-- processInlineBuffer
---- NetSession::setState(State::Process);
---- NetSession::schedule()
------ ServerEntry::schedule
-------- WorkerPool::schedule([]{NetSession::stepState})
---------- asio::post(*_ioCtx, std::move([]{NetSession::stepState}))
Command reply: sends the response back to the client. (Reads and writes are decoupled, so neither has to wait for the other.)
NetSession::drainRspCallback (write callback)
-- drainRsp
---- asio::async_write(_sock, buf, []{NetSession::drainRspCallback})
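The handoff from an I/O thread to the worker pool amounts to pushing a closure onto a queue that worker threads drain. The sketch below assumes a plain mutex-protected queue in place of `asio::post` on an `io_context`; `MiniWorkerPool` and `executed` are hypothetical names, not part of Tendis.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for WorkerPool, with a plain queue instead of
// an asio::io_context.
class MiniWorkerPool {
 public:
  explicit MiniWorkerPool(std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) {
      _threads.emplace_back([this] { consumeTasks(); });
    }
  }

  ~MiniWorkerPool() { stop(); }

  // Called from an I/O thread: enqueue the task and return at once.
  void schedule(std::function<void()> task) {
    assert(task);  // an empty std::function would throw when invoked
    {
      std::lock_guard<std::mutex> lk(_mutex);
      _tasks.push_back(std::move(task));
    }
    _cv.notify_one();
  }

  // Lets the workers drain the remaining tasks, then joins them.
  void stop() {
    {
      std::lock_guard<std::mutex> lk(_mutex);
      _stopping = true;
    }
    _cv.notify_all();
    for (auto& t : _threads) {
      if (t.joinable()) t.join();
    }
  }

  std::size_t executed() const { return _executed.load(); }

 private:
  void consumeTasks() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [this] { return _stopping || !_tasks.empty(); });
        if (_tasks.empty()) return;  // stopping and nothing left to run
        task = std::move(_tasks.front());
        _tasks.pop_front();
      }
      task();  // run outside the lock so other workers can dequeue
      ++_executed;
    }
  }

  std::mutex _mutex;
  std::condition_variable _cv;
  std::deque<std::function<void()>> _tasks;
  bool _stopping = false;
  std::atomic<std::size_t> _executed{0};
  std::vector<std::thread> _threads;
};
```

Because `schedule` only enqueues, the I/O thread can return to its read loop immediately, which is the decoupling the call stack above relies on.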
The network model is shown in the figure below:
ServerEntry::_executor
Worker thread pool: parses and executes commands.
NetSession::stepState
-- NetSession::processReq
---- ServerEntry::processRequest
------ Command::precheck
------ Command::runSessionCmd
------ NetSession::setResponse
-------- NetSession::_isSendRunning = true or _sendBuffer.push_back
-------- NetSession::drainRsp
---------- asio::async_write(_sock, buffer, size, [] {drainRspCallback});
---- NetSession::setState(State::DrainReqNet) or NetSession::setState(State::DrainReqBuf)
---- NetSession::schedule()
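The `_isSendRunning = true or _sendBuffer.push_back` step suggests that at most one write is in flight per session and later responses are queued behind it. The following is a minimal model under that assumption; `MiniSendQueue`, `startWrite`, `_inFlight`, and `_wire` are hypothetical, and the "socket" is just a vector that records completed writes.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a session's send path: one write in flight,
// everything else waits in _sendBuffer.
class MiniSendQueue {
 public:
  // Worker-thread side: start a write or queue behind the running one.
  void setResponse(std::string rsp) {
    std::lock_guard<std::mutex> lk(_mutex);
    if (_isSendRunning) {
      _sendBuffer.push_back(std::move(rsp));
      return;
    }
    _isSendRunning = true;
    startWrite(std::move(rsp));
  }

  // I/O-thread side: stands in for the write-completion callback.
  void drainRspCallback() {
    std::lock_guard<std::mutex> lk(_mutex);
    assert(_isSendRunning);  // a completion implies a running write
    _wire.push_back(std::move(_inFlight));
    _inFlight.clear();
    if (_sendBuffer.empty()) {
      _isSendRunning = false;
      return;
    }
    startWrite(std::move(_sendBuffer.front()));
    _sendBuffer.pop_front();
  }

  bool sendRunning() const {
    std::lock_guard<std::mutex> lk(_mutex);
    return _isSendRunning;
  }

  std::size_t queued() const {
    std::lock_guard<std::mutex> lk(_mutex);
    return _sendBuffer.size();
  }

  // Copy of everything "written" so far, in completion order.
  std::vector<std::string> wire() const {
    std::lock_guard<std::mutex> lk(_mutex);
    return _wire;
  }

 private:
  // Stands in for asio::async_write; the caller holds _mutex.
  void startWrite(std::string rsp) { _inFlight = std::move(rsp); }

  mutable std::mutex _mutex;
  bool _isSendRunning = false;
  std::string _inFlight;
  std::deque<std::string> _sendBuffer;
  std::vector<std::string> _wire;
};
```

With this shape the worker thread never blocks on the socket: it either starts the write or appends to the queue, and the completion callback chains the next write.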
Replication-related thread pools
1. ReplManager::_fullReceiver: the slave sends the full-sync command fullsync to the master and synchronously processes the backup sent by the master.
ReplManager::slaveSyncRoutine
-- ReplManager::slaveStartFullsync (StoreMeta::replState == ReplState::REPL_CONNECT)
---- store->stop
---- client = ReplManager::createClient()
---- SET STATE TO REPL_TRANSFER
---- getBackupInfo
------ client->writeLine("fullsync ...")
---- get all files
---- client->writeLine("+OK")
---- store->restart
---- SET STATE TO REPL_CONNECTED
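The state transitions in this call stack can be written as a small function. This is a sketch only: `fetchBackup` is a hypothetical placeholder for "send fullsync and receive all files", `trace` exists purely to make the order visible, and falling back to `REPL_CONNECT` on failure is an assumption rather than something the call stack states.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

enum class ReplState { REPL_CONNECT, REPL_TRANSFER, REPL_CONNECTED };

// Hypothetical per-store replication metadata.
struct StoreMeta {
  ReplState replState = ReplState::REPL_CONNECT;
};

// Runs the full-sync steps in order and returns true on success.
bool slaveStartFullsync(StoreMeta& meta,
                        const std::function<bool()>& fetchBackup,
                        std::vector<std::string>& trace) {
  assert(meta.replState == ReplState::REPL_CONNECT);
  trace.push_back("store->stop");
  meta.replState = ReplState::REPL_TRANSFER;
  if (!fetchBackup()) {
    // Assumption: go back to REPL_CONNECT so a later pass can retry.
    meta.replState = ReplState::REPL_CONNECT;
    trace.push_back("rollback");
    return false;
  }
  trace.push_back("store->restart");
  meta.replState = ReplState::REPL_CONNECTED;
  return true;
}
```

Only a store in `REPL_CONNECT` enters this path, so once the state reaches `REPL_CONNECTED` the full-sync routine has nothing left to do for that store.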
2. ReplManager::_incrChecker (thread count: 2): the slave sends the incremental-sync command incrsync to the master and records the associated session information.
ReplManager::slaveSyncRoutine
-- ReplManager::slaveChkSyncStatus (StoreMeta::replState == ReplState::REPL_CONNECTED)
---- if _syncStatus[storeId]->sessionId is valid, return; # noop
---- client = ReplManager::createClient()
---- client->writeLine("INCRSYNC", ...)
---- client->readLine
---- client->writeLine("+PONG") # the master should start sending binlogs after reading "+PONG"
---- ReplManager::_syncStatus[storeId]->sessionId = sessionId
The _fullReceiver and _incrChecker thread pools only do real work when a slave is first set up or after a slave restarts; most of the time they run idle.
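The reason the incremental checker mostly idles is the early return when a session is already registered. A minimal sketch of that check follows; `kInvalidSession` and `handshake` are hypothetical, with `handshake` standing in for the whole INCRSYNC / +PONG exchange.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <limits>

// Hypothetical sentinel meaning "no session registered yet".
constexpr std::uint64_t kInvalidSession =
    std::numeric_limits<std::uint64_t>::max();

struct SyncStatus {
  std::uint64_t sessionId = kInvalidSession;
};

// Returns true if a handshake was attempted, false if the call was a
// no-op. `handshake` returns the new session id, or kInvalidSession on
// failure so that the next pass tries again.
bool slaveChkSyncStatus(SyncStatus& status,
                        const std::function<std::uint64_t()>& handshake) {
  assert(handshake);
  if (status.sessionId != kInvalidSession) {
    return false;  // already bound to a session: nothing to do
  }
  status.sessionId = handshake();
  return true;
}
```

Calling the function repeatedly after a successful handshake costs one comparison per pass, which matches the "runs idle" behavior described above.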
3. ReplManager::_fullPusher
ReplManager::supplyFullSyncRoutine
4. ReplManager::_incrPusher
5. ReplManager::_logRecycler
Cluster-related thread pools
- MigrateManager::_migrateReceiver: thread pool that handles tasks on the migration receiver side, 4 threads by default
receiveSchedule
-- MigrateReceiveState::RECEIVE_SNAPSHOT
-- fullRecieve
-- receiveSnapShot
-- client->writeLine("readymigrate ", taskid)
-- client->readLine
- MigrateManager::_migrateSender: thread pool that handles tasks on the migration sender side, 4 threads by default
senderSchedule
--- MigrateSendState::START
--- _migrateSender->schedule()
--- sendSlots
--- sendSnapshot
--- sendBinlog
--- client->writeLine("migrateend", taskid)
--- client->readLine
--- setslot
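The sender side reads as an ordered list of steps. One way to picture it is a step table that stops at the first failure; note that the fail-fast behavior, the `Step` alias, and `runSenderSteps` are assumptions made for illustration, not taken from the Tendis source.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// A named step; the callable returns true on success.
using Step = std::pair<std::string, std::function<bool()>>;

// Runs the steps in order and returns the name of the first step that
// fails, or an empty string when every step succeeds.
std::string runSenderSteps(const std::vector<Step>& steps) {
  for (const auto& step : steps) {
    assert(step.second);  // every step needs a callable
    if (!step.second()) return step.first;
  }
  return std::string();
}
```

Keeping the steps in a table makes the order explicit and lets a failed task report which phase it stopped in.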
- GCManager::_gcDeleter: thread pool that cleans up dirty data, 1 thread by default
--- DeleteRangeState::START
--- task->garbageDelete
--- deleteSlotRange