An ETL framework + Monitoring UI/API (experimental project for learning purposes)

Related tags

Data Analysisfastlane
Overview

Fastlane

An ETL framework for building pipelines, and Flask based web API/UI for monitoring pipelines.

Project structure

fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
   |- migrations: (database migrations)
   |- web_api: Flask backend API
   |- web_ui: TBD

Install

  1. Clone the repository
  2. pip install -e .

Example

fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json

--source: The pipeline's source type (mysql, bigquery, mongodb are only implemented sources so far)

--target: The pipeline's target type (s3, influxdb, mysql, firehose are only implemented targets so far)

--transform: The pipeline's tranform type (default is the only implemented transform so far)

--config: The path to JSON configuration file for the pipeline

--logs_to_slack: Send error logs to slack

--logs_to_cloudwatch: Send logs to cloudwatch

--logs_to_file: Send logs to a file

Extending the framework

The ETL framework has 4 concepts:

Source

The base class fastlane.source.Source provides basic functionality, and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from source and returning as a python list of dictionaries.

Implementations of the Source base class must fulfill the following functions at minimum:

str: """Return a string describing type of source this is, for example mysql or bigquery""" @classmethod def configuration_schema(cls) -> SourceConfigSchema: """Return a marshmallow schema inherited from SourceConfigSchema base schema. This schema is used to validate the sources configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
            The Source class is an iterator, and this function is called on each iteration. 
            The iterator stops (and source worker exits) when this function returns an empty list. 
            So when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing type of source this is, for example mysql or bigquery"""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inherited from SourceConfigSchema base schema.
            This schema is used to validate the sources configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Source interface is in fastlane.sources.impl.source_mysql

Implementation Coverage

  • MySQL
  • BigQuery
  • MongoDB

Transform

The base class fastlane.transform.Transform provides basic functionality, and defines a standard interface for transforming data to be ready for target. An instance of Transform is responsible only for transforming data from source into a format compatible with target.

Implementations of the Transform base class must fulfill the following functions at minimum:

str: """Return a string describing type of transform this is.""" @classmethod def configuration_schema(cls) -> TransformConfigSchema: """Return a marshmallow schema inherited from TransformConfigSchema base schema. This schema is used to validate the transforms configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformation on the dataframe and return the transformed dataframe.
            Ideally the same dataframe should by transformed on in place, but if a new dataframe needs to be created, 
            Make sure to remove the old dataframes from memory.
            This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inherited from TransformConfigSchema base schema.
            This schema is used to validate the transforms configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Transform interface is in fastlane.transform.impl.transform_default

Target

The base class fastlane.target.Target provides basic functionality, and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing data which has been transformed into a destination.

Implementations of the Target base class must fulfill the following functions at minimum:

str: """Return a string describing type of target this is.""" @classmethod def target_id(cls, configuration: dict) -> str: """Return a unique identifier from this specific targets configuration. The id should be unique across the whole target destination. For example the target_id for mysql target is built from table and database"""">
from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
            This function should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used from incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier from this specific targets configuration. 
            The id should be unique across the whole target destination. 
            For example the target_id for mysql target is built from table and database"""

Example implementation of Target interface is in fastlane.target.impl.target_athena

Implementation Coverage

  • S3
  • InfluxDB
  • MySQL
  • Firehose

Pipeline

The fastlane.pipeline.Pipeline class is what drives the ETL process. It manages the source, transform and target processes, and runs monitoring processes which give insight into the performance/state of the running pipeline.

The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:

        _________________        Queue       ____________________         Queue        ________________    load
extract | source_worker | -->  [|.|.|.|] -->| transform_worker_1 | -->  [|.|.|.|] --> | target_worker_1 | ------>
------> |_______________|                    --------------------                      ----------------    load
                                         -->| transform_worker_2 |                --> | target_worker_2 | ------>
                                             --------------------                      ----------------    load
                                         -->| transform_worker_3 |                --> | target_worker_3 | ------>
                                             --------------------                      ----------------    load
                                                                                  --> | target_worker_4 | ------>
                                                                                       ----------------

Throughout the ETL process, few small monitoring processes are collecting metrics at periodic intervals such as memory usage, records loaded per second, total records loaded, queue sizes. See fastlane.monitoring.pipeline_monitor for more details on how thats done.

Pipelines Web API

This project includes a Pipeline web API built w Flask which is used as a backend for collecting and storing the metrics from running Pipelines, as well as to serve the Pipeline monitoring web UI.

Resources

CRUD on pipelines

Methods

/api/pipeline

POST
GET
DELETE

/api/pipelines

list pipelines

Methods
GET

/api/pipeline/run

invocation of a particular pipeline

Methods
POST
PUT
GET
DELETE

/api/pipeline/run/latest

latest invocation of a particular pipeline.

Methods
GET

/api/pipeline/run/rps

records per second metrics for a particuar pipeline run.

Methods
GET
POST

/api/pipeline/run/memory_usage

memory usage metrics for a particular pipeline run.

Methods
GET
POST

