Current location: Home > Loggie source code analysis: the backbone of the file source module

Loggie source code analysis: the backbone of the file source module

2022-04-23 16:37:20 Xuzhong -- Lei

Project website:

https://loggie-io.github.io/

There are many code details — for instance, how a job's jobUid is generated, or the many details around identifying renamed files — and I'm a bit lazy, so I won't walk through them one by one. Instead I'll sketch the general flow; once you understand the overall framework, the code is much easier to follow.

1. Overall architecture of the file module

Some modules, such as the multiline module, are not analyzed here.

The source uses manager.go to create and share the watcher, the reader, the ack task, the watch task, the db handler, the multiline processor, and so on.

Source.Start is a key function: it creates and launches the watcher and the reader, which then wait for tasks to be delivered to them over channels.

// Start prepares the components this source depends on and then registers
// its HTTP handlers. Components are obtained through the GetOrCreate* helpers
// (judging by their names, presumably shared across sources — confirm):
//   - the multiline processor, only when multiline reading is active;
//   - the db handler and ack chain handler, plus an AckListener registered
//     on s.rc, only when s.ackEnable is set;
//   - the watcher, and then the reader bound to that watcher.
func (s *Source) Start() {
	log.Info("start source: %s", s.String())

	multilineActive := s.config.ReaderConfig.MultiConfig.Active
	if multilineActive {
		s.multilineProcessor = GetOrCreateShareMultilineProcessor()
	}

	if s.ackEnable {
		s.dbHandler = GetOrCreateShareDbHandler(s.config.DbConfig)
		s.ackChainHandler = GetOrCreateShareAckChainHandler(s.sinkCount, s.config.AckConfig)

		// The listener carries this source's name and the ack chain handler
		// created just above.
		listener := &AckListener{
			sourceName:      s.name,
			ackChainHandler: s.ackChainHandler,
		}
		s.rc.RegisterListener(listener)
	}

	// The reader is built after the watcher because it takes the watcher.
	s.watcher = GetOrCreateShareWatcher(s.config.WatchConfig, s.config.DbConfig)
	s.r = GetOrCreateReader(s.isolation, s.config.ReaderConfig, s.watcher)

	s.HandleHttp()
}

Creating the watcher and starting its listening loop:

// newWatcher builds a Watcher from config and the given db handler. All of
// its maps and channels are allocated here. zombieJobChan is buffered to
// config.MaxOpenFds+1; the others are unbuffered. It then calls
// initOsWatcher and launches the run loop in a new goroutine before
// returning the watcher.
func newWatcher(config WatchConfig, dbHandler *dbHandler) *Watcher {
	w := new(Watcher)
	w.config = config
	w.dbHandler = dbHandler

	// Signalling and task/job channels.
	w.done = make(chan struct{})
	w.watchTaskChan = make(chan *WatchTask)
	w.zombieJobChan = make(chan *Job, config.MaxOpenFds+1)

	// Bookkeeping tables.
	w.sourceWatchTasks = make(map[string]*WatchTask)
	w.waiteForStopWatchTasks = make(map[string]*WatchTask)
	w.allJobs = make(map[string]*Job)
	w.zombieJobs = make(map[string]*Job)
	w.osWatchFiles = make(map[string]bool)

	// Lifecycle helpers.
	w.countDown = new(sync.WaitGroup)
	w.stopOnce = new(sync.Once)

	w.initOsWatcher()
	go w.run()
	return w
}

Creating the reader: Start launches the reader's workers, which then wait for Jobs to come in.

// newReader constructs a Reader bound to watcher. Its job channel is
// buffered to config.ReadChanSize. It calls Start on the new reader before
// returning it.
func newReader(config ReaderConfig, watcher *Watcher) *Reader {
	r := new(Reader)
	r.config = config
	r.watcher = watcher
	r.done = make(chan struct{})
	r.jobChan = make(chan *Job, config.ReadChanSize)
	r.countDown = new(sync.WaitGroup)
	r.stopOnce = new(sync.Once)
	r.startOnce = new(sync.Once)

	r.Start()
	return r
}

