Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
This repo is a collection of programs and websites templates too

📢 Register here for Hacktoberfest and make four pull requests (PRs) between October 1st-31st to grab free SWAGS 🔥 . IMPORTANT While making pull requ

Binayak Jha - 2 7 Oct 03, 2022
Covid 19 status. Flask application. CovidAPI. Heroku.

Covid 19 In this project we see total count of people who got this virus and total death. How does it works Written in Python. Web app, Flask. package

AmirHossein Mohammadi 12 Jan 16, 2022
A dot matrix rendered using braille characters.

⣿ dotmatrix A dot matrix rendered using braille characters. Description This library provides class called Matrix which represents a dot matrix that c

Tim Fischer 25 Dec 12, 2022
A community-driven python bot that aims to be as simple as possible to serve humans with their everyday tasks

JARVIS on Messenger Just A Rather Very Intelligent System, now on Messenger! Messenger is now used by 1.2 billion people every month. With the launch

Swapnil Agarwal 1.3k Jan 07, 2023
一个IDA脚本,可以检测出哈希算法(无论是否魔改常数)并生成frida hook 代码。

findhash 在哈希算法上,比Findcrypt更好的检测工具,同时生成Frida hook代码。 使用方法 把findhash.xml和findhash.py扔到ida plugins目录下 ida -edit-plugin-findhash 试图解决的问题 哈希函数的初始化魔数被修改 想快速

266 Dec 29, 2022
A jokes python module

Made with Python3 (C) @FayasNoushad Copyright permission under MIT License License - https://github.com/FayasNoushad/Jokes/blob/main/LICENSE Deploy

Fayas Noushad 3 Nov 28, 2021
Python Classes Without Boilerplate

attrs is the Python package that will bring back the joy of writing classes by relieving you from the drudgery of implementing object protocols (aka d

The attrs Cabal 4.6k Jan 02, 2023
Generating rent availability info from Effort rent

Rent-info Generating rent availability info from Effort rent Pre-Installation Latest version of python Pip module json, os, requests, datetime, time i

Laixuan 1 Oct 20, 2021
HogwartsRegister - A Hogwarts Register With Python

A Hogwarts Register Installation download code git clone https://github.com/haor

0 Feb 12, 2022
CircuitPython Driver for Adafruit 24LC32 I2C EEPROM Breakout 32Kbit / 4 KB

Introduction CircuitPython driver for Adafruit 24LC32 I2C EEPROM Breakout Dependencies This driver depends on: Adafruit CircuitPython Bus Device Regis

Adafruit Industries 4 Oct 03, 2022
Display your data in an attractive way in your notebook!

Bloxs Bloxs is a simple python package that helps you display information in an attractive way (formed in blocks). Perfect for building dashboards, re

MLJAR 192 Dec 28, 2022
Um jogo para treinar COO em python

WAR DUCK Este joguinho bem simples tem como objetivo treinar um pouquinho de POO com python. Não é nada muito complexo mas da pra se divertir Como rod

Gabriel Jospin 3 Sep 19, 2021
🇮🇳 A Indian Flag Animation Project Made With Python

🇮🇳 A Indian Flag Animation Project Made With Python

MuFaz-TG 2 Oct 21, 2022
Zotero references script (and app)

A little script (and PyInstaller build) for a very specific, somewhat hack-ish purpose: managing and exporting project references with Zotero and its API.

Marius Rödder 0 Dec 05, 2021
Class XII computer science project.

Computer Science Project — Class XII Kshitij Srivastava (XI – A) Introduction The aim of this project is to create a fully operational system for a me

Kshitij Srivastava 2 Jul 21, 2022
Kivy program for identification & rotation sensing of objects on multi-touch tables.

ObjectViz ObjectViz is a multitouch object detection solution, enabling you to create physical markers out of any reliable multitouch solution. It's e

TangibleDisplay 8 Apr 04, 2022
"Hacking" the (Telekom) Zyxel GPON SFP module (PMG3000-D20B)

"Hacking" the (Telekom) Zyxel GPON SFP module (PMG3000-D20B) The SFP can be sour

Matthias Riegler 52 Jan 03, 2023
pybind11 — Seamless operability between C++11 and Python

pybind11 — Seamless operability between C++11 and Python Setuptools example • Scikit-build example • CMake example pybind11 is a lightweight header-on

pybind 12.1k Jan 08, 2023
Paxos in Python, tested with Jepsen

Python implementation of Multi-Paxos with a stable leader and reconfiguration, roughly following "Paxos Made Moderately Complex". Run python3 paxos/st

A. Jesse Jiryu Davis 25 Dec 15, 2022
tg-nearby Trilateration of nearby Telegram users as described in my corresponding article.

tg-nearby Trilateration of nearby Telegram users as described in my corresponding article. Setup If you want to toy with the code in this repository

Maximilian Jugl 75 Dec 26, 2022