Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
my own python useful functions

PythonToolKit Motivation This Repo should help save time for data scientists' daily work regarding the Time Series regression task by providing functi

Kai 2 Oct 01, 2022
CHIP-8 interpreter written in Python

chip8py CHIP-8 interpreter written in Python Contents About Installation Usage License About CHIP-8 is an interpreted language developed during the 19

Robert Olaru 1 Nov 09, 2021
Anti VirusTotal written in Python.

How it works Most of the anti-viruses on VirusToal uses sandboxes or vms to scan and detect malicious activity. The code checks to see if the devices

cliphd 3 Dec 26, 2021
Python decorator for `TODO`s

Python decorator for `TODO`s. Don't let your TODOs rot in your python projects anymore !

Klemen Sever 74 Sep 13, 2022
Python bilgilerimi eğlenceli bir şekilde hatırlamak ve daha da geliştirmek için The Big Book of Small Python Projects isimli bir kitap almıştım.

Python bilgilerimi eğlenceli bir şekilde hatırlamak ve daha da geliştirmek için The Big Book of Small Python Projects isimli bir kitap almıştım. Bu repo kitaptaki örnek programları çalıştığım oyun al

Burak Selim Senyurt 22 Oct 26, 2022
Simple tools to make/dump CPC+ CPR cartridge files

Simple tools to make/dump CPC+ CPR cartridge files mkcpr.py: make a CPR file from files (one chunk per file); see notes cprdump.py: dump the chunks of

Juan J. Martínez 3 May 30, 2022
A common, beautiful interface to tabular data, no matter the format

rows No matter in which format your tabular data is: rows will import it, automatically detect types and give you high-level Python objects so you can

Álvaro Justen 834 Jan 03, 2023
Find the remote website version based on a git repository

versionshaker Versionshaker is a tool to find a remote website version based on a git repository This tool will help you to find the website version o

Orange Cyberdefense 110 Oct 23, 2022
Scientific color maps and standardization tools

Scicomap is a package that provides scientific color maps and tools to standardize your favourite color maps if you don't like the built-in ones. Scicomap currently provides sequential, bi-sequential

Thomas Bury 14 Nov 30, 2022
This bot uploads telegram files to MixDrop.co,File.io.

What is about this bot ? This bot uploads telegram files to MixDrop.co, File.io. Usage: Send any file, and the bot will upload it to MixDrop.co, File.

Abhijith NT 3 Feb 26, 2022
An app to help people apply for admissions on schools/hostels

Admission-helper About An app to help people apply for admissions on schools/hostels This app is a rewrite of Admission-helper-beta-v5.8.9 and I impor

Advik 3 Apr 24, 2022
Anonfiles files leaker via keyword.

Anonfiles files leaker via keyword

Trac3D1y 6 Nov 23, 2022
A simple hash system.

PBH-Hash-System A simple hash system. Usage You could use it like this: from pbh import pbh print(pbh("Hey", True)) Output: 2feae2471698cfcdcbd6b98ca

Karim 3 Mar 24, 2022
A shim for the typeshed changes in mypy 0.900

types-all A shim for the typeshed changes in mypy 0.900 installation pip install types-all why --install-types is annoying, this installs all the thin

Anthony Sottile 28 Oct 20, 2022
A prototype COG-based tile server for sparse Mars datasets

Mars tiler Mars Tiler is a prototype web application that serves tiles from cloud-optimized GeoTIFFs, with an emphasis on supporting planetary dataset

Daven Quinn 3 Mar 23, 2022
An example file showing a simple endpoints like a login/logout function and maybe some others.

Flask API Example An example project showing a simple endpoints like a login/logout function and maybe some others. How to use: Open up your IDE (or u

Kevin 1 Oct 27, 2021
Declarative and extensible library for configuration & code separation

ClassyConf ClassyConf is the configuration architecture solution for perfectionists with deadlines. It provides a declarative way to define settings f

83 Dec 07, 2022
We'll be using HTML, CSS and JavaScript for the frontend

We'll be using HTML, CSS and JavaScript for the frontend. Nothing to install in specific. Open your text-editor and start coding a beautiful front-end.

Mugada sai tilak 1 Dec 15, 2021
An end-to-end encrypted chat

An end-to-end encrypted chat, that allows users to anonymously talk without ip logs, personal info, or need for registration.

Privalise 1 Nov 27, 2021
UF3: a python library for generating ultra-fast interatomic potentials

Ultra-Fast Force Fields (UF3) S. R. Xie, M. Rupp, and R. G. Hennig, "Ultra-fast interpretable machine-learning potentials", preprint arXiv:2110.00624

Ultra-Fast Force Fields 24 Nov 13, 2022