ETL flow framework based on Yaml configs in Python

Overview

logo

ETL framework based on Yaml configs in Python

Supported Python Versions License Code style: black

A light framework for creating data streams. Setting up streams through configuration in the Yaml file. There is a schedule, task pools, concurrency limitation. Works quickly, does not require a lot of resources. Runs on Windows and Linux. Flow run in parallel via threading library. Internally SQLite Database. Native data transformation. There is a web interface.

At the moment there are connectors to sources

  • CSV file
  • SQLite
  • Postgres
  • MySQL
  • Yandex Metrika Management API
  • Yandex Metrika Stats API
  • Yandex Metrika Logs API
  • Yandex Direct API
  • Yandex Direct Report API
  • Criteo
  • Google Sheets

Storages

  • Save to csv file
  • Clickhouse

Documentation

Requirements

  • python >=3.9
  • virtual environment

Settings

It is highly recommended to install in a virtual environment.

Flowmaster needs a home, '{HOME}/FlowMaster' is the default,
but you can lay foundation somewhere else if you prefer
(optional)

For Windows

setx FLOWMASTER_HOME "{YOUR_PATH}"

For Linux

export FLOWMASTER_HOME={YOUR_PATH}

Installing

pip install flowmaster==0.7.1

# For install web UI.
pip install flowmaster[webui]==0.7.1

# Optional libraries.
pip install flowmaster[clickhouse,postgres,mysql,yandexdirect,yandexmetrika,criteo,googlesheets]==0.7.1

Run

flowmaster run --help
flowmaster run

WEB UI

http://localhost:8822

CHANGELOG

Support

Telegram support chat

Author

Pavel Maksimov

My contacts Telegram, Facebook

Удачи тебе, друг! Поставь звездочку ;)

You might also like...
signac-flow - manage workflows with signac
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

Randomisation-based inference in Python based on data resampling and permutation.

Randomisation-based inference in Python based on data resampling and permutation.

Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)
Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)

Karate Club is an unsupervised machine learning extension library for NetworkX. Please look at the Documentation, relevant Paper, Promo Video, and Ext

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems
BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems

Mathematical modeling is a powerful method for the analysis of complex biological systems. Although there are many researches devoted on produ

 PyChemia, Python Framework for Materials Discovery and Design
PyChemia, Python Framework for Materials Discovery and Design

PyChemia, Python Framework for Materials Discovery and Design PyChemia is an open-source Python Library for materials structural search. The purpose o

wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information
wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information

Python based Wikidata framework for easy dataframe extraction wikirepo is a Python package that provides a framework to easily source and leverage sta

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams Motivation When dataset freshness is critical, the annotating of high speed

