A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == ⚡ 💪 ❤️

Overview

pubsub

🗞️ FastAPI Websocket Pub/Sub

Tests Package Downloads

A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.

Supports and tested on Python >= 3.7

Installation 🛠️

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == 💪 ❤️

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on server
      async def on_event(data):
          print("We got an event! with data- ", data)
      # Subscribe for the event 
      client.subscribe("my event", on_event)
  • Publish

    • Directly from server code to connected clients.
      app = FastAPI() 
      endpoint = PubSubEndpoint()
      endpoint.register_route(app, "/pubsub")
      endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • From client to client (through the servers)
      async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
          endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app = FastAPI() 
      endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
      
      @app.websocket("/pubsub")
      async def websocket_rpc_endpoint(websocket: WebSocket):
          async with endpoint.broadcaster:
              await endpoint.main_loop(websocket)
      see examples/pubsub_broadcaster_server_example.py for full usage example

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.

Server:

import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter

from fastapi_websocket_pubsub import PubSubEndpoint
app =  FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request trigger an event
    endpoint.publish(["triggered"])

Client:

from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    # Subscribe for the event 
    client.subscribe("triggered", on_event)

More Examples

What can I do with this?

The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based on fastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based on broadcaster for syncing server instances

  • Server Endpoint:

    • Based on FastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.

  • Client :

    • Based on Tenacity: allowing configurable retries to keep to connection alive

      • see WebSocketRpcClient.init's retry_config
    • Based on python websockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config. It provides a helper logging module to control how it produces logs for you. See fastapi_websocket_rpc/logger.py. Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer. Or override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'

example:

# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)

Pull requests - welcome!

  • Please include tests for new features
You might also like...
Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

🚀   Cookiecutter Template for FastAPI + React Projects.  Using PostgreSQL, SQLAlchemy, and Docker
🚀 Cookiecutter Template for FastAPI + React Projects. Using PostgreSQL, SQLAlchemy, and Docker

FastAPI + React · A cookiecutter template for bootstrapping a FastAPI and React project using a modern stack. Features FastAPI (Python 3.8) JWT authen

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

FastAPI application and service structure for a more maintainable codebase

Abstracting FastAPI Services See this article for more information: https://camillovisini.com/article/abstracting-fastapi-services/ Poetry poetry inst

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.
Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.

Full Stack FastAPI and PostgreSQL - Base Project Generator Generate a backend and frontend stack using Python, including interactive API documentation

Releases(v0.1.20)
Dead-simple mailer micro-service for static websites

Mailer Dead-simple mailer micro-service for static websites A free and open-source software alternative to contact form services such as FormSpree, to

Romain Clement 42 Dec 21, 2022
This repository contains learning resources for Python Fast API Framework and Docker

This repository contains learning resources for Python Fast API Framework and Docker, Build High Performing Apps With Python BootCamp by Lux Academy and Data Science East Africa.

Harun Mbaabu Mwenda 23 Nov 20, 2022
FastAPI Socket.io with first-class documentation using AsyncAPI

fastapi-sio Socket.io FastAPI integration library with first-class documentation using AsyncAPI The usage of the library is very familiar to the exper

Marián Hlaváč 9 Jan 02, 2023
FastAPI + Postgres + Docker Compose + Heroku Deploy Template

FastAPI + Postgres + Docker Compose + Heroku Deploy ⚠️ For educational purpose only. Not ready for production use YET Features FastAPI with Postgres s

DP 12 Dec 27, 2022
Ansible Inventory Plugin, created to get hosts from HTTP API.

ansible-ws-inventory-plugin Ansible Inventory Plugin, created to get hosts from HTTP API. Features: Database compatible with MongoDB and Filesystem (J

Carlos Neto 0 Feb 05, 2022
Get MODBUS data from Sofar (K-TLX) inverter through LSW-3 or LSE module

SOFAR Inverter + LSW-3/LSE Small utility to read data from SOFAR K-TLX inverters through the Solarman (LSW-3/LSE) datalogger. Two scripts to get inver

58 Dec 29, 2022
Simple web app example serving a PyTorch model using streamlit and FastAPI

streamlit-fastapi-model-serving Simple example of usage of streamlit and FastAPI for ML model serving described on this blogpost and PyConES 2020 vide

Davide Fiocco 291 Jan 06, 2023
implementation of deta base for FastAPIUsers

FastAPI Users - Database adapter for Deta Base Ready-to-use and customizable users management for FastAPI Documentation: https://fastapi-users.github.

2 Aug 15, 2022
Repository for the Demo of using DVC with PyCaret & MLOps (DVC Office Hours - 20th Jan, 2022)

Using DVC with PyCaret & FastAPI (Demo) This repo contains all the resources for my demo explaining how to use DVC along with other interesting tools

Tezan Sahu 6 Jul 22, 2022
MQTT FastAPI Wrapper With Python

mqtt-fastapi-wrapper Quick start Create mosquitto.conf with the following content: ➜ /tmp cat mosquitto.conf persistence false allow_anonymous true

Vitalii Kulanov 3 May 09, 2022
Stac-fastapi built on Tile38 and Redis to support caching

stac-fastapi-caching Stac-fastapi built on Tile38 to support caching. This code is built on top of stac-fastapi-elasticsearch 0.1.0 with pyle38, a Pyt

Jonathan Healy 4 Apr 11, 2022
Drop-in MessagePack support for ASGI applications and frameworks

msgpack-asgi msgpack-asgi allows you to add automatic MessagePack content negotiation to ASGI applications (Starlette, FastAPI, Quart, etc.), with a s

Florimond Manca 128 Jan 02, 2023
FastAPI IPyKernel Sandbox

FastAPI IPyKernel Sandbox This repository is a light-weight FastAPI project that is meant to provide a wrapper around IPyKernel interactions. It is in

Nick Wold 2 Oct 25, 2021
Sample project showing reliable data ingestion application using FastAPI and dramatiq

Create and deploy a reliable data ingestion service with FastAPI, SQLModel and Dramatiq This is the source code for the data ingestion service explain

François Voron 31 Nov 30, 2022
FastAPI native extension, easy and simple JWT auth

fastapi-jwt FastAPI native extension, easy and simple JWT auth

Konstantin Chernyshev 19 Dec 12, 2022
京东图片点击验证码识别

京东图片验证码识别 本项目是@yqchilde 大佬的 JDMemberCloseAccount 识别图形验证码(#45)思路验证,若你也有思路可以提交Issue和PR也可以在 @yqchilde 的 TG群 找到我 声明 本脚本只是为了学习研究使用 本脚本除了采集处理验证码图片没有其他任何功能,也

AntonVanke 37 Dec 22, 2022
Toolkit for developing and maintaining ML models

modelkit Python framework for production ML systems. modelkit is a minimalist yet powerful MLOps library for Python, built for people who want to depl

140 Dec 27, 2022
Town / City geolocations with FastAPI & Mongo

geolocations-api United Kingdom Town / City geolocations with FastAPI & Mongo Build container To build a custom image or extend the api run the follow

Joe Gasewicz 3 Jan 26, 2022
Dead simple CSRF security middleware for Starlette ⭐ and Fast API ⚡

csrf-starlette-fastapi Dead simple CSRF security middleware for Starlette ⭐ and Fast API ⚡ Will work with either a input type="hidden" field or ajax

Nathaniel Sabanski 9 Nov 20, 2022
A dynamic FastAPI router that automatically creates CRUD routes for your models

⚡ Create CRUD routes with lighting speed ⚡ A dynamic FastAPI router that automatically creates CRUD routes for your models Documentation: https://fast

Adam Watkins 943 Jan 01, 2023