ProductLoop delivers the watch task to the watcher, so that the watcher can carry on with its work:

// ProductLoop installs productFunc as the consumer of events produced by this
// source and starts the per-source tasks:
//   - a multiline task, when multiline reading is active; from then on events
//     are routed through the multiline processor instead of straight to
//     productFunc;
//   - an ack task, when ack is enabled; states it reports are sent on the db
//     handler's state channel;
//   - the watch task, which is given the reader's job channel and is handed to
//     the watcher to start watching the source paths.
//
// The ack branch is gated on s.ackEnable, the same flag Start uses to create
// s.dbHandler and s.ackChainHandler. This keeps the two consistent, so this
// method never uses an ack handler that Start did not create.
func (s *Source) ProductLoop(productFunc api.ProductFunc) {
	log.Info("%s start product loop", s.String())
	s.productFunc = productFunc
	if s.config.ReaderConfig.MultiConfig.Active {
		s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
		s.multilineProcessor.StartTask(s.mTask)
		// Events now pass through the multiline processor, which forwards to
		// the original productFunc captured by mTask above.
		s.productFunc = s.multilineProcessor.Process
	}
	if s.ackEnable {
		s.ackTask = NewAckTask(s.epoch, s.pipelineName, s.name, func(state *State) {
			s.dbHandler.state <- state
		})
		s.ackChainHandler.StartTask(s.ackTask)
	}
	s.watchTask = NewWatchTask(s.epoch, s.pipelineName, s.name, s.config.CollectConfig, s.eventPool, s.productFunc, s.r.jobChan, s.config.Fields)
	// start watch source paths
	s.watcher.StartWatchTask(s.watchTask)
}

2. Overall architecture of the watcher module

The watcher manages multiple WatchTasks, and each WatchTask manages multiple Jobs.

First, what does the watcher do?

The watcher waits for messages and events to arrive:


// run is the watcher's event loop. It blocks in a select and dispatches:
//   - w.done: stop signal; the loop returns;
//   - watch tasks from watchTaskChan -> handleWatchTaskEvent;
//   - jobs from zombieJobChan -> decideZombieJob;
//   - OS file-change events -> osNotify, only when EnableOsWatch is set and an
//     osWatcher exists (otherwise osEvents stays nil and never fires);
//   - scanFileTicker ticks -> scan; maintenanceTicker ticks -> maintenance.
//
// On return both tickers are stopped and countDown is released.
func (w *Watcher) run() {
	// NOTE(review): Add runs inside the goroutine started by newWatcher, so a
	// concurrent countDown.Wait() could observe a zero counter and return
	// before this line executes. Calling Add before `go w.run()` would close
	// that window.
	w.countDown.Add(1)
	log.Info("watcher start")
	scanFileTicker := time.NewTicker(w.config.ScanTimeInterval)
	maintenanceTicker := time.NewTicker(w.config.MaintenanceInterval)
	defer func() {
		w.countDown.Done()
		scanFileTicker.Stop()
		maintenanceTicker.Stop()
		log.Info("watcher stop")
	}()
	var osEvents chan fsnotify.Event
	if w.config.EnableOsWatch && w.osWatcher != nil {
		osEvents = w.osWatcher.Events
	}
	for {
		select {
		case <-w.done:
			return
		case watchTask := <-w.watchTaskChan:
			w.handleWatchTaskEvent(watchTask)
		case job := <-w.zombieJobChan:
			w.decideZombieJob(job)
		case e, ok := <-osEvents:
			if !ok {
				// The OS watcher closed its event channel. A closed channel is
				// always ready, so receiving from it again would spin this loop
				// on zero-value events. Receiving from a nil channel blocks
				// forever, which removes this case from the select.
				log.Info("os watcher event channel closed, stop receiving os events")
				osEvents = nil
				continue
			}
			w.osNotify(e)
		case <-scanFileTicker.C:
			w.scan()
		case <-maintenanceTicker.C:
			w.maintenance()
		}
	}
}

