A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Overview

Introduction

A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Requirements

  • Python >=3.7.3
  • Pika ==1.2.0
  • Aio-pika ==6.8.0
  • Requests >=2.25.1

Installation

pip install rctiplus-rabbitmq-python-sdk

Getting latest version

pip install rctiplus-rabbitmq-python-sdk --upgrade

Usage

To start using this SDK, you may follow given instructions bellow in order.

Payload handler

First, you need to create a payload class handler that implement MessagePayload. For example, we want to make a class to handle JSON payload:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname'])">
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

MessagePayload class from the SDK's core has this functions that require to implemented:

'MessagePayload': """Generate data from specified string payload message format Raises: NotImplementedError: Raise an error if not implemented """ raise NotImplementedError() def __str__(self) -> str: """Convert specified data format to string payload message Raises: NotImplementedError: Raise an error if not implemented Returns: str: String payload message """ raise NotImplementedError()">
class MessagePayload:
    """Python RabbitMQ message payload
    """
    
    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

        Raises:
            NotImplementedError: Raise an error if not implemented

        Returns:
            str: String payload message
        """
        raise NotImplementedError()

Connect to RabbitMQ

Making connection to RabbitMQ server can be done by doing this simple way:

from rctiplus_rabbitmq_python_sdk import RabbitMQ

conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

Sending message

After you have payload class handler & connected to the RabbitMQ server, now you can try to send a messsage to queue channel. For example, we will send JSON payload message to test queue:

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Receiving message

Great. Now, in our consumer app, we want to listen & receive that message, and then doing some stuff:

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)

For callback function, according to Pikas standart library, you need to pass 4 arguments ch, method, properties and body to catch all needed values from incomming message.

Putting it all together

Here is the complete example from the code above:

Complete example of sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) conn.send('test', payload)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Complete example of consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message def callback(ch, method, properties, body): print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel conn.receive('test', callback)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Create a callback to be executed immadiately after recieved a message
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Receive & listen messages from queue channel
conn.receive('test', callback)

Asynchronous

This SDK also support asynchronous process. To use this feature, use AIORabbitMQ instead of RabbitMQ. All methods provided in AIORabbitMQ are treated as async function. So, when you calling the methods, you need to await them.

Async connect to RabbitMQ

from rctiplus_rabbitmq_python_sdk import AIORabbitMQ

conn = AIORabbitMQ(loop)
await conn.connect(host='localhost', port=5672, username='guest', password='guest')

loop is an asynchronous event loop, example: asyncio.get_event_loop()

Async sending message

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
await conn.send('test', payload)

Async receiving message

async def callback(message):
    body = message.body
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

await conn.receive('test', callback)

In asynchronous process, you just need pass 1 argument on callback function. This argument is a representation of aio_pika.IncomingMessage to catch all needed values from incomming message.

Complete example of asynchronous process

Here is the complete example of asynchronous process above:

Complete example of asynchronous sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') async with conn.connection: # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) await conn.send('test', payload) # Event loop loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    async with conn.connection:
        # Send payload to queue
        payload = JSONPayload('John', 'Doe')
        print('payload:', payload)
        await conn.send('test', payload)


# Event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Complete example of asynchronous consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message async def callback(message): body = message.body print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel await conn.receive('test', callback) return conn # Event loop loop = asyncio.get_event_loop() connection = loop.run_until_complete(main(loop)) try: loop.run_forever() finally: loop.run_until_complete(connection.disconnect())">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    # Create a callback to be executed immadiately after recieved a message
    async def callback(message):
        body = message.body
        print("[x] Received %r" % body)
        
        # Generate data from string payload message
        data = JSONPayload.from_str(body)
        print(f'data: firstname={data.firstname}, lastname={data.lastname}')

    # Receive & listen messages from queue channel
    await conn.receive('test', callback)

    return conn


# Event loop
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
    loop.run_forever()
finally:
    loop.run_until_complete(connection.disconnect())

License

GNU General Public License v3

Owner
Dali Kewara
An unexpected journey and gonna make it simple but Spectacular!
Dali Kewara
This tool lets you perform some quick tasks for CTFs and Pentesting.

This tool lets you convert strings and numbers between number bases (2, 8, 10 and 16) as well as ASCII text. You can use the IP address analyzer to find out details on IPv4 and perform abbreviation a

Ayomide Ayodele-Soyebo 1 Jul 16, 2022
A simulator for xkcd 2529's weirdly concrete problem

What is this? This is a quick hack implementation of a simulator for xkcd 2529's weirdly concrete problem. This is barely tested and I suck at computa

Reuben Steenekamp 6 Oct 27, 2021
This code renames subtitle file names to your video files names, so you don't need to rename them manually.

Rename Subtitle This code renames your subtitle file names to your video file names so you don't need to do it manually Note: It only works for series

Mostafa Kazemi 4 Sep 12, 2021
✨ Un juste prix totalement fait en Python par moi, et en français.

Juste Prix ❗ Un juste prix totalement fait en Python par moi, et en français. 🔮 Avec l'utilisation du module "random", j'ai pu faire un choix aléatoi

MrGabin 3 Jun 06, 2021
MicroMIUI - Script to optimize miui and not only

MicroMIUI - Script to optimize miui and not only

Groiznyi-Studio 1 Nov 02, 2021
Homebase Name Changer for Fortnite: Save the World.

Homebase Name Changer This program allows you to change the Homebase name in Fortnite: Save the World. How to use it? After starting the HomebaseNameC

PRO100KatYT 7 May 21, 2022
Python lightweight dependency injection library

pythondi pythondi is a lightweight dependency injection library for python Support both sync and async functions Installation pip3 install pythondi Us

Hide 41 Dec 16, 2022
Python Random Number Genrator

This Genrates Random Numbers. This Random Number Generator was made using python. This software uses Time and Random extension. Download the EXE file and run it to get your answer.

Krish Sethi 2 Feb 03, 2022
Export watched content from Tautulli to the Letterboxd CSV Import Format

Export watched content from Tautulli to the Letterboxd CSV Import Format

Evan J 5 Aug 31, 2022
Rabbito is a mini tool to find serialized objects in input values

Rabbito-ObjectFinder Rabbito is a mini tool to find serialized objects in input values What does Rabbito do Rabbito has the main object finding Serial

7 Dec 13, 2021
Automatically Generate Rulesets for IIS for Intelligent HTTP/S C2 Redirection

Automatically Generate Rulesets for IIS for Intelligent HTTP/S C2 Redirection This project converts a Cobalt Strike profile to a functional web.config

Jesse 99 Dec 13, 2022
Tool for generating Memory.scan() compatible instruction search patterns

scanpat Tool for generating Frida Memory.scan() compatible instruction search patterns. Powered by r2. Examples $ ./scanpat.py arm.ks:64 'sub sp, sp,

Ole André Vadla Ravnås 13 Sep 19, 2022
Daiho Tool is a Script Gathering for Windows/Linux systems written in Python.

Daiho is a Script Developed with Python3. It gathers a total of 22 Discord tools (including a RAT, a Raid Tool, a Nuker Tool, a Token Grabberr, etc). It has a pleasant and intuitive interface to faci

AstraaDev 32 Jan 05, 2023
MongoDB utility to inflate the contents of small collection to a new larger collection

MongoDB Data Inflater ("data-inflater") The data-inflater tool is a MongoDB utility to automate the creation of a new large database collection using

Paul Done 3 Nov 28, 2021
✨ Une calculatrice totalement faite en Python par moi, et en français.

Calculatrice ❗ Une calculatrice totalement faite en Python par moi, et en français. 🔮 Voici une calculatrice qui vous permet de faire vos additions,

MrGabin 3 Jun 06, 2021
Entropy-controlled contexts in Python

Python module ordered ordered module is the opposite to random - it maintains order in the program. import random x = 5 def increase(): global x

HyperC 36 Nov 03, 2022
Handy Tool to check the availability of onion site and to extract the title of submitted onion links.

This tool helps is to quickly investigate a huge set of onion sites based by checking its availability which helps to filter out the inactive sites and collect the site title that might helps us to c

Balaji 13 Nov 25, 2022
Know your customer pipeline in apache air flow

KYC_pipline Know your customer pipeline in apache air flow For a successful pipeline run take these steps: Run you Airflow server Admin - connection

saeed 4 Aug 01, 2022
convert a dict-list object from / to a typed object(class instance with type annotation)

objtyping 带类型定义的对象转换器 由来 Python不是强类型语言,开发人员没有给数据定义类型的习惯。这样虽然灵活,但处理复杂业务逻辑的时候却不够方便——缺乏类型检查可能导致很难发现错误,在IDE里编码时也没

Song Hui 15 Dec 22, 2022
Build capture utility for Linux

CX-BUILD Compilation Database alternative Build Prerequisite the CXBUILD uses linux system call trace utility called strace which was customized. So I

GLaDOS (G? L? Automatic Debug Operation System) 3 Nov 03, 2022