An ETL framework + Monitoring UI/API (experimental project for learning purposes)

Related tags

Data Analysisfastlane
Overview

Fastlane

An ETL framework for building pipelines, and Flask based web API/UI for monitoring pipelines.

Project structure

fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
   |- migrations: (database migrations)
   |- web_api: Flask backend API
   |- web_ui: TBD

Install

  1. Clone the repository
  2. pip install -e .

Example

fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json

--source: The pipeline's source type (mysql, bigquery, mongodb are only implemented sources so far)

--target: The pipeline's target type (s3, influxdb, mysql, firehose are only implemented targets so far)

--transform: The pipeline's tranform type (default is the only implemented transform so far)

--config: The path to JSON configuration file for the pipeline

--logs_to_slack: Send error logs to slack

--logs_to_cloudwatch: Send logs to cloudwatch

--logs_to_file: Send logs to a file

Extending the framework

The ETL framework has 4 concepts:

Source

The base class fastlane.source.Source provides basic functionality, and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from source and returning as a python list of dictionaries.

Implementations of the Source base class must fulfill the following functions at minimum:

str: """Return a string describing type of source this is, for example mysql or bigquery""" @classmethod def configuration_schema(cls) -> SourceConfigSchema: """Return a marshmallow schema inherited from SourceConfigSchema base schema. This schema is used to validate the sources configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
            The Source class is an iterator, and this function is called on each iteration. 
            The iterator stops (and source worker exits) when this function returns an empty list. 
            So when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing type of source this is, for example mysql or bigquery"""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inherited from SourceConfigSchema base schema.
            This schema is used to validate the sources configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Source interface is in fastlane.sources.impl.source_mysql

Implementation Coverage

  • MySQL
  • BigQuery
  • MongoDB

Transform

The base class fastlane.transform.Transform provides basic functionality, and defines a standard interface for transforming data to be ready for target. An instance of Transform is responsible only for transforming data from source into a format compatible with target.

Implementations of the Transform base class must fulfill the following functions at minimum:

str: """Return a string describing type of transform this is.""" @classmethod def configuration_schema(cls) -> TransformConfigSchema: """Return a marshmallow schema inherited from TransformConfigSchema base schema. This schema is used to validate the transforms configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformation on the dataframe and return the transformed dataframe.
            Ideally the same dataframe should by transformed on in place, but if a new dataframe needs to be created, 
            Make sure to remove the old dataframes from memory.
            This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inherited from TransformConfigSchema base schema.
            This schema is used to validate the transforms configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Transform interface is in fastlane.transform.impl.transform_default

Target

The base class fastlane.target.Target provides basic functionality, and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing data which has been transformed into a destination.

Implementations of the Target base class must fulfill the following functions at minimum:

str: """Return a string describing type of target this is.""" @classmethod def target_id(cls, configuration: dict) -> str: """Return a unique identifier from this specific targets configuration. The id should be unique across the whole target destination. For example the target_id for mysql target is built from table and database"""">
from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
            This function should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used from incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier from this specific targets configuration. 
            The id should be unique across the whole target destination. 
            For example the target_id for mysql target is built from table and database"""

Example implementation of Target interface is in fastlane.target.impl.target_athena

Implementation Coverage

  • S3
  • InfluxDB
  • MySQL
  • Firehose

Pipeline

The fastlane.pipeline.Pipeline class is what drives the ETL process. It manages the source, transform and target processes, and runs monitoring processes which give insight into the performance/state of the running pipeline.

The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:

        _________________        Queue       ____________________         Queue        ________________    load
extract | source_worker | -->  [|.|.|.|] -->| transform_worker_1 | -->  [|.|.|.|] --> | target_worker_1 | ------>
------> |_______________|                    --------------------                      ----------------    load
                                         -->| transform_worker_2 |                --> | target_worker_2 | ------>
                                             --------------------                      ----------------    load
                                         -->| transform_worker_3 |                --> | target_worker_3 | ------>
                                             --------------------                      ----------------    load
                                                                                  --> | target_worker_4 | ------>
                                                                                       ----------------

Throughout the ETL process, few small monitoring processes are collecting metrics at periodic intervals such as memory usage, records loaded per second, total records loaded, queue sizes. See fastlane.monitoring.pipeline_monitor for more details on how thats done.

Pipelines Web API

This project includes a Pipeline web API built w Flask which is used as a backend for collecting and storing the metrics from running Pipelines, as well as to serve the Pipeline monitoring web UI.

Resources

CRUD on pipelines

Methods

/api/pipeline

POST
GET
DELETE

/api/pipelines

list pipelines

Methods
GET

/api/pipeline/run

invocation of a particular pipeline

Methods
POST
PUT
GET
DELETE

/api/pipeline/run/latest

latest invocation of a particular pipeline.

Methods
GET

/api/pipeline/run/rps

records per second metrics for a particuar pipeline run.

Methods
GET
POST

/api/pipeline/run/memory_usage

memory usage metrics for a particular pipeline run.

Methods
GET
POST

/api/pipeline/run/logs