w.done: the stop signal; when it fires, the loop simply returns and does nothing else.

handleWatchTaskEvent manages starting and stopping the tasks in the task list, and handles loading persisted state from the SQLite database.

decideZombieJob decides whether a job is a zombie: if the job is no longer active, it is added to the zombie jobs and kept under observation.

osEvents is backed by the underlying inotify mechanism and delivers file change events.

scan is an important event: it periodically scans files and jobs. Polling performs better here, because relying on inotify create events could cause frequent wakeups and relatively high CPU usage.

// scan performs one polling pass. It runs three steps in a fixed order:
// look for new files, then check active jobs, then check zombie jobs.
func (w *Watcher) scan() {
	steps := [...]func(){
		w.scanNewFiles,  // new files
		w.scanActiveJob, // active jobs
		w.scanZombieJob, // zombie jobs
	}
	for _, step := range steps {
		step()
	}
}

A Job is delivered to the reader through the message bus (eventBus); the delivery happens in:

// Read hands this job to its task by sending it on the task's activeChan.
// Like any channel send, it blocks until the channel can accept the job.
func (j *Job) Read() {
	active := j.task.activeChan
	active <- j
}

This hands the current job over for delivery to the reader.

3. Brief analysis of the Reader module

The reader reads the job's file from the offset where the previous read stopped up to the current end of the file. It then sends the content downstream through ProductEvent, i.e. on to the interceptors.

Core code:

