A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
A very basic ciphering/deciphering tool

ckrett-python-library This is an useful python library for people who care about privacy, this library is useful to cipher and decipher text using 4 s

SasiVatsal 8 Oct 18, 2022
Package pyVHR is a comprehensive framework for studying methods of pulse rate estimation relying on remote photoplethysmography (rPPG)

Package pyVHR (short for Python framework for Virtual Heart Rate) is a comprehensive framework for studying methods of pulse rate estimation relying on remote photoplethysmography (rPPG)

PHUSE Lab 261 Jan 03, 2023
Tool to automate the enumeration of a website (CTF)

had4ctf Tool to automate the enumeration of a website (CTF) DISCLAIMER: THE TOOL HAS BEEN DEVELOPED SOLELY FOR EDUCATIONAL PURPOSE ,I WILL NOT BE LIAB

Had 2 Oct 24, 2021
Repository for 2021 Computer Vision Class @ Chulalongkorn University

2110443 - Computer Vision (2021/2) Computer Vision @ Chulalongkorn University Anaconda Download Link https://www.anaconda.com/download/ Miniconda and

Chula PIC Lab 5 Jul 19, 2022
A python script that fetches the grades of a student from a WAEC result in pdf format.

About waec-result-analyzer A python script that fetches the grades of a student from a WAEC result in pdf format. Built for federal government college

Oshodi Kolapo 2 Dec 04, 2021
Airplane reservation system python 2

airplane-reservation-system-python-2 Announcement 🔊 : 🔴 IMPORTANT 🔴 : Few new things have been added into the code [16/05/2021] different names is

voyager2005 1 Dec 06, 2021
Python decorator for `TODO`s

Python decorator for `TODO`s. Don't let your TODOs rot in your python projects anymore !

Klemen Sever 74 Sep 13, 2022
A bot to view Dilbert comics directly from Discord and get updates of the comics automatically.

A bot to view Dilbert comics directly from Discord and get updates of the comics automatically

Raghav Sharma 3 Nov 30, 2022
Another Provably Rare Gem Miner 💎 (for Raritygems)

Provably Rare Gem Miner Go (for Rarity) Pull Request is strongly welcome as I don't know anything about Golang/Python/Web3. Usage Install Python 3.x i

朱里 6 Apr 22, 2022
Yandex Media Browser

Браузер медиа для плагина Yandex Station Включайте музыку, плейлисты и радио на Яндекс.Станции из Home Assistant! Скриншот Корневой раздел: Библиотека

Alexander Ryazanov 35 Dec 19, 2022
Archive, organize, and watch for changes to publicly available information.

0. Overview The Trapper Keeper is a collection of scripts that support archiving information from around the web to make it easier to study and use. I

Bill Fitzgerald 9 Oct 26, 2022
Singularity Containers on Apple M1 (ARM64)

Singularity Containers on Apple M1 (ARM64) This is a repository containing a ready-to-use environment for singularity in arm64 (M1). It has been prepa

Manuel Parra 4 Nov 14, 2022
Biohacking con Python honeycon21

biohacking-honeycon21 This repository includes the slides of the public presentation 'Biohacking con Python' in the Hack&Beers of HoneyCON21 (PPTX and

3 Nov 13, 2021
Runs macOS on linux with qemu.

mac-on-linux-with-qemu Runs macOS on linux with qemu. Pre-requisites qemu-system-x86_64 dmg2img pulseaudio python[click] Usage After cloning the repos

Arindam Das 177 Dec 26, 2022
An extended version of the hotkeys demo code using action classes

An extended version of the hotkeys application using action classes. In adafruit's Hotkeys code, a macro is using a series of integers, assumed to be

Neradoc 5 May 01, 2022
Versión preliminar análisis general de Covid-19 en Colombia

Covid_Colombia_v09 Versión: Python 3.8.8 1/ La base de datos del Ministerio de Salud (Minsalud Colombia) está en https://www.datos.gov.co/Salud-y-Prot

Julián Gómez 1 Jan 30, 2022
⏰ Shutdown Timer is an application that you can shutdown, restart, logoff, and hibernate your computer with a timer.

Shutdown Timer is a an application that you can shutdown, restart, logoff, and hibernate your computer with a timer. After choosing an action from the

Mehmet Güdük 5 Jun 27, 2022
Get a list of content on your Netflix My List that is expiring in the next month or two.

Netflix My List Expiring Movies Annoyed at Netflix for taking away your movies? Now you don't have to be! Installation instructions Install selenium C

24 Aug 06, 2022
Imitate Moulinette written in Python

Imitate Moulinette written in Python

Pumidol Leelerdsakulvong 2 Jul 26, 2022
BOHB tune library template (included example)

BOHB-template 실행 방법 python main.py 2021-10-10 기준 tf keras 버전 (tunecallback 방식) 완료 tf gradienttape 버전 (train_iteration 방식) 완료 pytorch 버전은 구현 준비중 방법 소개

Seungwoo Han 5 Mar 24, 2022