Loggie source code analysis: main flow of the file source module
2022-04-23 16:34:00 【序冢--磊】
Project website:
There are a lot of code details (for example, the jobUid generated for each job, the rename markers, and so on). Rather than walking through each one, this post only traces the overall flow; once the big picture is clear, the code is much easier to read.
1. Overall architecture of the file module
Some modules, such as the multiline module, are not analyzed here.
Through manager.go, the source creates and uses the watcher, reader, ackTask, watchTask, dbHandler, multilineProcessor and other submodules.
Source.Start is a key function: it creates and starts the watcher and the reader, which then wait for tasks to be delivered over channels.
func (s *Source) Start() {
log.Info("start source: %s", s.String())
if s.config.ReaderConfig.MultiConfig.Active {
s.multilineProcessor = GetOrCreateShareMultilineProcessor()
}
// register queue listener for ack
if s.ackEnable {
s.dbHandler = GetOrCreateShareDbHandler(s.config.DbConfig)
s.ackChainHandler = GetOrCreateShareAckChainHandler(s.sinkCount, s.config.AckConfig)
s.rc.RegisterListener(&AckListener{
sourceName: s.name,
ackChainHandler: s.ackChainHandler,
})
}
s.watcher = GetOrCreateShareWatcher(s.config.WatchConfig, s.config.DbConfig)
s.r = GetOrCreateReader(s.isolation, s.config.ReaderConfig, s.watcher)
s.HandleHttp()
}
Creation of the watcher and the start of its event loop:
func newWatcher(config WatchConfig, dbHandler *dbHandler) *Watcher {
w := &Watcher{
done: make(chan struct{}),
config: config,
sourceWatchTasks: make(map[string]*WatchTask),
waiteForStopWatchTasks: make(map[string]*WatchTask),
watchTaskChan: make(chan *WatchTask),
dbHandler: dbHandler,
zombieJobChan: make(chan *Job, config.MaxOpenFds+1),
allJobs: make(map[string]*Job),
osWatchFiles: make(map[string]bool),
zombieJobs: make(map[string]*Job),
countDown: &sync.WaitGroup{},
stopOnce: &sync.Once{},
}
w.initOsWatcher()
go w.run()
return w
}
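The GetOrCreateShare* getters seen in Start return process-wide singletons, so multiple pipelines share one watcher. A minimal sketch of how such a getter is typically built on sync.Once, matching the two-argument call above (variable names are illustrative, not the actual loggie code):

var (
    sharedWatcher *Watcher
    watcherOnce   sync.Once
)

func GetOrCreateShareWatcher(config WatchConfig, dbConfig DbConfig) *Watcher {
    watcherOnce.Do(func() {
        // build the single shared watcher on first use
        sharedWatcher = newWatcher(config, GetOrCreateShareDbHandler(dbConfig))
    })
    return sharedWatcher
}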
Creation of the reader and the start of its listening: Start spawns goroutines that wait for jobs to arrive.
func newReader(config ReaderConfig, watcher *Watcher) *Reader {
r := &Reader{
done: make(chan struct{}),
config: config,
jobChan: make(chan *Job, config.ReadChanSize),
watcher: watcher,
countDown: &sync.WaitGroup{},
stopOnce: &sync.Once{},
startOnce: &sync.Once{},
}
r.Start()
return r
}
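Start is guarded by startOnce, so repeated calls are harmless. A plausible sketch, assuming it simply spawns a pool of read workers (the worker count here is hypothetical; assume the real value comes from ReaderConfig):

func (r *Reader) Start() {
    r.startOnce.Do(func() {
        const workerCount = 4 // hypothetical pool size
        for i := 0; i < workerCount; i++ {
            go r.work(i) // each worker drains r.jobChan, see section 3
        }
    })
}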
Delivering the task, which drives the watcher's subsequent work:
func (s *Source) ProductLoop(productFunc api.ProductFunc) {
log.Info("%s start product loop", s.String())
s.productFunc = productFunc
if s.config.ReaderConfig.MultiConfig.Active {
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
s.multilineProcessor.StartTask(s.mTask)
s.productFunc = s.multilineProcessor.Process
}
if s.config.AckConfig.Enable {
s.ackTask = NewAckTask(s.epoch, s.pipelineName, s.name, func(state *State) {
s.dbHandler.state <- state
})
s.ackChainHandler.StartTask(s.ackTask)
}
s.watchTask = NewWatchTask(s.epoch, s.pipelineName, s.name, s.config.CollectConfig, s.eventPool, s.productFunc, s.r.jobChan, s.config.Fields)
// start watch source paths
s.watcher.StartWatchTask(s.watchTask)
}
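StartWatchTask presumably just delivers the task to the watcher's run loop; a minimal sketch consistent with the watchTask := <-w.watchTaskChan branch shown in the next section:

func (w *Watcher) StartWatchTask(task *WatchTask) {
    // delivery only: all bookkeeping happens inside the watcher goroutine,
    // so none of the watcher's maps need locking
    w.watchTaskChan <- task
}

Because run() is the only goroutine that touches sourceWatchTasks, allJobs and the other maps, handing tasks and jobs over channels serializes access without any mutexes.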
2. Overall architecture of the watcher module
The watcher manages multiple WatchTasks, and each WatchTask manages multiple jobs.
First, what does the watcher do?
The watcher sits in a loop, waiting for messages and events to arrive:
func (w *Watcher) run() {
w.countDown.Add(1)
log.Info("watcher start")
scanFileTicker := time.NewTicker(w.config.ScanTimeInterval)
maintenanceTicker := time.NewTicker(w.config.MaintenanceInterval)
defer func() {
w.countDown.Done()
scanFileTicker.Stop()
maintenanceTicker.Stop()
log.Info("watcher stop")
}()
var osEvents chan fsnotify.Event
if w.config.EnableOsWatch && w.osWatcher != nil {
osEvents = w.osWatcher.Events
}
for {
select {
case <-w.done:
return
case watchTask := <-w.watchTaskChan:
w.handleWatchTaskEvent(watchTask)
case job := <-w.zombieJobChan:
w.decideZombieJob(job)
case e := <-osEvents:
//log.Info("os event: %v", e)
w.osNotify(e)
case <-scanFileTicker.C:
w.scan()
case <-maintenanceTicker.C:
w.maintenance()
}
}
}
w.done signals that the watcher is shutting down; the loop returns without further processing.
handleWatchTaskEvent manages starting and stopping entries in the task list, and handles loading persisted state from the SQLite database.
decideZombieJob decides whether a job has become a zombie: an inactive job is added to the zombie set and kept under observation (a hedged sketch follows after the scan code below).
osEvents is inotify under the hood, delivering file-change events.
scan is the most important branch: scanning files and jobs on a timer gives better performance, whereas relying on inotify create events could cause frequent wakeups and high CPU usage.
func (w *Watcher) scan() {
// check any new files
w.scanNewFiles()
// active job
w.scanActiveJob()
// zombie job
w.scanZombieJob()
}
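For decideZombieJob mentioned above, here is a hedged sketch of the decision, using only fields that appear in this article; the real loggie logic also has to weigh file deletion, renames and fd limits:

func (w *Watcher) decideZombieJob(job *Job) {
    if time.Since(job.lastActiveTime) < w.config.MaintenanceInterval {
        // recently active: hand the job straight back to the reader
        job.Read()
        return
    }
    // inactive: park it in the zombie set; scanZombieJob re-checks it on
    // each scan tick and can eventually release the file handle
    w.zombieJobs[job.Uid()] = job
}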
Jobs are handed to the reader over a message-bus-style channel; the delivery happens in:
func (j *Job) Read() {
j.task.activeChan <- j
}
This hands the current job off to the reader: NewWatchTask received s.r.jobChan in ProductLoop above, so the task's activeChan is effectively the reader's job channel.
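A self-contained toy of this channel handoff, mirroring the buffered jobChan created in newReader (the file name is made up):

package main

import "fmt"

type Job struct{ filename string }

func main() {
    // cf. jobChan: make(chan *Job, config.ReadChanSize) in newReader
    jobChan := make(chan *Job, 8)

    // watcher side: "reading" a job just means handing it over the channel
    go func() {
        jobChan <- &Job{filename: "/var/log/app.log"}
        close(jobChan) // for this toy only; loggie keeps the channel open
    }()

    // reader worker side: block until jobs arrive, then process them
    for job := range jobChan {
        fmt.Println("got job for", job.filename)
    }
}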
3. A brief analysis of the Reader module
The reader reads the job's file from the last recorded offset up to the current end of the file, then sends events downstream to the interceptors via ProductEvent.
Core code:
func (r *Reader) work(index int) {
r.countDown.Add(1)
log.Info("read worker-%d start", index)
defer func() {
log.Info("read worker-%d stop", index)
r.countDown.Done()
}()
readBufferSize := r.config.ReadBufferSize
maxContinueReadTimeout := r.config.MaxContinueReadTimeout
maxContinueRead := r.config.MaxContinueRead
inactiveTimeout := r.config.InactiveTimeout
backlogBuffer := make([]byte, 0, readBufferSize)
readBuffer := make([]byte, readBufferSize)
jobs := r.jobChan
for {
select {
case <-r.done:
return
case job := <-jobs:
filename := job.filename
status := job.status
if status == JobStop {
log.Info("job(uid: %s) file(%s) status(%d) is stop, job will be ignore", job.Uid(), filename, status)
r.watcher.decideJob(job)
continue
}
file := job.file
if file == nil {
log.Error("job(uid: %s) file(%s) released,job will be ignore", job.Uid(), filename)
r.watcher.decideJob(job)
continue
}
lastOffset, err := file.Seek(0, io.SeekCurrent)
if err != nil {
log.Error("can't get offset, file(name:%s) seek error, err: %v", filename, err)
r.watcher.decideJob(job)
continue
}
job.currentLines = 0
startReadTime := time.Now()
continueRead := 0
isEOF := false
wasSend := false
readTotal := int64(0)
processed := int64(0)
backlogBuffer = backlogBuffer[:0]
for {
readBuffer = readBuffer[:readBufferSize]
l, readErr := file.Read(readBuffer)
if errors.Is(readErr, io.EOF) || l == 0 {
isEOF = true
job.eofCount++
break
}
if readErr != nil {
log.Error("file(name:%s) read error, err: %v", filename, err)
break
}
read := int64(l)
readBuffer = readBuffer[:read]
now := time.Now()
processed = 0
for processed < read {
index := int64(bytes.IndexByte(readBuffer[processed:], '\n'))
if index == -1 {
break
}
index += processed
endOffset := lastOffset + readTotal + index
if len(backlogBuffer) != 0 {
backlogBuffer = append(backlogBuffer, readBuffer[processed:index]...)
job.ProductEvent(endOffset, now, backlogBuffer)
// Clean the backlog buffer after sending
backlogBuffer = backlogBuffer[:0]
} else {
job.ProductEvent(endOffset, now, readBuffer[processed:index])
}
processed = index + 1
}
readTotal += read
// The remaining bytes read are added to the backlog buffer
if processed < read {
backlogBuffer = append(backlogBuffer, readBuffer[processed:]...)
// TODO check whether it is too long to avoid bursting the memory
//if len(backlogBuffer)>max_bytes{
// log.Error
// break
//}
}
wasSend = processed != 0
if wasSend {
continueRead++
// Assuming a batch of 2048 events at ~512 bytes per event, at most about
// one batch (~1MB) is read continuously; maxContinueRead = 16 by default.
// For SSDs it is recommended to raise maxContinueRead by 3~5x.
if continueRead > maxContinueRead {
break
}
if time.Since(startReadTime) > maxContinueReadTimeout {
break
}
}
}
if wasSend {
job.eofCount = 0
job.lastActiveTime = time.Now()
}
l := len(backlogBuffer)
if l > 0 {
// Before backing off the offset, check whether the job is inactive so the last line can still be collected
wasLastLineSend := false
if isEOF && !wasSend {
if time.Since(job.lastActiveTime) >= inactiveTimeout {
// Send "last line"
endOffset := lastOffset + readTotal
job.ProductEvent(endOffset, time.Now(), backlogBuffer)
job.lastActiveTime = time.Now()
wasLastLineSend = true
// Skip the \n that may be written next: once the "last line" has been
// collected, either nothing more will be written, or a \n will be written
// first followed by the next line's content, so seek one byte ahead to
// ignore that potential \n.
_, err = file.Seek(1, io.SeekCurrent)
if err != nil {
log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
}
} else {
// Keep eofCount at 0 so the job stays active and the last line can still be collected later
job.eofCount = 0
}
}
// Back the offset off by the accumulated backlog length
if !wasLastLineSend {
backwardOffset := int64(-l)
_, err = file.Seek(backwardOffset, io.SeekCurrent)
if err != nil {
log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
}
}
}
r.watcher.decideJob(job)
}
}
}
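To isolate the core technique of the loop above: complete lines found by bytes.IndexByte are emitted immediately, while a trailing partial line is carried in the backlog buffer across reads (in the real code the offset is also seeked back if the line is never completed). A self-contained toy:

package main

import (
    "bytes"
    "fmt"
)

func main() {
    // two simulated file.Read results; the second completes a split line
    chunks := [][]byte{
        []byte("line-1\nline-2\npar"),
        []byte("tial-3\n"),
    }
    backlog := make([]byte, 0, 64)
    for _, buf := range chunks {
        processed := 0
        for processed < len(buf) {
            idx := bytes.IndexByte(buf[processed:], '\n')
            if idx == -1 {
                break // no complete line left in this chunk
            }
            idx += processed
            line := buf[processed:idx]
            if len(backlog) != 0 {
                // prepend the leftover from the previous read
                line = append(backlog, line...)
                backlog = backlog[:0]
            }
            fmt.Printf("event: %s\n", line)
            processed = idx + 1
        }
        // keep the incomplete tail for the next read
        if processed < len(buf) {
            backlog = append(backlog, buf[processed:]...)
        }
    }
}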
Copyright notice
This article was written by [序冢--磊]; please include a link to the original when reposting. Thanks.
https://blog.csdn.net/qq_32783703/article/details/124332577