Demonstrate a Dataflow pipeline that saves data from an API into a BigQuery table

Overview

dataflow-mvp provides a basic example pipeline that pulls data from an API and writes it to a BigQuery table using GCP's Dataflow (i.e., Apache Beam).
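As a rough illustration of the flow described above, the sketch below flattens an API response into rows and hands them to Beam's BigQuery sink. It is not the code in main.py: the payload shape, the field names, and the table dataflow-mvp:demo.api_rows are assumptions made purely for illustration. The payload is passed in as text rather than fetched, and apache_beam is imported lazily so the row-shaping helper can be tried without it installed.

```python
import json
from datetime import datetime, timezone


def to_rows(payload_text, fetched_at=None):
    """Turn a JSON API response into BigQuery-ready row dictionaries.

    Assumes a hypothetical payload of the form {"items": [{"name": ...}, ...]}.
    """
    fetched_at = fetched_at or datetime.now(timezone.utc).isoformat()
    payload = json.loads(payload_text)
    return [
        {"name": item["name"], "fetched_at": fetched_at}
        for item in payload.get("items", [])
    ]


def run(payload_text, table="dataflow-mvp:demo.api_rows", pipeline_args=None):
    """Wire the rows into a Beam pipeline (the table name is a placeholder)."""
    import apache_beam as beam  # imported here so to_rows works without Beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(pipeline_args or [])
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Seed" >> beam.Create([payload_text])
            | "ToRows" >> beam.FlatMap(to_rows)
            | "WriteToBQ" >> beam.io.WriteToBigQuery(
                table,
                schema="name:STRING,fetched_at:TIMESTAMP",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )
```

Calling to_rows on a sample payload is a quick way to check the row shape locally before involving Dataflow at all.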

Table of Contents

  • main.py - Main Python code for the Dataflow pipeline. The function defineBQSchema defines the BigQuery table schema.
  • setup.py - When the pipeline is deployed in GCP as a template, GCP uses setup.py to set up the worker nodes (e.g., install required Python dependencies).
  • build.bat - Windows batch script to deploy the pipeline as a reusable template in GCP.
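To give a feel for what a schema-defining function such as defineBQSchema might return, here is a minimal sketch. The function names and the two fields are hypothetical and not taken from main.py. The dictionary mirrors BigQuery's JSON schema layout, and the helper renders the compact name:TYPE string form that Beam's WriteToBigQuery also accepts.

```python
def define_bq_schema():
    """Return a table schema in BigQuery's JSON 'fields' layout.

    The field names and types are hypothetical placeholders.
    """
    return {
        "fields": [
            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "fetched_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
        ]
    }


def schema_to_string(schema):
    """Render the schema in the compact 'name:TYPE,name:TYPE' form."""
    return ",".join(
        "{}:{}".format(field["name"], field["type"]) for field in schema["fields"]
    )
```

Keeping the schema in one function means the table layout can change without touching the pipeline wiring.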

Environment

  • Local machine running Microsoft Windows 10 Home
  • Python 3.6.8
    • As of 12/1/21, Apache Beam only supports 3.6, 3.7, and 3.8 (not 3.9). However, orjson only supports 3.6.
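If it helps, a small guard like the one below can fail fast when the interpreter falls outside the versions listed above. This is only a sketch: the supported set is copied from the note above (as of 12/1/21) and should be treated as an assumption that will go stale.

```python
import sys

# Minor versions of Python 3 listed as supported in the note above (as of 12/1/21).
SUPPORTED_MINORS = {6, 7, 8}


def is_supported(version_info=None):
    """Return True if the given (or running) interpreter is Python 3.6 to 3.8."""
    major, minor = tuple(version_info or sys.version_info)[:2]
    return major == 3 and minor in SUPPORTED_MINORS
```

For example, is_supported((3, 9, 0)) returns False, matching the note that 3.9 was not supported at the time.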

Getting Started

Pre-Requisites

The following instructions assume that the project ID is dataflow-mvp and you have owner access to it.

  1. If you don't have it already, install the Google Cloud SDK:
    https://cloud.google.com/sdk/docs/install

  2. Authenticate your Google account:
    gcloud auth login

  3. Create a virtual environment for Python:
    py -3.8 -m venv venv

  4. Activate the virtual environment, upgrade pip, and install the Apache Beam library for GCP:

"./venv/Scripts/activate.bat"
python -m pip install --upgrade pip
python -m pip install apache_beam[gcp]

Run Build

  1. To make our lives easier later, set environment variables for the following:
  • DATAFLOW_BUCKET - the name of the GCS bucket that will be created in the last step of this section
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of the GCP project from the pre-requisites (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region, ideally the one closest to you (e.g., us-east1)

For instance, to set the PROJECT_ID variable in the Windows CLI, use:
set PROJECT_ID=dataflow-mvp

On Linux machines, use:
export PROJECT_ID=dataflow-mvp

The instructions below assume you're working on a Windows machine. Therefore, if you're working in a Linux environment, you'll have to use $PROJECT_ID instead of %PROJECT_ID% where appropriate in the instructions below.
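Because every later command depends on these four variables, it can be worth checking that they are all set before going further. The helper below is a hypothetical convenience, not part of the repository; it reads the variables named above and reports any that are missing.

```python
import os

# The four variables named in the list above.
REQUIRED_VARS = ("DATAFLOW_BUCKET", "DF_TEMPLATE_NAME", "PROJECT_ID", "GCP_REGION")


def load_settings(environ=None):
    """Return the required settings, or raise KeyError naming the missing ones."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Calling load_settings() with no arguments reads the real environment, so it behaves the same on Windows and Linux regardless of the %VAR% versus $VAR syntax.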

  2. Set the GCP project via config:
    gcloud config set project %PROJECT_ID%
  • You can verify the project is correctly set using:
    gcloud config list
  3. Enable the necessary APIs:
    gcloud services enable dataflow.googleapis.com && ^
    gcloud services enable cloudscheduler.googleapis.com && ^
    gcloud services enable bigquery.googleapis.com && ^
    gcloud services enable cloudresourcemanager.googleapis.com && ^
    gcloud services enable appengine.googleapis.com
  4. Create a service account for the Dataflow runner:
    gcloud iam service-accounts create dataflow-runner --display-name "Dataflow Runner service account"
  5. Add the required IAM roles to the Dataflow runner's service account:
    gcloud projects add-iam-policy-binding %PROJECT_ID% --member serviceAccount:dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com --role roles/owner
  6. Create a GCS bucket to store Dataflow code, staging files and templates:
    gsutil mb -p %PROJECT_ID% -l %GCP_REGION% gs://%DATAFLOW_BUCKET%

Build the Dataflow Template

  1. In build.bat, edit the variables in lines 1 through 4:
  • DATAFLOW_BUCKET - the name of the GCS bucket created in the Run Build section
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of the GCP project (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region, ideally the one closest to you (e.g., us-east1)
  2. Run the build.bat script:
    build.bat

This will create the template for the Dataflow job in the specified GCS bucket.

  3. Verify that the template has been uploaded to the GCS bucket:
    gsutil ls gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%

Create the Cloud Scheduler Job

  1. Finally, submit a Cloud Scheduler job to run Dataflow on a desired schedule:
gcloud scheduler jobs create http api-to-gbq-scheduler ^
--schedule="0 */3 * * *" ^
--uri="https://dataflow.googleapis.com/v1b3/projects/%PROJECT_ID%/locations/%GCP_REGION%/templates:launch?gcsPath=gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%" ^
--http-method="post" ^
--oauth-service-account-email="dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com" ^
--oauth-token-scope="https://www.googleapis.com/auth/cloud-platform" ^
--message-body="{""jobName"": ""api-to-bq-df"", ""parameters"": {""region"": ""%GCP_REGION%""}, ""environment"": {""numWorkers"": ""3""}}" ^
--time-zone=America/Chicago 
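The --uri and --message-body values in the command above are easy to mistype because of the nested quoting. As a sketch, the same two strings can be assembled in Python and inspected before they go into the gcloud command. The function name and the sample bucket name are hypothetical; the URL pattern and body fields are copied from the command above.

```python
import json


def build_launch_request(project_id, region, bucket, template_name,
                         job_name="api-to-bq-df", num_workers=3):
    """Return the (uri, message_body) pair used by the scheduler job above."""
    uri = (
        "https://dataflow.googleapis.com/v1b3/projects/{project}"
        "/locations/{region}/templates:launch"
        "?gcsPath=gs://{bucket}/templates/{template}"
    ).format(project=project_id, region=region, bucket=bucket, template=template_name)
    body = {
        "jobName": job_name,
        "parameters": {"region": region},
        # The command above passes the worker count as a string, so this does too.
        "environment": {"numWorkers": str(num_workers)},
    }
    return uri, json.dumps(body)


uri, body = build_launch_request(
    "dataflow-mvp", "us-east1", "my-dataflow-bucket", "dataflow-mvp-dog"
)
```

Writing body to a file is one way to prepare input for --message-body-from-file with the region already filled in.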

Notes:

  • Alternatively, you could use the --message-body-from-file argument. However, you'll need to manually specify the GCP region since we can't use environment variables within the JSON.
  • The cron string 0 */3 * * * executes the job every 3 hours.
  • The jobName parameter, api-to-bq-df, names the Dataflow job that each run launches. The Cloud Scheduler job itself is named api-to-gbq-scheduler.
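To see why 0 */3 * * * means every 3 hours, the sketch below expands a single cron field into the values it matches. It is a simplified, hypothetical helper that handles only *, */n, a-b, a-b/n, single numbers, and comma-separated lists of those; it is not a full cron parser.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = (int(x) for x in base.split("-"))
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


minute_field, hour_field = "0 */3 * * *".split()[:2]
minutes = expand_cron_field(minute_field, 0, 59)  # [0]
hours = expand_cron_field(hour_field, 0, 23)      # [0, 3, 6, 9, 12, 15, 18, 21]
```

So the job fires at minute 0 of hours 0, 3, 6, and so on through 21, interpreted in the America/Chicago time zone set by --time-zone.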

Warranty

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Owner
Chris Carbonell