A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Just another sentiment wrapper.

sentimany Just a simple sentiment tool. It just grabs a set of pre-made sentiment models that you can quickly use to attach sentiment scores to text.

vincent d warmerdam 15 Dec 27, 2022
Tools for analyzing Java JVM gc log files

gc_log This package consists of two separate utilities useful for : gc_log_visualizer.py regionsize.py GC Log Visualizer This was updated to run under

Brad Schoening 0 Jan 04, 2022
Perform oocyst segmentation in mercurochrome stained mosquito midgut

Midgut_oocyst_segmentation Perform oocyst segmentation in mercurochrome stained mosquito midguts This oocyst segmentation model also powers the webtoo

Duo Peng 3 Oct 27, 2021
Cairo-bloom - A naive bloom filter implementation in Cairo

🥀 cairo-bloom A naive bloom filter implementation in Cairo. A Bloom filter is a

Sam Barnes 37 Oct 01, 2022
Linux Security and Monitoring Scripts

Linux Security and Monitoring Scripts These are a collection of security and monitoring scripts you can use to monitor your Linux installation for sec

Andre Pawlowski 65 Aug 27, 2022
Research on how Gboard Stickers work.

Google-Sticker-Mashup-Research Research on how Gboard Stickers work. Contribute Contributing is nice, and you will be listed below for contributing. C

Jeremiah 45 Oct 28, 2022
One-stop-shop for docs and test coverage of dbt projects.

dbt-coverage One-stop-shop for docs and test coverage of dbt projects. Why do I need something like this? dbt-coverage is to dbt what coverage.py and

Slido 106 Dec 27, 2022
Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Henrik Ricarte 2 Mar 01, 2022
Batch Python Program Verify

Batch Python Program Verify About As a TA(teaching assistant) of Programming Class, it is very annoying to test students' homework assignments one by

Han-Wei Li 7 Dec 20, 2022
With the initiation of the COVID vaccination drive across India for all individuals above the age of 18, I wrote a python script which alerts the user regarding open slots in the vicinity!

cowin_notifier With the initiation of the COVID vaccination drive across India for all individuals above the age of 18, I wrote a python script which

13 Aug 01, 2021
A flexible free and unlimited python tool to translate between different languages in a simple way using multiple translators.

deep-translator Translation for humans A flexible FREE and UNLIMITED tool to translate between different languages in a simple way using multiple tran

Nidhal Baccouri 806 Jan 04, 2023
Check a discord message and give it a percentage of scamminess

scamChecker Check a discord message and give it a percentage of scamminess Run the bot, and run the command !scamCheck and it will return a percentage

3 Sep 22, 2022
Ergonomic option parser on top of dataclasses, inspired by structopt.

oppapī Ergonomic option parser on top of dataclasses, inspired by structopt. Usage from typing import Optional from oppapi import from_args, oppapi @

yukinarit 4 Jul 19, 2022
Petuhlang is a joke-like language, based on Python.

Petuhlang is a joke-like language, based on Python. It updates builtins to make a new syntax based on operators rewrite.

DenyS 9 Jun 19, 2022
Learn Python tips, tools, and techniques in around 5 minutes each.

Python shorts Learn Python tips, tools, and techniques in around 5 minutes each. Watch on YouTube Subscribe on YouTube to keep up with all the videos.

Michael Kennedy 28 Jan 01, 2023
🍬️🦇️ Open source Trick or Treat! 🦇️🍬️

Open Source Halloween! What's an easy way to have fun, and celebrate an open source Halloween? Open source trick or treating, of course! The repositor

Research Software Engineers 3 Oct 18, 2021
chiarose(XCR) based on chia(XCH) source code fork, open source public chain

chia-rosechain 一个无耻的小活动 | A shameless little event 如果您喜欢这个项目,请点击star 将赠送您520朵玫瑰,可以去 facebook 留下您的(xcr)地址,和github用户名。 If you like this project, please

ddou123 376 Dec 14, 2022
This is a modified variation of abhiTronix's vidgear. In this variation, it is possible to write the output file anywhere regardless the permissions.

Info In order to download this package: Windows 10: Press Windows+S, Type PowerShell (cmd in older versions) and hit enter, Type pip install vidgear_n

Ege Akman 3 Jan 30, 2022
Tools for downloading and processing numerical weather predictions

NWP Tools for downloading and processing numerical weather predictions At the moment, this code is focused on downloading historical UKV NWPs produced

Open Climate Fix 6 Nov 24, 2022
A Blender addon to align the origin to the top, center or bottom of a mesh object

Align Origin Blender Addon. Align Origin Blender Addon. What? This simple addon lets you align the origin to the top, center or bottom of a mesh objec

VA79 7 Nov 30, 2022