// work is the body of one read worker; index only labels its log lines. It
// consumes jobs from r.jobChan until r.done is closed. For each job it:
//  1. hands the job straight back to the watcher (decideJob) if the job is
//     stopped, its file has been released, or its current offset can't be read;
//  2. reads the file from its current offset in ReadBufferSize chunks and emits
//     one event per '\n'-terminated line via job.ProductEvent. A line that
//     spans chunks is accumulated in backlogBuffer;
//  3. stops reading at EOF or on a read error. After a read that emitted lines,
//     it also stops once more than MaxContinueRead such reads have happened, or
//     once MaxContinueReadTimeout has elapsed;
//  4. for a trailing partial line:
//     - if the pass hit EOF without emitting anything and the job has been
//       inactive for InactiveTimeout, emits it as the "last line";
//     - otherwise seeks back so the partial line is re-read next time;
//  5. returns the job to the watcher via decideJob.
func (r *Reader) work(index int) {
	r.countDown.Add(1)
	log.Info("read worker-%d start", index)
	defer func() {
		log.Info("read worker-%d stop", index)
		r.countDown.Done()
	}()
	readBufferSize := r.config.ReadBufferSize
	maxContinueReadTimeout := r.config.MaxContinueReadTimeout
	maxContinueRead := r.config.MaxContinueRead
	inactiveTimeout := r.config.InactiveTimeout
	// Both buffers are allocated once per worker and reused for every job.
	backlogBuffer := make([]byte, 0, readBufferSize)
	readBuffer := make([]byte, readBufferSize)
	jobs := r.jobChan
	for {
		select {
		case <-r.done:
			return
		case job := <-jobs:
			filename := job.filename
			status := job.status
			if status == JobStop {
				log.Info("job(uid: %s) file(%s) status(%d) is stop, job will be ignore", job.Uid(), filename, status)
				r.watcher.decideJob(job)
				continue
			}
			file := job.file
			if file == nil {
				log.Error("job(uid: %s) file(%s) released,job will be ignore", job.Uid(), filename)
				r.watcher.decideJob(job)
				continue
			}
			lastOffset, err := file.Seek(0, io.SeekCurrent)
			if err != nil {
				log.Error("can't get offset, file(name:%s) seek error, err: %v", filename, err)
				r.watcher.decideJob(job)
				continue
			}
			job.currentLines = 0

			startReadTime := time.Now()
			continueRead := 0
			isEOF := false
			// sentAny records whether ANY read in this pass emitted a line. It must
			// be sticky: a later read that only extends a partial line (no '\n')
			// must not make an active pass look idle. An idle-looking pass would
			// skip the lastActiveTime refresh and could emit the partial line early
			// as the "last line".
			sentAny := false
			readTotal := int64(0)
			processed := int64(0)
			backlogBuffer = backlogBuffer[:0]
			for {
				readBuffer = readBuffer[:readBufferSize]
				l, readErr := file.Read(readBuffer)
				if errors.Is(readErr, io.EOF) || l == 0 {
					isEOF = true
					job.eofCount++
					break
				}
				if readErr != nil {
					log.Error("file(name:%s) read error, err: %v", filename, readErr)
					break
				}
				read := int64(l)
				readBuffer = readBuffer[:read]
				now := time.Now()
				processed = 0
				for processed < read {
					// lineEnd: absolute position of the next '\n' within readBuffer.
					lineEnd := int64(bytes.IndexByte(readBuffer[processed:], '\n'))
					if lineEnd == -1 {
						break
					}
					lineEnd += processed

					endOffset := lastOffset + readTotal + lineEnd
					if len(backlogBuffer) != 0 {
						backlogBuffer = append(backlogBuffer, readBuffer[processed:lineEnd]...)
						job.ProductEvent(endOffset, now, backlogBuffer)

						// Clean the backlog buffer after sending
						backlogBuffer = backlogBuffer[:0]
					} else {
						job.ProductEvent(endOffset, now, readBuffer[processed:lineEnd])
					}
					processed = lineEnd + 1
				}

				readTotal += read

				// The remaining bytes read are added to the backlog buffer
				if processed < read {
					backlogBuffer = append(backlogBuffer, readBuffer[processed:]...)

					// TODO check whether it is too long to avoid bursting the memory
					//if len(backlogBuffer)>max_bytes{
					//	log.Error
					//	break
					//}
				}

				// The continue-read limits only count reads that emitted lines.
				if processed != 0 {
					sentAny = true
					continueRead++
					// According to the number of batches 2048, a maximum of one batch can be read,
					// and a single event is calculated according to 512 bytes, that is, the maximum reading is 1mb ,maxContinueRead = 16 by default
					// SSD recommends that maxContinueRead be increased by 3 ~ 5x
					if continueRead > maxContinueRead {
						break
					}
					if time.Since(startReadTime) > maxContinueReadTimeout {
						break
					}
				}
			}

			if sentAny {
				job.eofCount = 0
				job.lastActiveTime = time.Now()
			}

			l := len(backlogBuffer)
			if l > 0 {
				// When it is necessary to back off the offset, check whether it is inactive to collect the last line
				wasLastLineSend := false
				if isEOF && !sentAny {
					if time.Since(job.lastActiveTime) >= inactiveTimeout {
						// Send "last line"
						endOffset := lastOffset + readTotal
						job.ProductEvent(endOffset, time.Now(), backlogBuffer)
						job.lastActiveTime = time.Now()
						wasLastLineSend = true
						// Skip one byte past the "last line". The assumption is that
						// the file either gets no further writes, or the next write
						// starts with the '\n' that ends this line. Seeking past
						// that byte keeps a lone "\n" from being collected as an
						// empty line later.
						_, err = file.Seek(1, io.SeekCurrent)
						if err != nil {
							log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
						}
					} else {
						// Enable the job to escape and collect the last line
						job.eofCount = 0
					}
				}
				// Seek back over the buffered partial line so it is re-read next time.
				if !wasLastLineSend {
					backwardOffset := int64(-l)
					_, err = file.Seek(backwardOffset, io.SeekCurrent)
					if err != nil {
						log.Error("can't set offset, file(name:%s) seek error: %v", filename, err)
					}
				}
			}
			r.watcher.decideJob(job)
		}
	}
}

Copyright notice
This article was written by [Xuzhong -- Lei]; when reposting, please include a link to the original. Thanks.
/html/wnuIeV.html

Random recommendations