Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer large data between processes
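
This README does not show aqueduct's own shared-memory API, so the following is only a stdlib sketch of the general idea (Python 3.8+ `multiprocessing.shared_memory`): the producer copies a large payload into a named block once, and a consumer process attaches to it by name instead of receiving a pickled copy through a pipe. `put_bytes` and `read_bytes` are hypothetical helpers, not part of aqueduct.

```python
from multiprocessing import shared_memory


def put_bytes(payload: bytes) -> shared_memory.SharedMemory:
    """Copy payload into a new shared memory block (hypothetical helper).

    Only the block's name and the payload size need to travel to another process.
    """
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    return shm


def read_bytes(name: str, size: int) -> bytes:
    """Attach to an existing block by name, as a worker would, and copy the data out.

    size is passed explicitly because some platforms round the block size up.
    """
    shm = shared_memory.SharedMemory(name=name)
    try:
        return bytes(shm.buf[:size])
    finally:
        shm.close()


if __name__ == '__main__':
    data = b'x' * 10_000_000  # roughly 10 MB payload
    block = put_bytes(data)
    try:
        # In a real pipeline, block.name and len(data) would be sent to a worker
        # through a queue; here we attach from the same process for brevity.
        assert read_bytes(block.name, len(data)) == data
    finally:
        block.close()
        block.unlink()  # free the block once every process is done with it
```

Only a short string (the block name) crosses the process boundary, which is why this approach pays off for big arrays such as images.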

Get started

A simple example of how to get started with aqueduct using aiohttp. For more detailed examples, see the project's examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
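
To try the service, you can POST a number to `/sum`. Below is a small stdlib client sketch; it assumes the app runs locally on aiohttp's default port 8080, and `build_sum_request` and `call_sum` are hypothetical helper names. Because `SumView` reads the raw request body and calls `int()` on it, the number is sent as plain text.

```python
import json
import urllib.request

# Assumption: the example app runs locally on aiohttp's default port.
SUM_URL = 'http://localhost:8080/sum'


def build_sum_request(number: int, url: str = SUM_URL) -> urllib.request.Request:
    """Build a POST request whose body is the number as plain text."""
    return urllib.request.Request(url, data=str(number).encode(), method='POST')


def call_sum(number: int, url: str = SUM_URL) -> int:
    """Send the request and return the 'result' field of the JSON response."""
    with urllib.request.urlopen(build_sum_request(number, url)) as response:
        return json.load(response)['result']
```

With the server running, `call_sum(10)` should return 285, the sum of the squares of 0 through 9 computed by `MyModel.process`.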
    

Batching

Aqueduct can process tasks in batches. The default batch size is one.
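
Conceptually, a batching step waits for up to `batch_size` tasks but gives up after `batch_timeout` seconds and processes whatever it has. The sketch below shows that idea with a plain `asyncio.Queue`; it is not aqueduct's implementation, and starting the timeout when the first task arrives is an assumption. `collect_batch` and `demo` are hypothetical names.

```python
import asyncio
from typing import List


async def collect_batch(queue: asyncio.Queue, batch_size: int, batch_timeout: float) -> List:
    """Wait for the first item, then keep pulling items until the batch is full
    or batch_timeout seconds have passed since the first item arrived."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # timeout expired: process a partial batch
    return batch


async def demo() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    # Five items with batch_size=2: two full batches, then a partial one
    # that is released when the timeout expires.
    return [await collect_batch(queue, batch_size=2, batch_timeout=0.05) for _ in range(3)]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Without the timeout, the last batch would wait forever for a second task; this is the situation the `batch_timeout` parameter in the example below addresses.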

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is needed only for this example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        Each image is represented by a one-dimensional array.
        Processing a batch takes less time per image thanks to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


async def main():
    tasks_batch = get_tasks_batch()
    flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
    flow_with_batch_handler.start()

    # check that no results have been set yet
    assert not any(task.result for task in tasks_batch)
    # batched handling is expected to take about 0.16 s, which is less than
    # the 0.22 s needed to process the tasks one by one
    await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME,
    )
    # check that all results have been set
    assert all(task.result for task in tasks_batch)

    await flow_with_batch_handler.stop()

    # If the batch size is larger than the number of tasks, batch_timeout limits
    # how long the step waits to fill a batch before processing what it has
    tasks_batch = get_tasks_batch()
    flow_with_batch_handler = Flow(
        FlowStep(CatDetectorHandler(), batch_size=2 * TASKS_BATCH_SIZE, batch_timeout=0.01)
    )
    flow_with_batch_handler.start()

    await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
    )

    await flow_with_batch_handler.stop()


if __name__ == '__main__':
    # top-level await only works in notebooks and async REPLs; in a script,
    # run the coroutine with asyncio.run (the guard also keeps spawned
    # worker processes from re-running it)
    asyncio.run(main())
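
The 0.16 s and 0.22 s figures in the comments follow from the `CatDetector` constants. The sketch below reproduces that arithmetic; it assumes the same `OVERHEAD_TIME` applies to the sequential case, which is what makes the 0.22 s figure come out. `batched_time` and `sequential_time` are hypothetical helpers.

```python
# Constants copied from the CatDetector emulator in the example above.
IMAGE_PROCESS_TIME = 0.01
BATCH_REDUCTION_FACTOR = 0.7
OVERHEAD_TIME = 0.02
TASKS_BATCH_SIZE = 20


def batched_time(n: int) -> float:
    """One predict() call on n images, plus the fixed overhead."""
    return IMAGE_PROCESS_TIME * n * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME


def sequential_time(n: int) -> float:
    """n predict() calls on one image each, plus the same fixed overhead (assumption)."""
    return IMAGE_PROCESS_TIME * n + OVERHEAD_TIME


if __name__ == '__main__':
    batched = batched_time(TASKS_BATCH_SIZE)
    sequential = sequential_time(TASKS_BATCH_SIZE)
    print(f'batched: {batched:.2f} s, sequential: {sequential:.2f} s')
    # The second flow also waits up to batch_timeout (0.01 s) for more tasks,
    # hence its BATCH_PROCESS_TIME + 0.01 budget.
    print(f'batched with timeout: {batched + 0.01:.2f} s')
```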

Sentry

The implementation lets you receive logger events from both the worker processes and the main process. To integrate with Sentry, you can write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


# os.getenv returns a string (or None), never the boolean True
if os.getenv('SENTRY_ENABLED', '').lower() == 'true':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
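
Two details matter in the snippet above: the environment variable is always a string, and `level=logging.ERROR` means only errors reach Sentry. The sketch below checks both with the standard `logging` module, assuming `aqueduct.logger.log` behaves like a regular `logging.Logger` (its use with `addHandler` suggests so). `CollectingHandler` is a hypothetical stand-in for `SentryHandler`, `env_flag` is a hypothetical helper, and the logger name is made up.

```python
import logging
import os
from typing import List


def env_flag(name: str) -> bool:
    """Interpret common truthy strings; os.getenv returns a str or None."""
    return os.getenv(name, '').strip().lower() in {'1', 'true', 'yes', 'on'}


class CollectingHandler(logging.Handler):
    """Stand-in for SentryHandler: stores records instead of sending them."""

    def __init__(self, level: int = logging.NOTSET):
        super().__init__(level=level)
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


log = logging.getLogger('aqueduct-demo')  # hypothetical logger name
log.setLevel(logging.DEBUG)
handler = CollectingHandler(level=logging.ERROR)
log.addHandler(handler)

log.warning('slow batch')     # below ERROR: the handler ignores it
log.error('worker crashed')   # ERROR: the handler receives it
```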

Owner: avito.tech (avito.ru engineering team open source projects)