Google Cloud: Creating a Streaming Data Pipeline for a Real-Time Dashboard with Dataflow


Overview

Let’s suppose you own a fleet of New York City taxi cabs and want to monitor how well your business is doing in real time. You will build a streaming data pipeline to capture taxi revenue, passenger count, ride status, and more, and visualize the results in a management dashboard.

Task 1. Create a Pub/Sub topic and BigQuery dataset

Pub/Sub is an asynchronous global messaging service. By decoupling senders and receivers, it allows for secure and highly available communication between independently written applications. Pub/Sub delivers low-latency, durable messaging.

In Pub/Sub, publisher applications and subscriber applications connect with one another through the use of a shared string called a topic. A publisher application creates and sends messages to a topic. Subscriber applications create a subscription to a topic to receive messages from it.
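
As a quick illustration of the publisher/subscriber model, here is a minimal sketch using the gcloud CLI. The topic and subscription names are hypothetical; the lab itself uses a Google-maintained public topic, so you do not need to run this:

# Create a topic and a subscription attached to it (names are examples only).
gcloud pubsub topics create my-taxi-topic
gcloud pubsub subscriptions create my-taxi-sub --topic my-taxi-topic

# Publish a message to the topic, then pull it from the subscription.
gcloud pubsub topics publish my-taxi-topic --message "hello taxi"
gcloud pubsub subscriptions pull my-taxi-sub --auto-ack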

Google maintains a few public Pub/Sub streaming data topics for labs like this one. We’ll be using the NYC Taxi & Limousine Commission’s open dataset.

BigQuery is a serverless data warehouse. Tables in BigQuery are organized into datasets. In this lab, messages published into Pub/Sub will be aggregated and stored in BigQuery.

To create a new BigQuery dataset:

Option 1: The command-line tool

  1. Open Cloud Shell and run the following command to create the taxirides dataset.
bq mk taxirides
  2. Run this command to create the taxirides.realtime table (an empty table with a schema that you will stream into later).
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
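
To sanity-check the resources, you can run a couple of read-only bq commands (optional):

# List datasets in the current project; taxirides should appear.
bq ls

# Show the schema of the new table to verify the nine fields.
bq show --schema --format=prettyjson taxirides.realtime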

Option 2: The BigQuery Console UI

Note: Skip these steps if you created the dataset and table using the command line.

  1. In the Google Cloud Console, select Navigation menu > Big Data > BigQuery:
  2. The Welcome to BigQuery in the Cloud Console message box opens. This message box provides a link to the quickstart guide and lists UI updates.
  3. Click on the View actions icon next to your Project ID and click Create dataset.
  4. Set the Dataset ID as taxirides, leave all the other fields the way they are, and click CREATE DATASET.
  5. If you look at the left-hand resources menu, you should see your newly created dataset.
  6. Click on the View actions icon next to the taxirides dataset and click Open.
  7. Click CREATE TABLE.
  8. Name the table realtime
  9. For the schema, click Edit as text and paste in the below:
ride_id:string,
point_idx:integer,
latitude:float,
longitude:float,
timestamp:timestamp,
meter_reading:float,
meter_increment:float,
ride_status:string,
passenger_count:integer
  10. Under Partition and cluster settings, select the timestamp option for the Partitioning field.
  11. Click the CREATE TABLE button.

Task 2. Create a Cloud Storage bucket

Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use Cloud Storage for a range of scenarios including serving website content, storing data for archival and disaster recovery, or distributing large data objects to users via direct download. In this lab, you use Cloud Storage to provide working space for your Dataflow pipeline.

  1. In the Cloud Console, go to Navigation menu > Cloud Storage.
  2. Click CREATE BUCKET.
  3. For Name, paste in your GCP Project ID and then click Continue.
  4. For Location type, click Multi-region if it is not already selected.
  5. Click CREATE.
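
If you prefer the command line, the bucket can also be created from Cloud Shell. This is a minimal sketch, assuming you want a US multi-region bucket named after your project ID:

# Create a US multi-region bucket named after the current project.
gsutil mb -l US gs://$(gcloud config get-value project)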



Task 3. Set up a Dataflow Pipeline

Dataflow is a serverless way to carry out data analysis. In this lab, you set up a streaming data pipeline that reads taxi ride messages from Pub/Sub and writes them to BigQuery.

  1. In the Cloud Console, go to Navigation menu > Dataflow.
  2. In the top menu bar, click CREATE JOB FROM TEMPLATE.
  3. Enter streaming-taxi-pipeline-new as the Job name for your Dataflow job.
  4. Under Dataflow template, select the Pub/Sub Topic to BigQuery template.
  5. Under Input Pub/Sub topic, enter projects/pubsub-public-data/topics/taxirides-realtime
  6. Under BigQuery output table, enter <myprojectid>:taxirides.realtime
Note: There is a colon : between the project and dataset name and a dot . between the dataset and table name.
  7. Under Temporary location, enter gs://<mybucket>/tmp/.
  8. Click the RUN JOB button.
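
Alternatively, the same template can be launched from Cloud Shell. This is a sketch, assuming the us-central1 region and the Google-hosted classic template path; replace <myprojectid> and <mybucket> with your own values:

# Launch the Pub/Sub Topic to BigQuery template as a streaming Dataflow job.
# <myprojectid> and <mybucket> are placeholders; us-central1 is an assumed region.
gcloud dataflow jobs run streaming-taxi-pipeline-new \
  --gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery \
  --region us-central1 \
  --staging-location gs://<mybucket>/tmp/ \
  --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=<myprojectid>:taxirides.realtime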

You will see that the job is running:

A new streaming job has started! You can now see a visual representation of the data pipeline.

Note: If the Dataflow job fails on the first attempt, re-create the job from the template with a new job name and run it again.

Task 4. Analyze the taxi data using BigQuery

To analyze the data as it is streaming:

  1. In the Cloud Console, select Navigation menu > BigQuery.
  2. Enter the following query in the query EDITOR and click RUN:
SELECT * FROM taxirides.realtime LIMIT 10
  3. If no records are returned, wait another minute and re-run the above query (Dataflow takes 3-5 minutes to set up the stream). You will receive similar output:
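
While you wait, you can also poll the table from Cloud Shell; a minimal sketch using the bq CLI:

# Count streamed rows; re-run until the count is non-zero.
bq query --use_legacy_sql=false 'SELECT COUNT(*) AS rows_so_far FROM taxirides.realtime'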

Task 5. Perform aggregations on the stream for reporting

  1. Copy and paste the query below and click RUN.
WITH streaming_data AS (
SELECT
  timestamp,
  TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
  TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
  TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
  ride_id,
  latitude,
  longitude,
  meter_reading,
  ride_status,
  passenger_count
FROM
  taxirides.realtime
WHERE ride_status = 'dropoff'
ORDER BY timestamp DESC
LIMIT 100000
)
# calculate aggregations on stream for reporting:
SELECT
 ROW_NUMBER() OVER() AS dashboard_sort,
 minute,
 COUNT(DISTINCT ride_id) AS total_rides,
 SUM(meter_reading) AS total_revenue,
 SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute, timestamp

The result shows key metrics by the minute for every taxi drop-off.
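
TIMESTAMP_TRUNC rounds a timestamp down to the boundary of the given part, which is what lets the query above group drop-offs by the minute. As a quick standalone illustration you can run in Cloud Shell (the literal timestamp is just an example value):

# Truncate one example timestamp to the hour, minute, and second boundaries.
bq query --use_legacy_sql=false "SELECT
  TIMESTAMP_TRUNC(TIMESTAMP '2025-01-15 10:31:52 UTC', HOUR) AS hour,
  TIMESTAMP_TRUNC(TIMESTAMP '2025-01-15 10:31:52 UTC', MINUTE) AS minute,
  TIMESTAMP_TRUNC(TIMESTAMP '2025-01-15 10:31:52 UTC', SECOND) AS second"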


Task 6. Create a real-time dashboard

  1. Open this Google Data Studio link in a new incognito browser tab.
  2. On the Reports page, in the Start with a Template section, click the [+] Blank Report template.
  3. If prompted with the Welcome to Google Data Studio window, click Get started.
  4. Check the checkbox to acknowledge the Google Data Studio Additional Terms, and click Continue.
  5. Select No to all the questions, then click Continue.
  6. Switch back to the BigQuery console.
  7. On the BigQuery page, click EXPLORE DATA > Explore with Data Studio.
  8. Click GET STARTED, then click AUTHORIZE.
  9. Specify the following settings:
  • Chart type: Combo chart
  • Date range Dimension: dashboard_sort
  • Dimension: dashboard_sort
  • Drill Down: dashboard_sort (make sure the Drill down option is turned ON)
  • Metric: total_rides (SUM), total_passengers (SUM), total_revenue (SUM)
  • Sort: dashboard_sort, Ascending (latest rides first)

Your chart should look similar to this:

Sample chart
Note: Visualizing data at a minute-level granularity is currently not supported in Data Studio as a timestamp. This is why we created our own dashboard_sort dimension.
  10. When you’re happy with your dashboard, click Save to save this data source.
  11. Whenever anyone visits your dashboard, it will be up-to-date with the latest transactions. You can try it yourself by clicking the Refresh button near the Save button.

Task 7. Create a time series dashboard

  1. Click this Google Data Studio link to open Data Studio in a new browser tab.
  2. On the Reports page, in the Start with a Template section, click the [+] Blank Report template.
  3. A new, empty report opens with Add data to report.
  4. From the list of Google Connectors, select the BigQuery tile.
  5. Under CUSTOM QUERY, click qwiklabs-gcp-xxxxxxx > Enter Custom Query, and add the following query:
SELECT
  *
FROM
  taxirides.realtime
WHERE
  ride_status='dropoff'
  6. Click Add > ADD TO REPORT.

You will see something like below:

Create a time series chart

  1. In the Data panel, scroll down to the bottom right and click ADD A FIELD. Then click All Fields in the left corner.
  2. Change the field timestamp type to Date & Time > Date Hour Minute (YYYYMMDDhhmm).
  3. Click Continue and then click Done.
  4. Click Add a chart.
  5. Choose Time series chart.
  6. Position the chart in the blank space in the bottom left corner.
  7. In the Data panel on the right, change the following:
  • Dimension: timestamp
  • Metric: meter_reading(SUM)

Your time series chart should look similar to this:

Sample time series chart

Note: If the Dimension is timestamp(Date), click the calendar icon next to timestamp(Date) and set the type to Date & Time > Date Hour Minute.


Task 8. Stop the Dataflow job

  1. Navigate back to Dataflow.
  2. Click the streaming-taxi-pipeline-new job (or whichever job name you used).
  3. Click STOP and select Cancel > STOP JOB.

This will free up resources for your project.
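
The job can also be stopped from Cloud Shell; a sketch assuming it runs in the us-central1 region:

# List active Dataflow jobs to find the job ID, then cancel it.
gcloud dataflow jobs list --region us-central1 --status active
gcloud dataflow jobs cancel JOB_ID --region us-central1   # JOB_ID comes from the list above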

Congratulations!

In this lab you used Pub/Sub to collect streaming data messages from taxis and fed them through your Dataflow pipeline into BigQuery.

