Large data processing with Cloud Dataflow and Cloud Datastore

In one of my projects, I had the requirement to process large text files ranging from hundreds of MB up to several GB or even TB. A user uploads a CSV file, and we are required to write each row into Google Cloud Datastore (a NoSQL document database).

Obviously, at these sizes this is not achievable with a simple web server. That’s where Google Cloud Dataflow comes in: I wrote a pipeline that processes these CSV files and saves the rows to Google Cloud Datastore. In this article, I want to share how I solved this task.

Large-scale text annotation can be hard, that’s why I am currently building a Text Annotator which can be used in Machine Learning projects. This article is based on what I learned during the implementation. Follow me on Twitter @HeyerSascha if you don’t want to miss it.

Prerequisites

You need to install two packages: the first is Apache Beam itself, the second adds the Google Cloud (Dataflow) dependencies. Please remember that if you want to run your pipeline locally you need to use Python 2.7; Apache Beam does not yet support Python 3.

pip install apache-beam
pip install apache-beam[gcp]

A basic understanding of Apache Beam and Google Cloud Dataflow is beneficial.

How to start the Dataflow pipeline

Basically, you have five options to launch the pipeline:

  • Locally, which is suitable for development purposes.
  • Directly in Dataflow, in case you wish to run the pipeline only once.
  • In the Dataflow UI.
  • With the gcloud command.
  • Via the Cloud Dataflow API.

Please note that for the last three options you need to create a Dataflow template.

Cloud Dataflow templates

To create a Dataflow template you have to use ValueProvider arguments, added with add_value_provider_argument. This is how we can pass runtime arguments to our Dataflow pipeline, in our case, for example, the path to the CSV file.

from apache_beam.options.pipeline_options import PipelineOptions

class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            dest='input',
            type=str,
            required=False,
            help='Local file or a file in a Google Storage Bucket.')
        parser.add_value_provider_argument(
            '--entity',
            dest='entity',
            type=str,
            required=False,
            help='The Google Cloud Datastore entity name.')

If you don’t use a template, you can simply use:

parser.add_argument(...)

Build the pipeline

To build the pipeline we create a pipeline object and pass the PipelineOptions into it.

process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)

Our pipeline consists of 4 different steps:

  1. Read from text: Reads the input file either from a local path or from a Google Cloud Storage bucket; this is our pipeline data.
  2. Process CSV: Our first transform, which takes each row of our data and extracts the required information (a sketch of this DoFn follows below).
  3. Build entities: Creates the Cloud Datastore entities.
  4. Write entities: Finally writes each row as an entity into Cloud Datastore.

(p
 | 'Read from text' >> beam.io.ReadFromText(process_options.input, skip_header_lines=0)
 | 'Process CSV' >> beam.ParDo(ProcessCSV(), ['text', 'label'])
 | 'Build entities' >> beam.ParDo(BuildEntities(), process_options.entity)
 | 'Write entities into Datastore' >> WriteToDatastore('annotator'))
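
The two ParDo steps use custom DoFns, ProcessCSV and BuildEntities, whose implementations are not shown above. Below is a minimal sketch of how they could look with the v1 Datastore connector that WriteToDatastore belongs to; the uuid-based key generation and the UTF-8 handling are assumptions of mine, and the actual code in the repository may differ:

import csv
import uuid

import apache_beam as beam
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

class ProcessCSV(beam.DoFn):
    # Parses one CSV line into a dict keyed by the given column names.
    def process(self, element, columns):
        row = next(csv.reader([element.encode('utf-8')]))
        # Decode back to unicode so the values become Datastore string properties.
        yield dict(zip(columns, [cell.decode('utf-8') for cell in row]))

class BuildEntities(beam.DoFn):
    # Turns a dict into a Cloud Datastore entity of the given kind.
    def process(self, element, kind):
        entity = entity_pb2.Entity()
        # kind is a ValueProvider when the pipeline runs as a template, hence .get().
        datastore_helper.add_key_path(entity.key, kind.get(), str(uuid.uuid4()))
        # Store the properties unindexed to stay within the 1 MB limit
        # instead of the 1500 byte limit for indexed properties (see below).
        datastore_helper.add_properties(entity, element, exclude_from_indexes=True)
        yield entity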

And finally, we run our pipeline.

p.run().wait_until_finish()

Create the Dataflow template

To create the template, we have to set --template_location to the Google Cloud Storage location where the template will be stored. The command is as follows:

python2 -m process \
  --runner DataflowRunner \
  --project io-annotator \
  --staging_location gs://io-dataflow/staging \
  --temp_location gs://io-dataflow/temp \
  --template_location gs://io-dataflow/templates/process-csv \
  --save_main_session True

The command creates the template at your given --template_location:

INFO:root:A template was just created at location gs://io-dataflow/templates/process-csv

Your bucket should now contain two folders, staging and templates:

Metadata

Additionally, we extend our template with metadata to validate our parameters and to add helpful information to the Dataflow UI.

The metadata file has to follow the naming convention <template-name>_metadata, in our case process-csv_metadata, and has to be uploaded into the templates folder next to the template itself.

{
  "name": "Transform CSV",
  "description": "Transforms a csv file and saves it into Cloud Datastore",
  "parameters": [{
    "name": "input",
    "label": "Input Cloud Storage file",
    "help_text": "The path to the csv file in Google Cloud Storage Bucket",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": false
  },
  {
    "name": "entity",
    "label": "Cloud Datastore entity name",
    "help_text": "The Google Cloud Datastore entity name",
    "is_optional": false
  }]
}
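
The upload of the metadata file is a single copy with gsutil; a sketch, assuming the file is stored locally under the name process-csv_metadata:

gsutil cp process-csv_metadata gs://io-dataflow/templates/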

Launch the Dataflow template

In this article, we launch the template via the Dataflow UI.

To launch the template, go to the Dataflow “Create job from template” page and choose “Custom template” as the Cloud Dataflow template. The UI then asks for the additional parameters Input Cloud Storage file and Cloud Datastore entity name that we defined in the metadata.

Run the job and switch to Cloud Datastore to see the results.
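
If you prefer the command line, the same template can also be launched with gcloud. A sketch; the job name, the input path, and the entity value are placeholders:

gcloud dataflow jobs run process-csv-job \
  --gcs-location gs://io-dataflow/templates/process-csv \
  --parameters input=gs://io-dataflow/input/annotations.csv,entity=annotation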

Dependencies

This topic cost me some time and ☕, that’s why I want to share the solution with you.

Locally, everything worked with the DirectRunner, but when I tried to launch the pipeline on Dataflow, the following error appeared:

NameError: global name 'csv' is not defined

It turned out that functions, variables and global imports like import csv are not saved during the serialization of a Cloud Dataflow job. In other words, the state of the global namespace is not loaded on the Cloud Dataflow worker.

A look into the Dataflow FAQ helped to solve this issue: just add an additional pipeline option.

--save_main_session True
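
The same flag can also be set programmatically when building the pipeline options; a small sketch using Beam’s SetupOptions view:

from apache_beam.options.pipeline_options import SetupOptions

# Pickle the main session so that global imports like csv reach the Dataflow workers.
process_options.view_as(SetupOptions).save_main_session = True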

Cloud Datastore limitations

Cloud Datastore has a limit of 1500 bytes for UTF-8 encoded string properties. This means that if you would like to save more than just a couple of words, you will get an error. A closer look into the documentation shows that this limit only applies to indexed properties; the maximum size for unindexed properties is 1 MB, which is enough to fulfill our requirements.

The following setting excludes a property from the index. As a tradeoff, you can no longer query on this property.

exclude_from_indexes=True

Wrapping up

I hope I was able to show you how to use Cloud Dataflow to process large files stored in Google Cloud Storage and transform them into Cloud Datastore entities.

In the next article, I will share how to combine Cloud Functions and Cloud Dataflow into a serverless large-scale data processing environment, as we want to achieve a fully automated pipeline.

Thanks for reading.

The code for this article and sample data is available on GitHub.

Your feedback and questions are highly appreciated, you can find me on Twitter @HeyerSascha.