Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project contains the process from building a web crawler to extract the raw data of house price to create ETL pipelines using Google Could Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with both Apache Beam using Cloud Dataflow and Spark using Cloud Dataproc for loading real estate transactions data into BigQuery, and the data can be visualized in Data Studio. The project also uses Cloud Function to monitor if a new file is uploaded in the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration of real estate transactions data in Taiwan has been released since 2012, which refers to the transaction information includes: position and area of real estate, total price of land and building, parking space related information, etc. We can use the data to observe the changes in house prices over time or predict the house price trend in various regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache beam version: 2.33.0
  • Pyspark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download historical actual price data in csv format, and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py file, replace YOUR_DIR_PATH with a local directory to store download data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py

3. Build ETL pipelines on GCP

There are two versions of ETL pipelines that read source files from Cloud Storage, apply some transformations and load the data into BigQuery. One of the ETL pipelines based on Apache beam uses Dataflow to process the data for analytics of land transaction. The other ETL pipeline based on Apache Spark uses Dataproc to proccess the data for analytics of building transaction.

Let’s start by opening a session in Google Cloud Shell. Run the following commands to set the project property with your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Run actual_price_etl.py to create a Dataflow job which runs the DataflowRunner. Notice that we need to set the Cloud Storage location of the staging and template file, and set the region in which the created job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the etl pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name, and set the region in which the created job should run. To write data to Bigquery, the jar file of spark-bigquery-connector must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

4. Use a Cloud Function to trigger Cloud Dataflow

Use the Cloud Fucntion to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template for runnig the data pipeline with REST API request called by the Cloud Function. The file etl_pipeline_beam_auto.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage location of the staging, temporary and template file, and set the region in which the created job should run.

python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session

Create a Cloud Function

Go to the Cloud Function GUI and manually create a function, set Trigger as Cloud Storage, Event Type as Finalize/Create , and choose the GCS bucket which needs to be monitored. Next, write the function itself, use the code in main.py file. Note that the user defined parameter input is passed to the Dataflow pipeline job. Finally, click on depoly and now your function is ready to execute and start the Dataflow pipeline when a file is uploaded in your bucket.

Results

When each ETL pipeline is completed and succeeded, navigating to BigQuery to verify that the data is successfully loaded in the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate average price by year in BigQuery, and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County

An orchestration platform for the development, production, and observation of data assets.

Dagster An orchestration platform for the development, production, and observation of data assets. Dagster lets you define jobs in terms of the data f

Dagster 6.2k Jan 08, 2023
Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video.

Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video. You can chose the cha

2 Jul 22, 2022
Pandas-based utility to calculate weighted means, medians, distributions, standard deviations, and more.

weightedcalcs weightedcalcs is a pandas-based Python library for calculating weighted means, medians, standard deviations, and more. Features Plays we

Jeremy Singer-Vine 98 Dec 31, 2022
pandas: powerful Python data analysis toolkit

pandas is a Python package that provides fast, flexible, and expressive data structures designed to make working with "relational" or "labeled" data both easy and intuitive.

pandas 36.4k Jan 03, 2023
The Master's in Data Science Program run by the Faculty of Mathematics and Information Science

The Master's in Data Science Program run by the Faculty of Mathematics and Information Science is among the first European programs in Data Science and is fully focused on data engineering and data a

Amir Ali 2 Jun 17, 2022
Programmatically access the physical and chemical properties of elements in modern periodic table.

API to fetch elements of the periodic table in JSON format. Uses Pandas for dumping .csv data to .json and Flask for API Integration. Deployed on "pyt

the techno hack 3 Oct 23, 2022
A set of procedures that can realize covid19 virus detection based on blood.

A set of procedures that can realize covid19 virus detection based on blood.

Nuyoah-xlh 3 Mar 07, 2022
MotorcycleParts DataAnalysis python

We work with the accounting department of a company that sells motorcycle parts. The company operates three warehouses in a large metropolitan area.

NASEEM A P 1 Jan 12, 2022
A probabilistic programming library for Bayesian deep learning, generative models, based on Tensorflow

ZhuSuan is a Python probabilistic programming library for Bayesian deep learning, which conjoins the complimentary advantages of Bayesian methods and

Tsinghua Machine Learning Group 2.2k Dec 28, 2022
Semi-Automated Data Processing

Perform semi automated exploratory data analysis, feature engineering and feature selection on provided dataset by visualizing every possibilities on each step and assisting the user to make a meanin

Arun Singh Babal 1 Jan 17, 2022
This is a repo documenting the best practices in PySpark.

Spark-Syntax This is a public repo documenting all of the "best practices" of writing PySpark code from what I have learnt from working with PySpark f

Eric Xiao 447 Dec 25, 2022
An Integrated Experimental Platform for time series data anomaly detection.

Curve Sorry to tell contributors and users. We decided to archive the project temporarily due to the employee work plan of collaborators. There are no

Baidu 486 Dec 21, 2022
A library to create multi-page Streamlit applications with ease.

A library to create multi-page Streamlit applications with ease.

Jackson Storm 107 Jan 04, 2023
Tokyo 2020 Paralympics, Analytics

Tokyo 2020 Paralympics, Analytics Thanks for checking out my app! It was built entirely using matplotlib and Tokyo 2020 Paralympics data. This applica

Petro Ivaniuk 1 Nov 18, 2021
Probabilistic reasoning and statistical analysis in TensorFlow

TensorFlow Probability TensorFlow Probability is a library for probabilistic reasoning and statistical analysis in TensorFlow. As part of the TensorFl

3.8k Jan 05, 2023
Picka: A Python module for data generation and randomization.

Picka: A Python module for data generation and randomization. Author: Anthony Long Version: 1.0.1 - Fixed the broken image stuff. Whoops What is Picka

Anthony 108 Nov 30, 2021
Tkinter Izhikevich Neuron Model With Python

TKINTER IZHIKEVICH NEURON MODEL WITH PYTHON Hodgkin-Huxley Model It is a mathematical model for the generation and transmission of action potentials i

Rabia KOÇ 8 Jul 16, 2022
Python package to transfer data in a fast, reliable, and packetized form.

pySerialTransfer Python package to transfer data in a fast, reliable, and packetized form.

PB2 101 Dec 07, 2022
An ETL framework + Monitoring UI/API (experimental project for learning purposes)

Fastlane An ETL framework for building pipelines, and Flask based web API/UI for monitoring pipelines. Project structure fastlane |- fastlane: (ETL fr

Dan Katz 2 Jan 06, 2022
INF42 - Topological Data Analysis

TDA INF421(Conception et analyse d'algorithmes) Projet : Topological Data Analysis SphereMin Etant donné un nuage des points, ce programme contient de

2 Jan 07, 2022