Comments
  •  No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    Привет, очень хороший проект, однако столкнулся со следующей проблемой при устанвоке библиотеки

    1. с ванильным python pip такого пакета вообще не видно
    2. при установке через conda установка проходит замечательно, однако при запуске получаю
    (base) [email protected]:~/FlowMaster$ flowmaster run
    Traceback (most recent call last):
      File "/home/ubuntu/miniforge3/bin/flowmaster", line 5, in <module>
        from flowmaster.__main__ import app
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/__main__.py", line 9, in <module>
        import flowmaster.cli.notebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/cli/notebook.py", line 5, in <module>
        from flowmaster.service import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/service.py", line 11, in <module>
        from flowmaster.operators.etl.policy import ETLNotebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/__init__.py", line 3, in <module>
        from flowmaster.operators.etl.providers.abstract import ProviderAbstract, ExportAbstract
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/__init__.py", line 4, in <module>
        from flowmaster.operators.etl.providers.criteo import CriteoProvider
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/__init__.py", line 2, in <module>
        from flowmaster.operators.etl.providers.criteo.export import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/export.py", line 8, in <module>
        from flowmaster.executors import SleepIteration
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/executors/__init__.py", line 16, in <module>
        from flowmaster.pool import pools
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/pool.py", line 106, in <module>
        pools_dict = YamlHelper.parse_file(str(Settings.POOL_CONFIG_FILEPATH))
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/utils/yaml_helper.py", line 14, in parse_file
        with open(path, "rb") as f:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'
    

    Что я делаю не так?(

    opened by micweeks 1
Releases(0.7.1)
  • 0.7.1(Aug 29, 2021)

    • prevented planned of tasks from one instance of the operator class
    • fixed error GeneratorExit
    • fixed transform array type for Clickhouse loader
    Source code(tar.gz)
    Source code(zip)
  • 0.6.1(Jun 22, 2021)

    Redesigned executor

    New

    • add politics 'time_limit_seconds_from_worktime', 'soft_time_limit_seconds'.
    • add provider 'flowmaster'

    Fixing

    • fix schedule (interval seconds mode)
    • add logging 'loguru'
    • fix clear_statuses_of_lost_items
    • fix allow_execute_flow
    • change command 'db reset'

    There are backward incompatible changes

    • new field 'expires_utc' in FlowItem
    • rename command 'run' to 'run_local' and rename command 'run_thread' to 'run'
    • add new class ExecutorIterationTask.
    • change, moving and rename class ThreadExecutor to ThreadAsyncExecutor.
    • change and rename class SleepTask to SleepIteration.
    • change and rename class TaskPool to NextIterationInPools.
    • ETLOperator return ExecutorIterationTask.
    • rename func order_flow to ordering_flow_tasks.
    • rename func start_executor to sync_executor.
    • rename field FlowItem.config_hash to FlowItem.notebook_hash
    • change FLOW_CONFIGS_DIR and rename FLOW_CONFIGS_DIR to NOTEBOOKS_DIR
    • rename objects config to notebook
    • add class Settings
    Source code(tar.gz)
    Source code(zip)
  • 0.3.1(May 15, 2021)

  • 0.2.2(May 13, 2021)

Owner
Павел Максимов
Python Data Engineer, Python Developer, ETL, Разработчик рекомендательных систем
Павел Максимов
Python scripts aim to use a Random Forest machine learning algorithm to predict the water affinity of Metal-Organic Frameworks

The following Python scripts aim to use a Random Forest machine learning algorithm to predict the water affinity of Metal-Organic Frameworks (MOFs). The training set is extracted from the Cambridge S

1 Jan 09, 2022
DefAP is a program developed to facilitate the exploration of a material's defect chemistry

DefAP is a program developed to facilitate the exploration of a material's defect chemistry. A large number of features are provided and rapid exploration is supported through the use of autoplotting

6 Oct 25, 2022
The repo for mlbtradetrees.com. Analyze any trade in baseball history!

The repo for mlbtradetrees.com. Analyze any trade in baseball history!

7 Nov 20, 2022
Used for data processing in machine learning, and help us to construct ML model more easily from scratch

Used for data processing in machine learning, and help us to construct ML model more easily from scratch. Can be used in linear model, logistic regression model, and decision tree.

ShawnWang 0 Jul 05, 2022
Tools for analyzing data collected with a custom unity-based VR for insects.

unityvr Tools for analyzing data collected with a custom unity-based VR for insects. Organization: The unityvr package contains the following submodul

Hannah Haberkern 1 Dec 14, 2022
Intake is a lightweight package for finding, investigating, loading and disseminating data.

Intake: A general interface for loading data Intake is a lightweight set of tools for loading and sharing data in data science projects. Intake helps

Intake 851 Jan 01, 2023
Random dataframe and database table generator

Random database/dataframe generator Authored and maintained by Dr. Tirthajyoti Sarkar, Fremont, USA Introduction Often, beginners in SQL or data scien

Tirthajyoti Sarkar 249 Jan 08, 2023
Retail-Sim is python package to easily create synthetic dataset of retaile store.

Retailer's Sale Data Simulation Retail-Sim is python package to easily create synthetic dataset of retaile store. Simulation Model Simulator consists

Corca AI 7 Sep 30, 2022
A real-time financial data streaming pipeline and visualization platform using Apache Kafka, Cassandra, and Bokeh.

Realtime Financial Market Data Visualization and Analysis Introduction This repo shows my project about real-time stock data pipeline. All the code is

6 Sep 07, 2022
General Assembly's 2015 Data Science course in Washington, DC

DAT8 Course Repository Course materials for General Assembly's Data Science course in Washington, DC (8/18/15 - 10/29/15). Instructor: Kevin Markham (

Kevin Markham 1.6k Jan 07, 2023
A collection of robust and fast processing tools for parsing and analyzing web archive data.

ChatNoir Resiliparse A collection of robust and fast processing tools for parsing and analyzing web archive data. Resiliparse is part of the ChatNoir

ChatNoir 24 Nov 29, 2022
Pipeline and Dataset helpers for complex algorithm evaluation.

tpcp - Tiny Pipelines for Complex Problems A generic way to build object-oriented datasets and algorithm pipelines and tools to evaluate them pip inst

Machine Learning and Data Analytics Lab FAU 3 Dec 07, 2022
An Indexer that works out-of-the-box when you have less than 100K stored Documents

U100KIndexer An Indexer that works out-of-the-box when you have less than 100K stored Documents. U100K means under 100K. At 100K stored Documents with

Jina AI 7 Mar 15, 2022
Spectral Analysis in Python

SPECTRUM : Spectral Analysis in Python contributions: Please join https://github.com/cokelaer/spectrum contributors: https://github.com/cokelaer/spect

Thomas Cokelaer 280 Dec 16, 2022
Synthetic Data Generation for tabular, relational and time series data.

An Open Source Project from the Data to AI Lab, at MIT Website: https://sdv.dev Documentation: https://sdv.dev/SDV User Guides Developer Guides Github

The Synthetic Data Vault Project 1.2k Jan 07, 2023
Python utility to extract differences between two pandas dataframes.

Python utility to extract differences between two pandas dataframes.

Jaime Valero 8 Jan 07, 2023
Learn machine learning the fun way, with Oracle and RedBull Racing

Red Bull Racing Analytics Hands-On Labs Introduction Are you interested in learning machine learning (ML)? How about doing this in the context of the

Oracle DevRel 55 Oct 24, 2022
Data analysis and visualisation projects from a range of individual projects and applications

Python-Data-Analysis-and-Visualisation-Projects Data analysis and visualisation projects from a range of individual projects and applications. Python

Tom Ritman-Meer 1 Jan 25, 2022
Reading streams of Twitter data, save them to Kafka, then process with Kafka Stream API and Spark Streaming

Using Streaming Twitter Data with Kafka and Spark Reading streams of Twitter data, publishing them to Kafka topic, process message using Kafka Stream

Rustam Zokirov 1 Dec 06, 2021
Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation

Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation Overview Consider the scenario in which advertisement

Manuel Bressan 2 Nov 18, 2021