ETL: BigQuery and Apache Airflow

Amanuel Zewdu
Jan 3, 2023

A simple extract, transform, load pipeline on GCP

Overview

ETL provides the foundation for data analytics and machine learning workstreams. Its main purpose is to cleanse and organize data so that it addresses specific business intelligence needs, such as monthly reporting, but it can also support more advanced analytics that improve back-end processes or end-user experiences. As its name suggests, an organization mainly uses ETL to perform the following tasks.

  • Extract data
  • Cleanse the data to improve data quality and establish consistency
  • Load data into a target database

Google Cloud has a variety of powerful ETL tools, so you don’t have to do ETL manually and risk compromising the integrity of your data. These include data preparation, pipeline building and management, and workflow orchestration tools, such as Cloud Data Fusion. Alternatively, analysts and engineers can use programming languages like Python to build their own ETL pipelines. This lets them customize and control every aspect of the pipeline, but a handmade pipeline also requires more time and effort to create and maintain. This project builds a customized, scalable ETL pipeline using Python and some Bash scripting, orchestrated with Airflow on Cloud Composer.

DATA

The data for this project is extracted from the Shopify API using the ‘requests’ Python module. To get an API key, we need to register on shopify.dev as the admin of our store and create a custom app inside it. In our case the API is used to retrieve Shopify order-level data, and we have already been provided with the necessary tokens. Using those and the shop URL, we form the request URL with the status and limit parameters and pass it to ‘requests’, which returns the requested order data in the response object of a session request.

  • The API key and other credentials are saved in a separate ‘creds.py’ config file and imported as a module in the Airflow DAG. This keeps the tokens out of the pipeline code, so they are not revealed to everyone looking at it.
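The request described above can be sketched roughly as follows. This is a minimal sketch, not the project's actual code: the shop URL, API version and token are placeholders, the helper names are hypothetical, and passing the token in the X-Shopify-Access-Token header assumes a custom app with an Admin API access token.

```python
from urllib.parse import urlencode

# Hypothetical placeholders; in the project these values live in creds.py.
SHOP_URL = "https://example-store.myshopify.com"
API_VERSION = "2023-01"      # assumed Admin API version
ACCESS_TOKEN = "shpat_xxx"   # placeholder, never commit a real token


def build_orders_url(shop_url, api_version, status="any", limit=250):
    """Form the orders endpoint URL with the status and limit parameters."""
    query = urlencode({"status": status, "limit": limit})
    return f"{shop_url.rstrip('/')}/admin/api/{api_version}/orders.json?{query}"


def fetch_orders(shop_url, api_version, access_token, status="any", limit=250):
    """Request one page of orders and return them as a list of dicts."""
    # Imported here so the URL helper works without the dependency installed.
    import requests  # third-party: pip install requests

    url = build_orders_url(shop_url, api_version, status, limit)
    headers = {"X-Shopify-Access-Token": access_token}
    with requests.Session() as session:
        response = session.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # fail loudly on 4xx/5xx responses
        return response.json().get("orders", [])
```

Calling fetch_orders(SHOP_URL, API_VERSION, ACCESS_TOKEN) with real values would return the first page of orders; pagination is left out to keep the sketch short.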

TECH-STACK

Tech-stack diagram

STEPS

  • Shopify API: This API gives developers a complete and easy-to-use toolset for creating applications for Shopify’s online stores. It also provides data about a store, given the necessary token. Refer to the Shopify API documentation for details.
  • Requests Python module: This package lets us send HTTP requests from Python and returns a response object holding the main content plus the encoding and status of that specific request. It can be installed with pip install requests. See the Requests documentation for details.
  • Data preparation: After capturing the response returned from the session, we used pandas to prepare the data for insertion into BigQuery (BQ). The following images show the implementation and the metadata of the output CSV file.

First, we extract the desired fields from the JSON response we retrieved. Then we append each attribute to a list, which can then be converted into a dataframe.

Extract the desired fields from the json response
Appending onto a list
Columns in the prepared data
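Since the original images are not reproduced here, the preparation step might look roughly like the sketch below. The field list and the sample payload are illustrative assumptions (the fields are common Shopify order attributes, not necessarily the columns used in the project), and the helper names are hypothetical.

```python
# Illustrative payload shaped like an orders response; the values are made up.
SAMPLE_RESPONSE = {
    "orders": [
        {
            "id": 1001,
            "created_at": "2023-01-02T10:15:00-05:00",
            "total_price": "59.90",
            "currency": "USD",
            "financial_status": "paid",
            "note": "not needed downstream",
        },
        {
            "id": 1002,
            "created_at": "2023-01-02T11:40:00-05:00",
            "total_price": "12.00",
            "currency": "USD",
            "financial_status": "pending",
            "note": None,
        },
    ]
}

# Assumed selection of fields to keep for the order-level table.
FIELDS = ["id", "created_at", "total_price", "currency", "financial_status"]


def extract_rows(payload, fields=FIELDS):
    """Keep only the desired fields of each order and append them to a list."""
    rows = []
    for order in payload.get("orders", []):
        rows.append({field: order.get(field) for field in fields})
    return rows


def to_dataframe(rows):
    """Convert the list of rows into a pandas dataframe."""
    import pandas as pd  # third-party: pip install pandas

    return pd.DataFrame(rows)
```

The dataframe returned by to_dataframe(extract_rows(SAMPLE_RESPONSE)) can then be written out with its to_csv method to produce the CSV file that the later tasks copy and load.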
  • Cloud Dataflow: A standard transformation tool, Google Cloud Dataflow is a cloud-based data processing service for both batch and real-time streaming applications. It enables developers to set up processing pipelines for integrating, preparing and analyzing large data sets, such as those found in web analytics or big data analytics applications.
  • To transform the source dataset, the prepared data was passed to a transform_function that converts it into the format we need. The following images show the basic transformations and the resulting dataset.
Data transformation
Transformed data information
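The actual transformations are only shown in the images, so the following is a hypothetical stand-in for transform_function. It works on plain dictionaries rather than a pandas dataframe so that it runs without dependencies, and the specific conversions (integer ids, UTC timestamps, numeric prices, renamed columns) are assumptions about what a basic transformation could include.

```python
from datetime import datetime, timezone


def transform_row(row):
    """Cast one prepared row into load-friendly types (illustrative choices)."""
    created = datetime.fromisoformat(row["created_at"])
    return {
        "order_id": int(row["id"]),
        # Normalize the timestamp to UTC in a format BigQuery accepts in CSV.
        "created_at_utc": created.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "total_price": float(row["total_price"]),
        "currency": row["currency"].upper(),
        "financial_status": row["financial_status"],
    }


def transform_function(rows):
    """Apply the row-level transformation to every prepared row."""
    return [transform_row(row) for row in rows]
```

With a dataframe, the same casts could be expressed column by column instead; the row-based form is used here only to keep the sketch self-contained.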
  • Cloud Composer: This is a fully managed workflow orchestration service built on Apache Airflow. By using Cloud Composer instead of a local instance of Apache Airflow, you get Airflow's functionality without having to install or manage it yourself. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.
  • Apache Airflow: This is the main tool used in this project. A DAG (directed acyclic graph) containing 6 tasks runs to complete the pipeline. Two of them are Bash operators, used to copy CSV files to the Airflow data bucket. Three are Python operators, which call Python functions. The last task is a GCSToBigQueryOperator, which loads the data into BigQuery according to the provided schema. The following images show the implementation of the python_callable functions, the Bash scripts and the GCSToBigQueryOperator.

This is the task layout for the complete pipeline. After bringing the above Python functions together, we assign them to tasks in the Airflow DAG as follows and add the necessary Bash scripts to copy the CSV files written from the dataframes. Finally, we order the tasks to run from task1 through to task6.

Airflow tasks running on a daily basis
shopify_workflow graph
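A DAG with this shape could be wired up roughly as in the sketch below. It assumes Airflow 2.x with the Google provider installed (as on Cloud Composer); newer Airflow versions use schedule instead of schedule_interval. The bucket, project, dataset, file paths, placeholder callables and the exact position of the two Bash tasks are all assumptions, since the original only states the operator types and that the tasks run from task1 to task6.

```python
from datetime import datetime

BUCKET = "example-composer-bucket"                  # hypothetical bucket name
TABLE = "example-project.shopify.order_level_data"  # hypothetical project/dataset

# Shortened, illustrative schema; the real column list is not shown here.
SCHEMA = [
    {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "total_price", "type": "FLOAT", "mode": "NULLABLE"},
]


def gsutil_cp(local_path):
    """Build the bash command that copies a CSV into the bucket's data folder."""
    return f"gsutil cp {local_path} gs://{BUCKET}/data/"


# Placeholder callables; the real ones fetch, prepare and transform the orders.
def fetch_orders():
    pass


def prepare_data():
    pass


def transform_data():
    pass


def build_dag():
    """Assemble the six tasks; Airflow is imported lazily inside the factory."""
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
        GCSToBigQueryOperator,
    )

    with DAG(
        dag_id="shopify_workflow",
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",  # run once a day
        catchup=False,
    ) as dag:
        task1 = PythonOperator(task_id="fetch_orders", python_callable=fetch_orders)
        task2 = PythonOperator(task_id="prepare_data", python_callable=prepare_data)
        task3 = BashOperator(
            task_id="copy_prepared_csv",
            bash_command=gsutil_cp("/tmp/orders_prepared.csv"),
        )
        task4 = PythonOperator(task_id="transform_data", python_callable=transform_data)
        task5 = BashOperator(
            task_id="copy_transformed_csv",
            bash_command=gsutil_cp("/tmp/orders_transformed.csv"),
        )
        task6 = GCSToBigQueryOperator(
            task_id="load_to_bigquery",
            bucket=BUCKET,
            source_objects=["data/orders_transformed.csv"],
            destination_project_dataset_table=TABLE,
            schema_fields=SCHEMA,
            source_format="CSV",
            skip_leading_rows=1,  # skip the CSV header row
            write_disposition="WRITE_TRUNCATE",
        )
        # Run the tasks strictly in sequence, task1 through task6.
        task1 >> task2 >> task3 >> task4 >> task5 >> task6
    return dag
```

In a real DAG file the factory would be called at module level, for example dag = build_dag(), so that the Airflow scheduler can discover the DAG.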
  • Cloud Storage: This is an object storage service where you can keep your data. All of the source code and data for this project is stored here in a bucket.
  • BigQuery: BigQuery is Google Cloud’s fully managed, petabyte-scale, cost-effective analytics data warehouse that lets you run analytics over vast amounts of data in near real time. For our purpose, we created a table named order_level_data and populated it with the prepared data through an Airflow task. The schema of the table is declared as follows.

The ‘schema’ list contains a representation of the table schema with column names and data types.

A function to create a BQ table if it doesn’t exist.
Schema : order_level_data — dataset
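A schema list and a create-if-missing helper of the kind described above might look like the following sketch. The column names and types are illustrative assumptions (the real schema is only shown in the image), the helper name is hypothetical, and the function assumes the google-cloud-bigquery client library with default credentials.

```python
# Illustrative schema: one dict per column with its name, type and mode.
SCHEMA = [
    {"name": "order_id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "created_at_utc", "type": "TIMESTAMP", "mode": "NULLABLE"},
    {"name": "total_price", "type": "FLOAT", "mode": "NULLABLE"},
    {"name": "currency", "type": "STRING", "mode": "NULLABLE"},
    {"name": "financial_status", "type": "STRING", "mode": "NULLABLE"},
]


def create_table_if_missing(table_id, schema=SCHEMA):
    """Create the BigQuery table unless it already exists, then return it.

    table_id is expected in the form "project.dataset.table".
    """
    # Imported here so the schema list can be reused without the client library.
    from google.cloud import bigquery  # pip install google-cloud-bigquery

    client = bigquery.Client()
    fields = [
        bigquery.SchemaField(col["name"], col["type"], mode=col["mode"])
        for col in schema
    ]
    table = bigquery.Table(table_id, schema=fields)
    # exists_ok=True makes the call a no-op when the table is already there.
    return client.create_table(table, exists_ok=True)
```

Keeping the schema as a plain list of dicts means the same list can also be passed as schema_fields to the GCSToBigQueryOperator.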

CONCLUSION

The steps followed to implement the ETL can be summarized as follows:

  1. Fetching the data: Two packages are available for fetching the data. The first is ShopifyAPI, a wrapper around the Shopify API that makes it easy to access data from Shopify stores in Python and can be installed with pip. The second (used in this project) is the Python Requests package, which lets us send HTTP requests from Python and returns a response object with the main content plus the encoding and status of that specific request.
  2. Deploying and scheduling the pipeline: Cloud Composer on Google Cloud provides Apache Airflow as a fully managed service, and we use it to orchestrate the task flow. There is also Google Cloud Data Fusion, a cloud-native, fully managed data integration (ETL) tool that allows integration at any scale.
  3. Schema: BigQuery lets you specify a table’s schema when you load data into a table, and when you create an empty table. Alternatively, you can use schema auto-detection for supported data formats.
  4. Loading the data: We first loaded the data into a bucket on Cloud Storage and then, from there, into the BigQuery table whose schema is given above. This method is better because it gives you the chance to explore the data and do some preprocessing before loading it. It also helps avoid data loss in case the system breaks.

Amanuel Zewdu

Junior data engineer who builds scalable data pipelines using ETL tools (Airflow, Kafka and dbt), with data modeling dexterity in Python and SQL