Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes

Get started

A simple example of how to get started with Aqueduct using aiohttp. For more complete examples, see examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
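
Once the service is running (web.run_app listens on port 8080 by default), you can exercise the /sum endpoint with a small client. A minimal sketch using aiohttp's client session; the URL and payload here are just for illustration:

import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # the view reads the raw body and casts it to int, so send a plain number
        async with session.post('http://localhost:8080/sum', data='1000') as resp:
            print(await resp.json())  # {'result': 332833500}


asyncio.run(main())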
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.
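
Batching is configured on the FlowStep: pass batch_size to set how many tasks can be handled in one call, and optionally batch_timeout to cap how long the step waits while accumulating a batch. A minimal configuration sketch, reusing SumHandler from the example above (the numbers are placeholders):

from aqueduct import Flow, FlowStep

# handle up to 16 tasks per call, but wait no longer than 10 ms for a batch to fill
flow = Flow(
    FlowStep(SumHandler(), batch_size=16, batch_timeout=0.01),
)
flow.start()

The full example below shows the effect of batching with an emulated GPU model: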

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is only needed for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations; this is
        emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))
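
# NOTE: the code below uses top-level await; run it inside an async function
# or an asyncio-enabled REPL/notebook rather than as a plain script.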


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no result has been set yet
assert not any(task.result for task in tasks_batch)
# batch handling takes about 0.16 s, which is less than the ~0.22 s needed to process the tasks sequentially
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results have been set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is greater than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The logging implementation allows you to receive logger events from both the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') == 'true':  # env vars are strings, so compare against a string rather than `is True`
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
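
The raven client used above is the legacy Sentry SDK. If your service uses the newer sentry-sdk package instead, a roughly equivalent sketch (assuming the same SENTRY_ENABLED and SENTRY_DSN environment variables) routes ERROR-level log records to Sentry through the standard logging integration, which should also pick up Aqueduct's logger:

import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

if os.getenv('SENTRY_ENABLED') == 'true':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        # records >= INFO become breadcrumbs, records >= ERROR become Sentry events
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
    )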

Owner

avito.tech: avito.ru engineering team open source projects