Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need the Apache Beam Python SDK installed and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner.

Description of the pipeline

Data input

We use a public PubSub topic with data here, so we don't need to set up our own topic to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

The messages also contain metadata that is useful for streaming pipelines. In this case, each message carries an attribute named ts, which holds the same timestamp as the timestamp field in the data. Remember that PubSub treats the message payload as just a string of bytes, so it does not know anything about the data itself. Metadata attributes are normally used to publish messages with specific ids and/or timestamps.
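
In the Beam Python SDK, that attribute can be used as the event timestamp at read time. A minimal sketch, assuming the public topic above (not the repository's exact code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

TOPIC = 'projects/pubsub-public-data/topics/taxirides-realtime'

# With timestamp_attribute='ts', Beam assigns each element's event
# timestamp from the 'ts' attribute instead of the publish time.
opts = PipelineOptions(streaming=True)
with beam.Pipeline(options=opts) as p:
    messages = p | beam.io.ReadFromPubSub(topic=TOPIC, timestamp_attribute='ts')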

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud CLI (installed by default in Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or, if you have jq installed (for pretty-printing the JSON):

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp is included as an attribute in the metadata, as well as a field in the data. We will leverage that metadata attribute for the timestamps used in our streaming pipeline.
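
Alternatively, gcloud can emit JSON directly, which avoids the grep/cut parsing above (assuming jq is installed; note that message.data in this output is base64-encoded):

gcloud pubsub subscriptions pull taxis --limit 3 --format=json | jq '.[].message.attributes'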

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the gcloud CLI installed (for instance, it is installed by default in Cloud Shell), you can create the tables from the command line.

You also need to create a BigQuery dataset, in the same region:
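
For instance, with the bq CLI (the dataset name taxi_rides matches the script below; the region here is illustrative, use the one where you will run Dataflow):

bq --location=us-central1 mk --dataset taxi_rides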

After that, you can create the destination tables with the provided script:

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We use a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are within 10 seconds of each other. A message whose timestamp is more than 10 seconds apart will either be discarded (if its timestamp is too old) or open a new window (if it is newer).

With the messages inside each window (that is, each different ride_id will belong to a different window), we calculate the duration of the session as the difference between the min and max timestamps in the window. We also calculate the number of events in that session.

We use a GroupByKey to operate on all the messages in a window. This loads all the messages in the window into memory. That is fine because, in Beam streaming, a window is always processed by a single worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented by leveraging windows in streaming pipelines. The grouping of messages by ride_id and event timestamp is done automatically by the pipeline; we only need to express the generic operations to be performed on each window, as sketched below.
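
The following is a minimal sketch of that windowing logic in the Beam Python SDK; the transform names and the output dictionary are illustrative, not this repository's exact code:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

TOPIC = 'projects/pubsub-public-data/topics/taxirides-realtime'

class KeyByRideId(beam.DoFn):
    def process(self, message, ts=beam.DoFn.TimestampParam):
        ride = json.loads(message)
        # Pair each ride_id with the event timestamp that Beam assigned
        # from the 'ts' attribute at read time (seconds since the epoch).
        yield ride['ride_id'], float(ts)

def session_stats(kv):
    ride_id, timestamps = kv
    return {
        'ride_id': ride_id,
        # Session duration: difference between the max and min
        # timestamps observed in the window, in seconds.
        'duration': max(timestamps) - min(timestamps),
        'n_events': len(timestamps),
    }

opts = PipelineOptions(streaming=True)
with beam.Pipeline(options=opts) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(topic=TOPIC, timestamp_attribute='ts')
     | 'KeyByRide' >> beam.ParDo(KeyByRideId())
     | 'Sessions' >> beam.WindowInto(window.Sessions(gap_size=10))
     | 'Group' >> beam.GroupByKey()
     | 'Stats' >> beam.Map(session_stats))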

Running the pipeline

Prerequisites

You need a Google Cloud project and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (where gcloud is automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, in the same region where you will run Dataflow:
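
For instance (the region is illustrative, use the one where you will run Dataflow):

export PROJECT=$(gcloud config get-value project)
gsutil mb -l us-central1 gs://$PROJECT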

Make sure that you have a Python 3 environment with a version lower than 3.9, for instance a virtualenv, and install apache-beam[gcp] and python-dateutil in it. Assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil
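
If you still need to create and activate the virtualenv, something along these lines works (assuming a python3.8 interpreter is available, to stay below 3.9):

python3.8 -m venv env
source env/bin/activate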

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extract insights out of them.

In this repository you will find some sample output in data/beam.prof, which you can use to check what the profiling output looks like. The following Colab notebook contains an example analyzing that sample profiling data:
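
For a quick look without the notebook, Python's built-in pstats module can read that file, assuming it is in the standard cProfile/pstats format written by the Python profiler:

import pstats

# Load the sample profile and print the 10 entries with the
# highest cumulative time.
stats = pstats.Stats('data/beam.prof')
stats.sort_stats('cumulative').print_stats(10)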

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
