A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Simple yet flexible natural sorting in Python.

natsort Simple yet flexible natural sorting in Python. Source Code: https://github.com/SethMMorton/natsort Downloads: https://pypi.org/project/natsort

Seth Morton 712 Dec 23, 2022
A discord group chat creator just made it because i saw people selling this stuff for like up to 40 bucks

gccreator some discord group chat tools just made it because i saw people selling this stuff for like up to 40 bucks (im currently working on a faster

baum1810 6 Oct 03, 2022
Learn to code in any language. If

Learn to Code It is an intiiative undertaken by Student Ambassadors Club, Jamshoro for students who are absolute begineers in programming and want to

Student Ambassadors' Club at Mehran UET 15 Oct 19, 2022
My Dotfiles of Arco Linux

Arco-DotFiles My Dotfiles of Arco Linux Apps Used Htop LightDM lightdm-webkit2-greeter Alacritty Qtile Cava Spotify nitrogen neofetch Spicetify Thunar

$BlueDev5 6 Dec 11, 2022
Final Fantasy XIV Auto House Clicker

Final Fantasy XIV Auto House Clicker

KanameS 0 Mar 31, 2022
A submodule of rmcrkd/ODE-Uniqueness

Heston-ODE This repo contains the Heston-related code that accompanies the article One-sided maximal uniqueness for a class of spatially irregular ord

0 Jan 05, 2022
Reproducible nvim completion framework benchmarks.

Nvim.Bench Reproducible nvim completion framework benchmarks. Runs inside Docker. Fair and balanced Methodology Note: for all "randomness", they are g

i love my dog 14 Nov 20, 2022
A python package to adjust the bias of probabilistic forecasts/hindcasts using "Mean and Variance Adjustment" method.

Documentation A python package to adjust the bias of probabilistic forecasts/hindcasts using "Mean and Variance Adjustment" method. Read documentation

1 Feb 02, 2022
Fortnite StW Claimer for Daily Rewards, Research Points and free Llamas.

Fortnite Save the World Daily Reward, Research Points & free Llama Claimer This program allows you to claim Save the World Daily Reward, Research Poin

PRO100KatYT 27 Dec 22, 2022
Short, introductory guide for the Python programming language

100 Page Python Intro This book is a short, introductory guide for the Python programming language.

Sundeep Agarwal 185 Dec 26, 2022
msImpersonate - User account impersonation written in pure Python3

msImpersonate v1.0 msImpersonate is a Python-native user impersonation tool that is capable of impersonating local or network user accounts with valid

Joe Helle 90 Dec 16, 2022
Este script añade la config de s4vitar a bspwm automaticamente!

Se ha testeado este script en ParrotOS, Kali y Ubuntu. Funciona para todos los sistemas operativos basados en Debian. Instalación git clone https://gi

yorkox 201 Dec 30, 2022
Add-In for Blender to automatically save files when rendering

Autosave - Render: Automatically save .blend, .png and readme.txt files when rendering with Blender Purpose This Blender Add-On provides an easy way t

Volker 9 Aug 10, 2022
combs is a package used to generate all possible combinations of a given length k on a given set.

The package combs is a package used to generate all possible combinations of a given length k on a given set. The set is given as a list, and k must b

1 Dec 24, 2021
oracle arm registration script.

oracle_arm oracle arm registration script. 乌龟壳刷ARM脚本 本脚本优点 简单,主机配置好oci,然后下载main.tf即可,不用自己获取各种参数。 运行环境配置 本简单脚本使用python3编写,请自行配置好python3环境和requests库。(高版

test1234455 419 Jan 01, 2023
You can easily send campaigns, e-marketing have actually account using cash will thank you for using our tools, and you can support our Vodafone Cash +201090788026

*** Welcome User Sorry I Mean Hello Brother ✓ Devolper and Design : Mokhtar Abdelkreem ========================================== You Can Follow Us O

Mo Code 1 Nov 03, 2021
A Python application that simulates the rolling of a dice, randomly picking one of the 6 faces and then displaying it.

dice-roller-app This is an application developed in Python that shuffles between the 6 faces of a dice, using buttons to shuffle and close the applica

Paddy Costelloe 0 Jul 20, 2021
Fithub is a website application for athletes and fitness enthusiasts of all ages and experience levels.

Fithub is a website application for athletes and fitness enthusiasts of all ages and experience levels. Our website allows users to easily search, filter, and sort our comprehensive database of over

Andrew Wu 1 Dec 13, 2021
A Github Action for sending messages to a Matrix Room.

matrix-commit A Github Action for sending messages to a Matrix Room. Screenshot: Example Usage: # .github/workflows/matrix-commit.yml on: push:

3 Sep 11, 2022
You will need to install a few python packages for this one.

Features Bait support Auto repair will repair every 10 catches Anti detection (still a work in progress) but using random times and click positions Pr

12 Sep 21, 2022