Loggie source code analysis: the backbone of the file source module
2022-04-23 16:37:00 【Xuzhong -- Lei】
Project website:

There are a lot of code details that I'm too lazy to comb through one by one, for instance how a job's jobUid is generated, or the rename-detection logic. This post only sorts out the general flow; once you grasp the overall framework, the code is much easier to understand.
1. Overall architecture of the file module
Some modules, such as the multiline module, are not analyzed here.

The source, via manager.go, creates and drives the watcher, reader, ackTask, watchTask, dbHandler, multilineProcessor, and so on.
Source.Start is a key function: it creates and starts the watcher and the reader, which then wait for tasks to be delivered over channels.
```go
func (s *Source) Start() {
	log.Info("start source: %s", s.String())
	if s.config.ReaderConfig.MultiConfig.Active {
		s.multilineProcessor = GetOrCreateShareMultilineProcessor()
	}
	// register queue listener for ack
	if s.ackEnable {
		s.dbHandler = GetOrCreateShareDbHandler(s.config.DbConfig)
		s.ackChainHandler = GetOrCreateShareAckChainHandler(s.sinkCount, s.config.AckConfig)
		s.rc.RegisterListener(&AckListener{
			sourceName:      s.name,
			ackChainHandler: s.ackChainHandler,
		})
	}
	s.watcher = GetOrCreateShareWatcher(s.config.WatchConfig, s.config.DbConfig)
	s.r = GetOrCreateReader(s.isolation, s.config.ReaderConfig, s.watcher)
	s.HandleHttp()
}
```
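The GetOrCreateShareXxx helpers above all follow the same process-wide singleton idiom: every source asks for the component, but only one instance exists per process, so all file sources share one watcher, one dbHandler, and so on. A minimal standalone sketch of that idiom, assuming a sync.Once implementation (types and names here are simplified stand-ins, not Loggie's actual code):

```go
package main

import (
	"fmt"
	"sync"
)

// Watcher is a stand-in for the shared component.
type Watcher struct{ /* ... */ }

var (
	globalWatcher *Watcher
	watcherOnce   sync.Once
)

// GetOrCreateShareWatcher returns the same Watcher no matter how many
// sources call it, so the whole process shares one scan loop and one
// open-fd budget.
func GetOrCreateShareWatcher() *Watcher {
	watcherOnce.Do(func() {
		globalWatcher = &Watcher{}
	})
	return globalWatcher
}

func main() {
	a, b := GetOrCreateShareWatcher(), GetOrCreateShareWatcher()
	fmt.Println(a == b) // true: both callers get the shared instance
}
```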
Creating the watcher and starting its listen loop:
```go
func newWatcher(config WatchConfig, dbHandler *dbHandler) *Watcher {
	w := &Watcher{
		done:                   make(chan struct{}),
		config:                 config,
		sourceWatchTasks:       make(map[string]*WatchTask),
		waiteForStopWatchTasks: make(map[string]*WatchTask),
		watchTaskChan:          make(chan *WatchTask),
		dbHandler:              dbHandler,
		zombieJobChan:          make(chan *Job, config.MaxOpenFds+1),
		allJobs:                make(map[string]*Job),
		osWatchFiles:           make(map[string]bool),
		zombieJobs:             make(map[string]*Job),
		countDown:              &sync.WaitGroup{},
		stopOnce:               &sync.Once{},
	}
	w.initOsWatcher()
	go w.run()
	return w
}
```
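initOsWatcher itself is not listed in this post. Judging from the fsnotify usage in run() further below, it plausibly creates a github.com/fsnotify/fsnotify watcher whose Events channel the main loop selects on. A standalone sketch of that kind of OS-level watch setup (an assumption, not Loggie's exact code; the watched path is illustrative):

```go
package main

import (
	"log"

	"github.com/fsnotify/fsnotify"
)

func main() {
	// create the OS-level (inotify-backed on Linux) watcher
	osw, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("create fsnotify watcher: %v", err)
	}
	defer osw.Close()

	// watch a directory for create/write/rename events
	if err := osw.Add("/var/log"); err != nil {
		log.Fatalf("add watch path: %v", err)
	}

	for {
		select {
		case e := <-osw.Events:
			log.Printf("os event: %v", e)
		case err := <-osw.Errors:
			log.Printf("watch error: %v", err)
		}
	}
}
```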
Creating the reader: Start launches the read workers, which then wait for jobs to come in.
```go
func newReader(config ReaderConfig, watcher *Watcher) *Reader {
	r := &Reader{
		done:      make(chan struct{}),
		config:    config,
		jobChan:   make(chan *Job, config.ReadChanSize),
		watcher:   watcher,
		countDown: &sync.WaitGroup{},
		stopOnce:  &sync.Once{},
		startOnce: &sync.Once{},
	}
	r.Start()
	return r
}
```
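Start is idempotent thanks to startOnce, and since work(index) in section 3 takes a worker index, the reader presumably launches a small pool of worker goroutines that all consume jobChan. A sketch of that start-once worker-pool pattern with simplified stand-in types (workerCount is an assumed knob, not a field from Loggie's config):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Job struct{ filename string }

type Reader struct {
	done      chan struct{}
	jobChan   chan *Job
	countDown *sync.WaitGroup
	startOnce *sync.Once
}

// Start launches the worker pool exactly once, mirroring the startOnce
// field initialized in newReader above.
func (r *Reader) Start(workerCount int) {
	r.startOnce.Do(func() {
		for i := 0; i < workerCount; i++ {
			go r.work(i)
		}
	})
}

// work consumes jobs until the reader is stopped.
func (r *Reader) work(index int) {
	r.countDown.Add(1)
	defer r.countDown.Done()
	for {
		select {
		case <-r.done:
			return
		case job := <-r.jobChan:
			fmt.Printf("worker-%d handling %s\n", index, job.filename)
		}
	}
}

func main() {
	r := &Reader{
		done:      make(chan struct{}),
		jobChan:   make(chan *Job, 4),
		countDown: &sync.WaitGroup{},
		startOnce: &sync.Once{},
	}
	r.Start(2)
	r.Start(2) // second call is a no-op
	r.jobChan <- &Job{filename: "a.log"}
	r.jobChan <- &Job{filename: "b.log"}
	time.Sleep(100 * time.Millisecond)
	close(r.done)
	r.countDown.Wait()
}
```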
Delivering the watch task, which the watcher then picks up and carries forward:
```go
func (s *Source) ProductLoop(productFunc api.ProductFunc) {
	log.Info("%s start product loop", s.String())
	s.productFunc = productFunc
	if s.config.ReaderConfig.MultiConfig.Active {
		s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
		s.multilineProcessor.StartTask(s.mTask)
		s.productFunc = s.multilineProcessor.Process
	}
	if s.config.AckConfig.Enable {
		s.ackTask = NewAckTask(s.epoch, s.pipelineName, s.name, func(state *State) {
			s.dbHandler.state <- state
		})
		s.ackChainHandler.StartTask(s.ackTask)
	}
	s.watchTask = NewWatchTask(s.epoch, s.pipelineName, s.name, s.config.CollectConfig, s.eventPool, s.productFunc, s.r.jobChan, s.config.Fields)
	// start watch source paths
	s.watcher.StartWatchTask(s.watchTask)
}
```
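Note the swap s.productFunc = s.multilineProcessor.Process: when multiline is active, events are routed through the processor, which calls the original product function only once a complete event has been assembled. A simplified, hypothetical illustration of that interposition (the continuation rule used here, "lines starting with a space continue the previous event", is invented for the example; Loggie's real rule is configurable):

```go
package main

import "fmt"

// ProductFunc is a simplified stand-in for api.ProductFunc.
type ProductFunc func(event string)

type multilineProcessor struct {
	next    ProductFunc // the original product function
	pending string      // the event being assembled
}

// Process buffers continuation lines and forwards merged events to next.
func (m *multilineProcessor) Process(line string) {
	if len(line) > 0 && line[0] == ' ' {
		m.pending += "\n" + line
		return
	}
	if m.pending != "" {
		m.next(m.pending)
	}
	m.pending = line
}

func main() {
	var productFunc ProductFunc = func(e string) { fmt.Printf("event: %q\n", e) }
	p := &multilineProcessor{next: productFunc}
	productFunc = p.Process // the same swap ProductLoop performs

	for _, l := range []string{"panic: boom", "  goroutine 1 [running]:", "  main.go:10", "all done"} {
		productFunc(l)
	}
	if p.pending != "" { // flush the tail
		p.next(p.pending)
	}
}
```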
2. Overall architecture of the watcher module
The watcher manages multiple WatchTasks, and each WatchTask manages multiple Jobs.

First, what does the watcher do?
It sits in an event loop, waiting for messages and events to arrive:
```go
func (w *Watcher) run() {
	w.countDown.Add(1)
	log.Info("watcher start")
	scanFileTicker := time.NewTicker(w.config.ScanTimeInterval)
	maintenanceTicker := time.NewTicker(w.config.MaintenanceInterval)
	defer func() {
		w.countDown.Done()
		scanFileTicker.Stop()
		maintenanceTicker.Stop()
		log.Info("watcher stop")
	}()
	var osEvents chan fsnotify.Event
	if w.config.EnableOsWatch && w.osWatcher != nil {
		osEvents = w.osWatcher.Events
	}
	for {
		select {
		case <-w.done:
			return
		case watchTask := <-w.watchTaskChan:
			w.handleWatchTaskEvent(watchTask)
		case job := <-w.zombieJobChan:
			w.decideZombieJob(job)
		case e := <-osEvents:
			//log.Info("os event: %v", e)
			w.osNotify(e)
		case <-scanFileTicker.C:
			w.scan()
		case <-maintenanceTicker.C:
			w.maintenance()
		}
	}
}
```
w.done signals that the watcher has been stopped; the loop simply returns and does nothing more.
handleWatchTaskEvent manages starting and stopping the tasks in the task list, including loading persisted state from the sqlite database.
decideZombieJob decides whether a job has gone zombie: a job that is no longer active is added to the zombie jobs and kept under observation.
osEvents is backed by inotify underneath and receives file change events.
scan is the important one: it periodically scans files and jobs. This performs better than relying on inotify create events, which can wake the process frequently and cost relatively high CPU.
```go
func (w *Watcher) scan() {
	// check for any new files
	w.scanNewFiles()
	// active jobs
	w.scanActiveJob()
	// zombie jobs
	w.scanZombieJob()
}
```
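scanNewFiles itself is not listed here. A hypothetical sketch of what that step amounts to: expand the configured glob paths and register any matching file that is not yet tracked as a job. As noted at the top, the real code also handles jobUid generation and rename detection; those details are omitted in this sketch:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// scanNewFiles expands each glob pattern and returns files that are not
// yet tracked, marking them in allJobs. Keyed by path here for
// simplicity; the real code keys jobs by a uid, not the path.
func scanNewFiles(paths []string, allJobs map[string]bool) []string {
	var fresh []string
	for _, p := range paths {
		matches, err := filepath.Glob(p)
		if err != nil {
			continue // bad pattern; real code would log it
		}
		for _, m := range matches {
			info, err := os.Stat(m)
			if err != nil || info.IsDir() {
				continue
			}
			if !allJobs[m] {
				allJobs[m] = true
				fresh = append(fresh, m)
			}
		}
	}
	return fresh
}

func main() {
	tracked := map[string]bool{}
	fmt.Println(scanNewFiles([]string{"/var/log/*.log"}, tracked))
}
```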
Jobs are delivered to the reader via the eventBus message bus; the delivery action is in:
```go
func (j *Job) Read() {
	j.task.activeChan <- j
}
```
This pushes the current job onto the task's active channel, handing it over to the reader.
3. A brief analysis of the Reader module
The reader reads the job's file from the offset where the last read ended up to the current end, then sends the data downstream through ProductEvent, on to the interceptors.

Core code:
```go
func (r *Reader) work(index int) {
	r.countDown.Add(1)
	log.Info("read worker-%d start", index)
	defer func() {
		log.Info("read worker-%d stop", index)
		r.countDown.Done()
	}()
	readBufferSize := r.config.ReadBufferSize
	maxContinueReadTimeout := r.config.MaxContinueReadTimeout
	maxContinueRead := r.config.MaxContinueRead
	inactiveTimeout := r.config.InactiveTimeout
	backlogBuffer := make([]byte, 0, readBufferSize)
	readBuffer := make([]byte, readBufferSize)
	jobs := r.jobChan
	for {
		select {
		case <-r.done:
			return
		case job := <-jobs:
			filename := job.filename
			status := job.status
			if status == JobStop {
				log.Info("job(uid: %s) file(%s) status(%d) is stop, job will be ignored", job.Uid(), filename, status)
				r.watcher.decideJob(job)
				continue
			}
			file := job.file
			if file == nil {
				log.Error("job(uid: %s) file(%s) released, job will be ignored", job.Uid(), filename)
				r.watcher.decideJob(job)
				continue
			}
			lastOffset, err := file.Seek(0, io.SeekCurrent)
			if err != nil {
				log.Error("can't get offset, file(name:%s) seek error, err: %v", filename, err)
				r.watcher.decideJob(job)
				continue
			}
			job.currentLines = 0
			startReadTime := time.Now()
			continueRead := 0
			isEOF := false
			wasSend := false
			readTotal := int64(0)
			processed := int64(0)
			backlogBuffer = backlogBuffer[:0]
			for {
				readBuffer = readBuffer[:readBufferSize]
				l, readErr := file.Read(readBuffer)
				if errors.Is(readErr, io.EOF) || l == 0 {
					isEOF = true
					job.eofCount++
					break
				}
				if readErr != nil {
					log.Error("file(name:%s) read error, err: %v", filename, readErr)
					break
				}
				read := int64(l)
				readBuffer = readBuffer[:read]
				now := time.Now()
				processed = 0
				for processed < read {
					index := int64(bytes.IndexByte(readBuffer[processed:], '\n'))
					if index == -1 {
						break
					}
					index += processed
					endOffset := lastOffset + readTotal + index
					if len(backlogBuffer) != 0 {
						backlogBuffer = append(backlogBuffer, readBuffer[processed:index]...)
						job.ProductEvent(endOffset, now, backlogBuffer)
						// clean the backlog buffer after sending
						backlogBuffer = backlogBuffer[:0]
					} else {
						job.ProductEvent(endOffset, now, readBuffer[processed:index])
					}
					processed = index + 1
				}
				readTotal += read
				// any remaining bytes are added to the backlog buffer
				if processed < read {
					backlogBuffer = append(backlogBuffer, readBuffer[processed:]...)
					// TODO check whether it is too long, to avoid bursting the memory
					//if len(backlogBuffer) > maxBytes {
					//	log.Error(...)
					//	break
					//}
				}
				wasSend = processed != 0
				if wasSend {
					continueRead++
					// At most one batch of 2048 events is read at a time; with a
					// single event estimated at 512 bytes, one read is capped at
					// about 1 MB. maxContinueRead = 16 by default; on SSDs it is
					// recommended to increase maxContinueRead by 3~5x.
					if continueRead > maxContinueRead {
						break
					}
					if time.Since(startReadTime) > maxContinueReadTimeout {
						break
					}
				}
			}
			if wasSend {
				job.eofCount = 0
				job.lastActiveTime = time.Now()
			}
			l := len(backlogBuffer)
			if l > 0 {
				// before backing the offset off, check whether the job is
				// inactive so the last (unterminated) line can be collected
				wasLastLineSend := false
				if isEOF && !wasSend {
					if time.Since(job.lastActiveTime) >= inactiveTimeout {
						// send the "last line"
						endOffset := lastOffset + readTotal
						job.ProductEvent(endOffset, time.Now(), backlogBuffer)
						job.lastActiveTime = time.Now()
						wasLastLineSend = true
						// Ignore the '\n' that may be written next.
						// Having collected the "last line", we assume that either
						// nothing more will be written, or a '\n' will be written
						// first followed by the next line's content, so seek one
						// byte forward to skip the '\n' that may be written.
						_, err = file.Seek(1, io.SeekCurrent)
						if err != nil {
							log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
						}
					} else {
						// reset eofCount so the job stays alive long enough to
						// collect the last line later
						job.eofCount = 0
					}
				}
				// back the offset off by the accumulated backlog length
				if !wasLastLineSend {
					backwardOffset := int64(-l)
					_, err = file.Seek(backwardOffset, io.SeekCurrent)
					if err != nil {
						log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
					}
				}
			}
			r.watcher.decideJob(job)
		}
	}
}
```
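The heart of the loop above is line splitting with a backlog: scan each read buffer for '\n' with bytes.IndexByte, emit complete lines, and carry any trailing partial line in backlogBuffer until the next read completes it. A standalone distillation of just that technique (offsets and event plumbing stripped out):

```go
package main

import (
	"bytes"
	"fmt"
)

// splitLines feeds each chunk (one file.Read result) through the same
// '\n'-scanning loop as work() above, emitting complete lines and
// carrying partial tails across chunk boundaries in a backlog buffer.
func splitLines(chunks [][]byte, emit func([]byte)) {
	backlog := make([]byte, 0, 64)
	for _, buf := range chunks {
		processed := 0
		for processed < len(buf) {
			i := bytes.IndexByte(buf[processed:], '\n')
			if i == -1 {
				break
			}
			i += processed
			if len(backlog) != 0 {
				// a partial line from a previous chunk is now complete
				backlog = append(backlog, buf[processed:i]...)
				emit(backlog)
				backlog = backlog[:0]
			} else {
				emit(buf[processed:i])
			}
			processed = i + 1
		}
		if processed < len(buf) { // keep the partial tail for the next read
			backlog = append(backlog, buf[processed:]...)
		}
	}
}

func main() {
	splitLines([][]byte{[]byte("a\nbb\ncc"), []byte("c\nd\n")}, func(line []byte) {
		fmt.Printf("event: %s\n", line)
	})
	// emits: a, bb, ccc, d
}
```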
Copyright notice: this article was written by [Xuzhong -- Lei]. Please include the original link when reposting: https://yzsam.com/2022/04/202204231634296874.html