A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
msgqywx 使用企业微信的应用消息推送实时信息

msgqywx 使用企业微信的应用消息推送实时信息

Demon Finch 8 Dec 18, 2022
Better firefox bookmarks script for rofi

rofi-bookmarks Small python script to open firefox bookmarks with rofi. Features Icons! Only show bookmarks in a specified bookmark folder Show entire

32 Nov 10, 2022
A Tool to validate domestic New Zealand vaccine passes

Vaccine Validator Tool to validate domestic New Zealand vaccine passes Create a new virtual environment: python3 -m venv ./venv Activate virtual envi

8 May 01, 2022
The program converts Swiss notes into American notes

Informatik-Programmieren Einleitung: Das Programm rechnet Schweizer Noten in das Amerikanische Noten um. Der Benutzer kann seine Note eingeben und der

2 Dec 16, 2021
[CVPR 2020] Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective

Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective [Arxiv] This is PyTorch implementation of th

Abdullah Jamal 22 Nov 19, 2022
A simple website-based resource monitor for slurm system.

Slurm Web A simple website-based resource monitor for slurm system. Screenshot Required python packages flask, colored, humanize, humanfriendly, beart

Tengda Han 17 Nov 29, 2022
A continuation Of Project Glow By @glowstik-yt

Project Glow Greetings, I see you have stumbled upon project glow. Project glow is an open source bot worked on by many people to create a good and sa

1 Nov 17, 2021
An open letter in support of Richard Matthew Stallman being reinstated by the Free Software Foundation

An open letter in support of RMS. To sign, click here and name the file username.yaml (replace username with your name) with the following content

2.4k Jan 07, 2023
Tools, guides, and resources for blockchain analysts to interface with data on the Ergo platform.

Ergo Intelligence Objective Provide a suite of easy-to-use toolkits, guides, and resources for blockchain analysts and data scientists to quickly unde

Chris 5 Mar 15, 2022
Calculate the efficient frontier

关于 代码主要参考Fábio Neves的文章,你可以在他的文章中找到一些细节性的解释

Wyman Lin 104 Nov 11, 2022
Python Programming Bootcamp

python-bootcamp Python Programming Bootcamp Begin: 27th August 2021 End: 8th September 2021 Registration deadline: 22nd August 2021 Fees: No course or

Rohitash Chandra 11 Oct 19, 2022
A powerful and user-friendly binary analysis platform!

angr angr is a platform-agnostic binary analysis framework. It is brought to you by the Computer Security Lab at UC Santa Barbara, SEFCOM at Arizona S

6.3k Jan 02, 2023
Exercise to teach a newcomer to the CLSP grid to set up their environment and run jobs

Exercise to teach a newcomer to the CLSP grid to set up their environment and run jobs

Alexandra 2 May 18, 2022
Time tracking program that will format output to be easily put into Gitlab

time_tracker Time tracking program that will format output to be easily put into Gitlab. Feel free to branch and use it yourself! Getting Started Clon

Jake Strasler 2 Oct 13, 2022
Hy - A dialect of Lisp that's embedded in Python

Hy Lisp and Python should love each other. Let's make it happen. Hy is a Lisp dialect that's embedded in Python. Since Hy transforms its Lisp code int

Hy Society 4.4k Jan 02, 2023
This is a simple quizz which can ask user for login/register session, then consult to the Quiz interface.

SIMPLE-QUIZ- This is a simple quizz which can ask user for login/register session, then consult to the Quiz interface. By CHAKFI Ahmed MASTER SYSTEMES

CHAKFI Ahmed 1 Jan 10, 2022
Inverted-pendulum-with-fuzzy-control - Inverted pendulum with fuzzy control

Fuzzy Inverted Pendulum Basically, this project consists of an inverted pendulum

Mahan Ahmadvand 1 Aug 25, 2022
pythonOS: An operating system kernel made in python and assembly

pythonOS An operating system kernel made in python and assembly Wait what? It uses a custom compiler called snek that implements a part of python3.9 (

Abbix 69 Dec 23, 2022
Implements a polyglot REPL which supports multiple languages and shared meta-object protocol scope between REPLs.

MetaCall Polyglot REPL Description This repository implements a Polyglot REPL which shares the state of the meta-object protocol between the REPLs. Us

MetaCall 10 Dec 28, 2022
Simple application that does transformation with HPF and LPFs.

Simple application that applies Butterworth, Gaussian & Ideal kernels on HPF and LPFs -aka Frequency Domain Filtering- Upload image from sidebar, set

Merve Noyan 3 Jul 06, 2022