Run async workflows using pytest-fixtures-style dependency injection

Overview

asyncinject

Installation

Install this library using pip:

$ pip install asyncinject

Usage

This library is inspired by pytest fixtures.

The idea is to simplify executing parallel asyncio operations by allowing them to be collected in a class, with the names of parameters to the class methods specifying which other methods should be executed first.

The library can then build and execute a plan in which methods that do not depend on each other run in parallel.

Here's an example, using the httpx HTTP library.

from asyncinject import AsyncInjectAll
import httpx

async def get(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text

class FetchThings(AsyncInjectAll):
    async def example(self):
        return await get("http://www.example.com/")

    async def simonwillison(self):
        return await get("https://simonwillison.net/search/?tag=empty")

    async def both(self, example, simonwillison):
        return example + "\n\n" + simonwillison


combined = await FetchThings().both()
print(combined)

If you run this in IPython (which supports top-level await) you will see output that combines HTML from both of those pages.

The HTTP requests to www.example.com and simonwillison.net will be performed in parallel.

The library will notice that both() takes two arguments which are the names of other async def methods on that class, and will construct an execution plan that executes those two methods in parallel, then passes their results to the both() method.
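To make the idea of an execution plan concrete, here is a minimal sketch of how one could be derived from parameter names. It is not the library's actual code: build_plan is a hypothetical helper, the methods return placeholder strings instead of making HTTP requests, and the standard library's graphlib.TopologicalSorter (Python 3.9+) is used to group the methods.

```python
import inspect
from graphlib import TopologicalSorter


class FetchThings:
    # Placeholder bodies: the real example fetches pages over HTTP
    async def example(self):
        return "example"

    async def simonwillison(self):
        return "simonwillison"

    async def both(self, example, simonwillison):
        return example + "\n\n" + simonwillison


def build_plan(cls, target):
    """Hypothetical helper: return groups of method names, in execution order.

    Methods in the same group do not depend on each other.
    """
    # Every async def method defined on the class is a candidate dependency
    methods = {
        name for name, fn in vars(cls).items() if inspect.iscoroutinefunction(fn)
    }
    graph = {}
    todo = [target]
    while todo:
        name = todo.pop()
        if name in graph:
            continue
        params = inspect.signature(getattr(cls, name)).parameters
        # A parameter is a dependency if it is named after another method
        graph[name] = {p for p in params if p in methods}
        todo.extend(graph[name])

    sorter = TopologicalSorter(graph)
    sorter.prepare()
    plan = []
    while sorter.is_active():
        group = sorter.get_ready()
        plan.append(sorted(group))
        sorter.done(*group)
    return plan


plan = build_plan(FetchThings, "both")
print(plan)  # [['example', 'simonwillison'], ['both']]
```

Each inner list is a group whose members could be awaited together, for example with asyncio.gather(), before moving on to the next group.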

Parameters are passed through

Dependent methods can require keyword arguments. Pass those arguments to the method you call and they are forwarded to the dependent methods that need them.

class FetchWithParams(AsyncInjectAll):
    async def get_param_1(self, param1):
        return await get(param1)

    async def get_param_2(self, param2):
        return await get(param2)

    async def both(self, get_param_1, get_param_2):
        return get_param_1 + "\n\n" + get_param_2


combined = await FetchWithParams().both(
    param1 = "http://www.example.com/",
    param2 = "https://simonwillison.net/search/?tag=empty"
)
print(combined)

Parameters with default values are ignored

You can opt a parameter out of the dependency injection mechanism by assigning it a default value:

class IgnoreDefaultParameters(AsyncInjectAll):
    async def go(self, calc1, x=5):
        return calc1 + x

    async def calc1(self):
        return 5

print(await IgnoreDefaultParameters().go())
# Prints 10
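One way such an opt-out could be detected is with inspect.signature, which reports whether each parameter has a default. The sketch below is an illustration under that assumption, not the library's code, and injectable_parameters is a hypothetical helper name.

```python
import inspect


def injectable_parameters(fn):
    """Hypothetical helper: names of parameters that have no default value."""
    return [
        name
        for name, param in inspect.signature(fn).parameters.items()
        if name != "self" and param.default is inspect.Parameter.empty
    ]


class IgnoreDefaultParameters:
    async def go(self, calc1, x=5):
        return calc1 + x

    async def calc1(self):
        return 5


# x has a default value, so only calc1 would be treated as a dependency
print(injectable_parameters(IgnoreDefaultParameters.go))  # ['calc1']
```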

AsyncInject and @inject

The above example illustrates the AsyncInjectAll class, which assumes that every async def method on the class should be treated as a dependency injection method.

You can also specify individual methods using the AsyncInject base class and the @inject decorator:

from asyncinject import AsyncInject, inject

class FetchThings(AsyncInject):
    @inject
    async def example(self):
        return await get("http://www.example.com/")

    @inject
    async def simonwillison(self):
        return await get("https://simonwillison.net/search/?tag=empty")

    @inject
    async def both(self, example, simonwillison):
        return example + "\n\n" + simonwillison

The resolve() function

If you want to execute a set of methods in parallel without defining a third method that lists them as parameters, you can do so using the resolve() function. This will execute the specified methods (in parallel, where possible) and return a dictionary of the results.

from asyncinject import resolve

fetcher = FetchThings()
results = await resolve(fetcher, ["example", "simonwillison"])

results will now be:

{
    "example": "contents of http://www.example.com/",
    "simonwillison": "contents of https://simonwillison.net/search/?tag=empty"
}
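The shape of that return value can be reproduced with asyncio.gather(). The following is a simplified stand-in rather than the library's resolve(): resolve_sketch and the Things class are hypothetical, the methods return placeholder strings, and the sketch assumes the named methods have no dependencies of their own.

```python
import asyncio


async def resolve_sketch(instance, names):
    """Hypothetical stand-in: await the named methods concurrently."""
    values = await asyncio.gather(*(getattr(instance, name)() for name in names))
    # gather() preserves argument order, so names and values line up
    return dict(zip(names, values))


class Things:
    async def example(self):
        await asyncio.sleep(0.01)  # stands in for an HTTP request
        return "contents of example"

    async def simonwillison(self):
        await asyncio.sleep(0.01)
        return "contents of simonwillison"


results = asyncio.run(resolve_sketch(Things(), ["example", "simonwillison"]))
print(results)
```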

Development

To contribute to this library, first check out the code. Then create a new virtual environment:

cd asyncinject
python -m venv venv
source venv/bin/activate

Or if you are using pipenv:

pipenv shell

Now install the dependencies and test dependencies:

pip install -e '.[test]'

To run the tests:

pytest
Comments
  • Concurrency is not being optimized

    It looks like concurrency / parallelism is not being maximized due to the grouping of dependencies into node groups. Here's a simple example:

    import asyncio
    from time import time
    
    from asyncinject import Registry
    
    async def a():
        await asyncio.sleep(1)
    
    async def b():
        await asyncio.sleep(2)
    
    async def c(a):
        await asyncio.sleep(1)
    
    async def d(b, c):
        pass
    
    async def main_asyncinjector():
        reg = Registry(a, b, c, d)
        start = time()
        await reg.resolve(d)
        print(time()-start)
    
    asyncio.run(main_asyncinjector())
    

    This should take 2 seconds to run (start a and b, once a finishes start c, b and c finish at the same time and you're done) but takes 3 seconds (start a and b, wait for both to finish then start c).

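    This happens because graphlib.TopologicalSorter is not used online (calling get_ready() and done() as individual functions finish). Instead, it is used up front to statically compute groups of dependencies, and each group must finish before the next one starts.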
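For illustration, here is a minimal sketch of the online approach using graphlib.TopologicalSorter together with asyncio.wait(). It is not a patch for asyncinject: resolve_online is a hypothetical function, every parameter name is assumed to refer to one of the functions passed in, and the sleeps are scaled down by a factor of ten so the example runs quickly.

```python
import asyncio
import inspect
from graphlib import TopologicalSorter


async def a():
    await asyncio.sleep(0.1)

async def b():
    await asyncio.sleep(0.2)

async def c(a):
    await asyncio.sleep(0.1)

async def d(b, c):
    pass


async def resolve_online(*functions):
    """Hypothetical sketch: start each function as soon as its own dependencies finish."""
    by_name = {fn.__name__: fn for fn in functions}
    # Assumption: every parameter names another function in by_name
    graph = {
        name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()
    }
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    results, events, running = {}, [], {}

    async def run(name):
        events.append(("start", name))
        kwargs = {dep: results[dep] for dep in graph[name]}
        results[name] = await by_name[name](**kwargs)
        events.append(("end", name))

    while sorter.is_active():
        # Launch everything whose dependencies are already done
        for name in sorter.get_ready():
            running[asyncio.create_task(run(name))] = name
        done, _ = await asyncio.wait(
            set(running), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            task.result()  # re-raise any exception from the function
            sorter.done(running.pop(task))  # may unblock further functions
    return results, events


results, events = asyncio.run(resolve_online(a, b, c, d))
print(events)
```

In the recorded events, c starts as soon as a has finished, while b is still running, which is the behaviour the 2 second figure above relies on.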

    I don't think it would be too hard to address this, but I'm not sure how much you'd want to change to accommodate this. I work on a similar project (https://github.com/adriangb/di) and there I found it very useful to separate the concept of an "executor" from the container/registry concept, which means that instead of a parallel option you'd have pluggable executors that could choose to use concurrency, limit concurrency, use threads instead, etc. FWIW here's what that looks like with this example:

    import asyncio
    from time import time
    from typing import Annotated
    
    from asyncinject import Registry
    from di.dependant import Marker, Dependant
    from di.container import Container
    from di.executors import ConcurrentAsyncExecutor
    
    
    async def a():
        await asyncio.sleep(1)
    
    async def b():
        await asyncio.sleep(2)
    
    async def c(a: Annotated[None, Marker(a)]):
        await asyncio.sleep(1)
    
    async def d(b: Annotated[None, Marker(b)], c: Annotated[None, Marker(c)]):
        pass
    
    async def main_asyncinjector():
        reg = Registry(a, b, c, d)
        start = time()
        await reg.resolve(d)
        print(time()-start)
    
    
    async def main_di():
        container = Container()
        solved = container.solve(Dependant(d), scopes=[None])
        executor = ConcurrentAsyncExecutor()
        async with container.enter_scope(None) as state:
            start = time()
            await container.execute_async(solved, executor, state=state)
            print(time()-start)
    
    asyncio.run(main_asyncinjector())  # 3 seconds
    asyncio.run(main_di())  # 2 seconds
    
    enhancement 
    opened by adriangb 5
  • Investigate a non-class-based version

    I'm thinking about using this with Datasette plugins, which aren't well suited to the current class-based mechanism because plugins may want to register their own additional dependency injection functions.

    research 
    opened by simonw 4
  • Debug mechanism

    Add a mechanism which shows exactly how the class is executing, including which methods are running in parallel. Maybe even with a very basic ASCII visualization? Then use it to help illustrate the examples in the README, refs #4.

    enhancement 
    opened by simonw 4
  • A way to turn off parallel execution (for easier comparison)

    It would be neat if you could toggle parallel execution on and off, to better demonstrate the performance difference that it makes.

    This would happen in the code that calls gather(): https://github.com/simonw/asyncinject/blob/47348978242880bd72a444158bbecc64566b0c55/asyncinject/__init__.py#L114-L123

    enhancement 
    opened by simonw 2
  • Ability to resolve an unregistered function

    I'd like to be able to do the following:

    async def one():
        return 1
    
    async def two():
        return 2
    
    registry = Registry(one, two)
    
    async def three(one, two):
        return one + two
    
    result = await registry.resolve(three)
    

    Note that three has not been registered with the registry - but it still has its parameters inspected and used to resolve the dependencies.

    This would be useful for Datasette, where I want plugins to be able to interact with predefined registries without needing to worry about picking a name for their function that doesn't clash with a name that has been registered by another plugin.
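A rough sketch of the inspection step described above, using a plain dictionary in place of a real Registry. The names registered and resolve_unregistered are hypothetical, and the sketch assumes the registered functions have no dependencies of their own.

```python
import asyncio
import inspect


async def one():
    return 1

async def two():
    return 2

# Stand-in for a registry: registered functions keyed by name
registered = {fn.__name__: fn for fn in (one, two)}


async def resolve_unregistered(fn):
    """Hypothetical sketch: call fn with values from registered functions."""
    # Only parameters that match a registered name are resolved
    names = [n for n in inspect.signature(fn).parameters if n in registered]
    values = await asyncio.gather(*(registered[n]() for n in names))
    return await fn(**dict(zip(names, values)))


async def three(one, two):
    return one + two


# three is never added to the registered dictionary
result = asyncio.run(resolve_unregistered(three))
print(result)  # 3
```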

    enhancement 
    opened by simonw 1
  • Try using __init_subclass__

    https://twitter.com/dabeaz/status/1466731368956809219 - David Beazley says:

    I think 95% of the problems once solved by a metaclass can be solved by __init_subclass__ instead

    research 
    opened by simonw 1
  • Documentation needs a smarter example that illustrates graph dependencies

    The examples in the README are boring, and don't show how the library can resolve a dependency graph into the most efficient possible execution plan.

    Need to come up with a realistic example that demonstrates that.

    documentation 
    opened by simonw 0
Releases (0.5)
  • 0.5 (Apr 22, 2022)

    • registry.resolve() can now be used to resolve functions that have not been registered. #13

      async def one():
          return 1
      
      async def two():
          return 2
      
      registry = Registry(one, two)
      
      async def three(one, two):
          return one + two
      
      result = await registry.resolve(three)
      # result is now 3
      
  • 0.4 (Apr 18, 2022)

  • 0.3 (Apr 16, 2022)

    Extensive, backwards-compatibility breaking redesign.

    • This library no longer uses subclasses. Instead, a Registry() object is created and async def functions are registered with that registry. The registry.resolve(fn) method is then used to execute functions with their dependencies. #8
    • Registry(timer=callable) can now be used to register a function to record the times taken to execute each function. This callable will be passed three arguments - the function name, the start time and the end time. #7
    • The parallel=True argument to the Registry() constructor can be switched to False to disable parallel execution - useful for running benchmarks to understand the performance benefit of running functions in parallel. #6
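The timer callback and the parallel switch described in these notes can be illustrated without the library. The sketch below is an assumption-laden stand-in, not the Registry implementation: run_group and record are hypothetical names, and only the three-argument timer signature (function name, start time, end time) is taken from the notes above.

```python
import asyncio
import time


async def run_group(functions, parallel=True, timer=None):
    """Hypothetical helper: run independent async functions, optionally timed."""

    async def timed(fn):
        start = time.perf_counter()
        result = await fn()
        if timer is not None:
            # Three arguments: function name, start time, end time
            timer(fn.__name__, start, time.perf_counter())
        return result

    if parallel:
        values = await asyncio.gather(*(timed(fn) for fn in functions))
    else:
        # Sequential execution, useful as a baseline for benchmarks
        values = [await timed(fn) for fn in functions]
    return {fn.__name__: value for fn, value in zip(functions, values)}


async def one():
    await asyncio.sleep(0.05)
    return 1

async def two():
    await asyncio.sleep(0.05)
    return 2


timings = []

def record(name, start, end):
    timings.append((name, end - start))


results = asyncio.run(run_group([one, two], parallel=False, timer=record))
print(results)
print(timings)
```

Running the same call with parallel=True and comparing the total elapsed time gives the kind of before-and-after benchmark the release note has in mind.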
  • 0.2 (Dec 21, 2021)

  • 0.2a1 (Dec 3, 2021)

  • 0.2a0 (Nov 17, 2021)

    • Provided parameters are now forwarded on to dependent methods.
    • Parameters with default values specified in the method signature are no longer treated as dependency injection parameters. #1
  • 0.1a0 (Nov 17, 2021)

Owner
Simon Willison