logs (from cloudwatch) for a particular pipeline run.

Methods
GET
POST

Pipeline Web UI

Will pvoide a user interface to moniter currently running pipelines, as well as debug and analyze previously invoked pipelines.

Owner
Dan Katz
Seasoned software engineer working in prototyping, architecting, developing and testing full stack applications
Dan Katz
Bigdata Simulation Library Of Dream By Sandman Books

BIGDATA SIMULATION LIBRARY OF DREAM BY SANDMAN BOOKS ================= Solution Architecture Description In the realm of Dreaming, its ruler SANDMAN,

Maycon Cypriano 3 Jun 30, 2022
Probabilistic reasoning and statistical analysis in TensorFlow

TensorFlow Probability TensorFlow Probability is a library for probabilistic reasoning and statistical analysis in TensorFlow. As part of the TensorFl

3.8k Jan 05, 2023
Vectorizers for a range of different data types

Vectorizers for a range of different data types

Tutte Institute for Mathematics and Computing 69 Dec 29, 2022
INFO-H515 - Big Data Scalable Analytics

INFO-H515 - Big Data Scalable Analytics Jacopo De Stefani, Giovanni Buroni, Théo Verhelst and Gianluca Bontempi - Machine Learning Group Exercise clas

Yann-Aël Le Borgne 58 Dec 11, 2022
A data parser for the internal syncing data format used by Fog of World.

A data parser for the internal syncing data format used by Fog of World. The parser is not designed to be a well-coded library with good performance, it is more like a demo for showing the data struc

Zed(Zijun) Chen 40 Dec 12, 2022
scikit-survival is a Python module for survival analysis built on top of scikit-learn.

scikit-survival scikit-survival is a Python module for survival analysis built on top of scikit-learn. It allows doing survival analysis while utilizi

Sebastian Pölsterl 876 Jan 04, 2023
Exploring the Top ML and DL GitHub Repositories

This repository contains my work related to my project where I scraped data on the most popular machine learning and deep learning GitHub repositories in order to further visualize and analyze it.

Nico Van den Hooff 17 Aug 21, 2022
Exploratory data analysis

Exploratory data analysis An Exploratory data analysis APP TAPIWA CHAMBOKO 🚀 About Me I'm a full stack developer experienced in deploying artificial

tapiwa chamboko 1 Nov 07, 2021
BasstatPL is a package for performing different tabulations and calculations for descriptive statistics.

BasstatPL is a package for performing different tabulations and calculations for descriptive statistics. It provides: Frequency table constr

Angel Chavez 1 Oct 31, 2021
Sensitivity Analysis Library in Python (Numpy). Contains Sobol, Morris, Fractional Factorial and FAST methods.

Sensitivity Analysis Library (SALib) Python implementations of commonly used sensitivity analysis methods. Useful in systems modeling to calculate the

SALib 663 Jan 05, 2023
Predictive Modeling & Analytics on Home Equity Line of Credit

Predictive Modeling & Analytics on Home Equity Line of Credit Data (Python) HMEQ Data Set In this assignment we will use Python to examine a data set

Dhaval Patel 1 Jan 09, 2022
Python ELT Studio, an application for building ELT (and ETL) data flows.

The Python Extract, Load, Transform Studio is an application for performing ELT (and ETL) tasks. Under the hood the application consists of a two parts.

Schlerp 55 Nov 18, 2022
INF42 - Topological Data Analysis

TDA INF421(Conception et analyse d'algorithmes) Projet : Topological Data Analysis SphereMin Etant donné un nuage des points, ce programme contient de

2 Jan 07, 2022
A Python package for Bayesian forecasting with object-oriented design and probabilistic models under the hood.

Disclaimer This project is stable and being incubated for long-term support. It may contain new experimental code, for which APIs are subject to chang

Uber Open Source 1.6k Dec 29, 2022
Statsmodels: statistical modeling and econometrics in Python

About statsmodels statsmodels is a Python package that provides a complement to scipy for statistical computations including descriptive statistics an

statsmodels 8k Dec 29, 2022
A fast, flexible, and performant feature selection package for python.

linselect A fast, flexible, and performant feature selection package for python. Package in a nutshell It's built on stepwise linear regression When p

88 Dec 06, 2022
CleanX is an open source python library for exploring, cleaning and augmenting large datasets of X-rays, or certain other types of radiological images.

cleanX CleanX is an open source python library for exploring, cleaning and augmenting large datasets of X-rays, or certain other types of radiological

Candace Makeda Moore, MD 20 Jan 05, 2023
fds is a tool for Data Scientists made by DAGsHub to version control data and code at once.

Fast Data Science, AKA fds, is a CLI for Data Scientists to version control data and code at once, by conveniently wrapping git and dvc

DAGsHub 359 Dec 22, 2022
Tokyo 2020 Paralympics, Analytics

Tokyo 2020 Paralympics, Analytics Thanks for checking out my app! It was built entirely using matplotlib and Tokyo 2020 Paralympics data. This applica

Petro Ivaniuk 1 Nov 18, 2021