/api/pipeline/run/logs

logs (from cloudwatch) for a particular pipeline run.

Methods
GET
POST

Pipeline Web UI

Will pvoide a user interface to moniter currently running pipelines, as well as debug and analyze previously invoked pipelines.

Owner
Dan Katz
Seasoned software engineer working in prototyping, architecting, developing and testing full stack applications
Dan Katz
4CAT: Capture and Analysis Toolkit

4CAT: Capture and Analysis Toolkit 4CAT is a research tool that can be used to analyse and process data from online social platforms. Its goal is to m

Digital Methods Initiative 147 Dec 20, 2022
A Python Tools to imaging the shallow seismic structure

ShallowSeismicImaging Tools to imaging the shallow seismic structure, above 10 km, based on the ZH ratio measured from the ambient seismic noise, and

Xiao Xiao 9 Aug 09, 2022
Maximum Covariance Analysis in Python

xMCA | Maximum Covariance Analysis in Python The aim of this package is to provide a flexible tool for the climate science community to perform Maximu

Niclas Rieger 39 Jan 03, 2023
BIGDATA SIMULATION ONE PIECE WORLD CENSUS

ONE PIECE is a Japanese manga of great international success. The story turns inhabited in a fictional world, tells the adventures of a young man whose body gained rubber properties after accidentall

Maycon Cypriano 3 Jun 30, 2022
Weather analysis with Python, SQLite, SQLAlchemy, and Flask

Surf's Up Weather analysis with Python, SQLite, SQLAlchemy, and Flask Overview The purpose of this analysis was to examine weather trends (precipitati

Art Tucker 1 Sep 05, 2021
Analytical view of olist e-commerce in Brazil

Analysis of E-Commerce Public Dataset by Olist The objective of this project is to propose an analytical view of olist e-commerce in Brazil. For this

Gurpreet Singh 1 Jan 11, 2022
AWS Glue ETL Code Samples

AWS Glue ETL Code Samples This repository has samples that demonstrate various aspects of the new AWS Glue service, as well as various AWS Glue utilit

AWS Samples 1.2k Jan 03, 2023
Using Data Science with Machine Learning techniques (ETL pipeline and ML pipeline) to classify received messages after disasters.

Using Data Science with Machine Learning techniques (ETL pipeline and ML pipeline) to classify received messages after disasters.

1 Feb 11, 2022
This is an example of how to automate Ridit Analysis for a dataset with large amount of questions and many item attributes

This is an example of how to automate Ridit Analysis for a dataset with large amount of questions and many item attributes

Ishan Hegde 1 Nov 17, 2021
Catalogue data - A Python Scripts to prepare catalogue data

catalogue_data Scripts to prepare catalogue data. Setup Clone this repo. Install

BigScience Workshop 3 Mar 03, 2022
PipeChain is a utility library for creating functional pipelines.

PipeChain Motivation PipeChain is a utility library for creating functional pipelines. Let's start with a motivating example. We have a list of Austra

Michael Milton 2 Aug 07, 2022
pyhsmm MITpyhsmm - Bayesian inference in HSMMs and HMMs. MIT

Bayesian inference in HSMMs and HMMs This is a Python library for approximate unsupervised inference in Bayesian Hidden Markov Models (HMMs) and expli

Matthew Johnson 527 Dec 04, 2022
This repo contains a simple but effective tool made using python which can be used for quality control in statistical approach.

This repo contains a powerful tool made using python which is used to visualize, analyse and finally assess the quality of the product depending upon the given observations

SasiVatsal 8 Oct 18, 2022
NumPy aware dynamic Python compiler using LLVM

Numba A Just-In-Time Compiler for Numerical Functions in Python Numba is an open source, NumPy-aware optimizing compiler for Python sponsored by Anaco

Numba 8.2k Jan 07, 2023
A Python package for Bayesian forecasting with object-oriented design and probabilistic models under the hood.

Disclaimer This project is stable and being incubated for long-term support. It may contain new experimental code, for which APIs are subject to chang

Uber Open Source 1.6k Dec 29, 2022
A set of functions and analysis classes for solvation structure analysis

SolvationAnalysis The macroscopic behavior of a liquid is determined by its microscopic structure. For ionic systems, like batteries and many enzymes,

MDAnalysis 19 Nov 24, 2022
The official pytorch implementation of ViTAE: Vision Transformer Advanced by Exploring Intrinsic Inductive Bias

ViTAE: Vision Transformer Advanced by Exploring Intrinsic Inductive Bias Introduction | Updates | Usage | Results&Pretrained Models | Statement | Intr

104 Nov 27, 2022
A highly efficient and modular implementation of Gaussian Processes in PyTorch

GPyTorch GPyTorch is a Gaussian process library implemented using PyTorch. GPyTorch is designed for creating scalable, flexible, and modular Gaussian

3k Jan 02, 2023
Convert tables stored as images to an usable .csv file

Convert an image of numbers to a .csv file This Python program aims to convert images of array numbers to corresponding .csv files. It uses OpenCV for

711 Dec 26, 2022