Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Vector tile server for the Wildfire Predictive Services Unit

wps-tileserver Vector tile server for the Wildfire Predictive Services Unit Overview The intention of this project is to: provide tools to easily spin

Province of British Columbia 6 Dec 20, 2022
A Python library to simulate a Zoom H6 recorder remote control

H6 A Python library to emulate a Zoom H6 recorder remote control Introduction This library allows you to control your Zoom H6 recorder from your compu

Matias Godoy 68 Nov 02, 2022
OB_Template is a vault template reference for using Obsidian.

Obsidian Template OB_Template is a vault template reference for using Obsidian. If you've tested out Obsidian. and worked through the "Obsidian Help"

323 Dec 27, 2022
Import Apex legends mprt files exported from Legion

Apex-mprt-importer-for-Blender Import Apex legends mprt files exported from Legion. REQUIRES CAST IMPORTER Usage: Use a VPK extracter to extract the m

15 Dec 18, 2022
A tool to guide you for team selection based on mana and ruleset using your owned cards.

Splinterlands_Teams_Guide A tool to guide you for team selection based on mana and ruleset using your owned cards. Built With This project is built wi

Ruzaini Subri 3 Jul 30, 2022
Repository for my Monika Assistant project

Monika_Assistant Repository for my Monika Assistant project Major changes: Added face tracker Added manual daily log to see how long it takes me to fi

3 Jan 10, 2022
Retrieve bank transactions and categorize for budgeting use

Budgeting After trying out some budgeting software, I decided to make my own. selenium_scraper Using the selenium package, this script runs an instanc

Marc 1 Nov 10, 2021
(Pre-)compromise operations for MITRE CALDERA

(Pre-)compromise operations for CALDERA Extend your CALDERA operations over the entire adversary killchain. In contrast to MITRE's access plugin, cald

Diederik Bakker 3 Aug 22, 2022
Djangoblog - A blogging site where people can make their accout and write blogs and read other author's blogs

This a blogging site where people can make their accout and write blogs and read other author's blogs.

1 Jan 26, 2022
A maubot plugin to invite users to Matrix rooms according to LDAP groups

LDAP Inviter Bot This is a maubot plugin that invites users to Matrix rooms according to their membership in LDAP groups.

David Mehren 14 Dec 09, 2022
Simple Python tool to check if there is an Office 365 instance linked to a domain.

o365chk.py Simple Python script to check if there is an Office365 instance linked to a particular domain.

Steven Harris 37 Jan 02, 2023
Job Guy Backend

جاب‌گای چیست؟ اونجا وضعیت چطوریه؟ یه سوال به همین کلیت و ابهام معمولا وقتی برای یه شرکت رزومه می‌فرستیم این سوال کلی و بزرگ برای همه پیش میاد.اونجا وض

Jobguy.work 217 Dec 25, 2022
Installer, package manager, build wrapper and version manager for Piccolo

Piccl Installer, package manager, build wrapper and version manager for Piccolo

1 Dec 19, 2021
Open Source defrag's mod code

Open Source defrag's mod code Goals: Code & License: Respect FOSS philosophy. Open source and community focus. Eliminate all traces of q3a-sdk licensi

sOkam! 1 Dec 10, 2022
Provide error messages for Python exceptions, even if the original message is empty

errortext is a Python package to provide error messages for Python exceptions, even if the original message is empty.

Thomas Aglassinger 0 Dec 07, 2021
Script para generar automatización de registro de formularios IEEH

Formularios_IEEH Script para generar automatización de registro de formularios IEEH Corresponde a un conjunto de script en python que permiten la auto

vhevia11 1 Jan 06, 2022
A repository containing an introduction to Panel made to be support videos and talks.

👍 Awesome Panel - Introduction to Panel THIS REPO IS WORK IN PROGRESS. PRE-ALPHA Panel is a very powerful framework for exploratory data analysis and

Marc Skov Madsen 51 Nov 17, 2022
奇遇淘客服务器端

奇遇淘客 APP 服务器端 警告 正在使用 v0.2.0 版本的用户,请尽快升级到 v0.2.1。 v0.2.0 版本的 Docker 镜像中包含了有问题的 aiohttp。 奇遇淘客代码库 奇遇淘客 iOS APP 奇遇淘客 Android APP 奇遇淘客文档 服务器端文档 Docker 使用

奇遇科技 92 Nov 09, 2022
FollowSpot is a comprehensive audition tracking fullstack web application for entertainment industry professionals.

FollowSpot is a comprehensive audition tracking fullstack web application for entertainment industry professionals. This app allows users to store information/media for all of their auditions while a

Jen Brissman 9 Jul 12, 2022
Built with Python programming language and QT library and Guess the number in three easy, medium and hard rolls

guess-the-numbers Built with Python programming language and QT library and Guess the number in three easy, medium and hard rolls Number guessing game

Amir Hussein Sharifnezhad 5 Oct 09, 2021