Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project covers the whole process, from building a web crawler that extracts raw house price data to creating ETL pipelines with Google Cloud Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built in two ways, with Apache Beam on Cloud Dataflow and with Spark on Cloud Dataproc. Both load real estate transaction data into BigQuery, where it can be visualized in Data Studio. The project also uses a Cloud Function to detect when a new file is uploaded to the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration data for real estate transactions in Taiwan has been published since 2012. Each record includes the location and area of the property, the total price of the land and building, parking space information, and more. We can use the data to observe how house prices change over time or to predict house price trends in different regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache Beam version: 2.33.0
  • PySpark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download the historical actual price data in CSV format and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py, replace YOUR_DIR_PATH with a local directory for the downloaded data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py
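
The upload step can be sketched as follows. This is a minimal illustration and not the code in crawler.py: the function name upload_csv_files and the raw/ object prefix are hypothetical, and the three placeholder strings are the ones mentioned above. It assumes the google-cloud-storage client library is installed.

```python
from pathlib import Path


def upload_csv_files(bucket, local_dir, prefix="raw"):
    """Upload every CSV file in local_dir and return the object names.

    `bucket` is expected to behave like google.cloud.storage.Bucket:
    blob(name) returns an object that offers upload_from_filename(path).
    """
    uploaded = []
    for path in sorted(Path(local_dir).glob("*.csv")):
        object_name = f"{prefix}/{path.name}"  # hypothetical object layout
        bucket.blob(object_name).upload_from_filename(str(path))
        uploaded.append(object_name)
    return uploaded


def main():
    # Imported lazily so the helper above can be used without the client library.
    from google.cloud import storage

    client = storage.Client(project="projectID")      # placeholder
    bucket = client.bucket("GCS_BUCKET_NAME")         # placeholder
    print(upload_csv_files(bucket, "YOUR_DIR_PATH"))  # placeholder
```

Calling main() after replacing the placeholders would upload the files; passing the bucket in as an argument keeps the helper easy to test with a stand-in object.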

3. Build ETL pipelines on GCP

There are two versions of the ETL pipeline. Both read source files from Cloud Storage, apply some transformations, and load the data into BigQuery. The pipeline based on Apache Beam uses Dataflow to process the data for land transaction analytics. The pipeline based on Apache Spark uses Dataproc to process the data for building transaction analytics.

Let’s start by opening a session in Google Cloud Shell. Run the following command to set the project property to your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Run etl_pipeline_beam.py to create a Dataflow job that runs on the DataflowRunner. Note that we need to set the Cloud Storage locations for the staging and temporary files, and set the region in which the job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session
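
For orientation, a Beam pipeline of this kind could look like the sketch below. It is not the content of etl_pipeline_beam.py: the three-column layout (district, transaction date, total price), the schema string, and the function names are hypothetical, and the sketch assumes that transaction dates arrive as 7-digit ROC calendar strings such as 1010315, which would need converting before loading into a DATE column.

```python
import csv


def roc_to_iso(roc_date):
    """Convert an ROC calendar date such as '1010315' to an ISO date string."""
    roc_date = roc_date.strip()
    year = int(roc_date[:-4]) + 1911  # ROC year 1 corresponds to 1912
    return f"{year:04d}-{roc_date[-4:-2]}-{roc_date[-2:]}"


def parse_land_row(line):
    """Turn one CSV line into a dict matching the hypothetical table schema."""
    district, roc_date, total_price = next(csv.reader([line]))
    return {
        "district": district,
        "transaction_date": roc_to_iso(roc_date),
        "total_price": int(total_price),
    }


def run(input_path, output_table, pipeline_args=None):
    # Imported lazily so the helpers above can be used without Apache Beam.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(input_path, skip_header_lines=1)
            | "Parse" >> beam.Map(parse_land_row)
            | "Write" >> beam.io.WriteToBigQuery(
                output_table,
                schema="district:STRING,transaction_date:DATE,total_price:INTEGER",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )
```

Keeping the row-level logic in plain functions means it can be checked locally before a Dataflow job is submitted.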

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the ETL pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name and the region in which the job should run. To write data to BigQuery, the spark-bigquery-connector jar file must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
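
The connector jar is what makes the "bigquery" data source available inside the Spark job. A minimal sketch of such a job is shown below; it is not the content of etl_pipeline_spark.py. The column name total_price, the app name, and the function names are hypothetical, and the sketch assumes the connector's indirect write path, which stages data in a Cloud Storage bucket given by the temporaryGcsBucket option.

```python
def bigquery_write_options(temp_bucket):
    """Options for the connector's indirect write path, which stages data in GCS."""
    return {"temporaryGcsBucket": temp_bucket}


def run(input_path, output_table, temp_bucket):
    # Imported lazily so the option helper can be used without PySpark.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("building-etl").getOrCreate()
    df = spark.read.csv(input_path, header=True)
    # Hypothetical column name; the real file has its own headers.
    df = df.withColumn("total_price", F.col("total_price").cast("long"))
    (
        df.write.format("bigquery")
        .options(**bigquery_write_options(temp_bucket))
        .mode("append")
        .save(output_table)  # e.g. "dataset.table"
    )
    spark.stop()
```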

4. Use a Cloud Function to trigger Cloud Dataflow

Use a Cloud Function to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template so that the Cloud Function can run the data pipeline through a REST API request. The file etl_pipeline_beam_auto.py contains the Python code for the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage locations for the staging, temporary and template files, and set the region in which the job should run.

$ python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session

Create a Cloud Function

Go to the Cloud Functions console and create a function manually. Set the trigger to Cloud Storage, set the event type to Finalize/Create, and choose the GCS bucket to monitor. Next, write the function itself using the code in main.py. Note that the user-defined parameter input is passed to the Dataflow pipeline job. Finally, click Deploy. The function is now ready to run and will start the Dataflow pipeline whenever a file is uploaded to your bucket.
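
A function of this kind could be sketched as below. This is an illustration and not the repository's main.py: the constants, the job naming scheme, and the function names are hypothetical. It assumes a background function with the (event, context) signature, the google-api-python-client package listed in the function's requirements, and a template that accepts a runtime parameter named input.

```python
import re
import time

PROJECT = "projectID"                        # placeholder
REGION = "region"                            # placeholder
TEMPLATE_PATH = "gs://BUCKET_NAME/template"  # the template_location set earlier
TEMP_LOCATION = "gs://BUCKET_NAME/temp"      # placeholder


def build_launch_body(event, now=None):
    """Build the templates.launch request body for a finalized GCS object."""
    timestamp = int(now if now is not None else time.time())
    # Job names may only contain lowercase letters, digits and hyphens.
    slug = re.sub(r"[^a-z0-9]+", "-", event["name"].lower()).strip("-")
    return {
        "jobName": f"etl-{slug}-{timestamp}",
        "parameters": {"input": f"gs://{event['bucket']}/{event['name']}"},
        "environment": {"tempLocation": TEMP_LOCATION},
    }


def trigger_dataflow(event, context):
    """Entry point for a Cloud Storage finalize trigger."""
    # Imported lazily so build_launch_body can be used without the client library.
    from googleapiclient.discovery import build

    service = build("dataflow", "v1b3", cache_discovery=False)
    request = service.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE_PATH,
        body=build_launch_body(event),
    )
    return request.execute()
```

The event dictionary carries the bucket and object name of the uploaded file, which is how the new file's path reaches the pipeline as the input parameter.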

Results

When each ETL pipeline has completed successfully, navigate to BigQuery to verify that the data has been loaded into the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate average price by year in BigQuery, and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County
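
The yearly average could be computed with a query along these lines. This is a sketch under assumptions: the column names total_price and transaction_date are hypothetical, the date column is assumed to be of type DATE, and the google-cloud-bigquery client library is assumed to be installed.

```python
def average_price_query(table, price_column="total_price", date_column="transaction_date"):
    """Return a BigQuery Standard SQL query averaging the price per calendar year."""
    return (
        f"SELECT EXTRACT(YEAR FROM {date_column}) AS year,\n"
        f"       AVG({price_column}) AS avg_price\n"
        f"FROM `{table}`\n"
        "GROUP BY year\n"
        "ORDER BY year"
    )


def run(table):
    # Imported lazily so the query builder can be used without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    for row in client.query(average_price_query(table)).result():
        print(row["year"], row["avg_price"])
```

The query text can also be pasted into the BigQuery console and its result used as a Data Studio data source.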
