Pub/Sub Proto to BigQuery with Python UDF template

The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.

A Python user-defined function (UDF) can be provided to transform data. Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors.

Pipeline requirements

  • The input Pub/Sub subscription must exist.
  • The schema file for the Proto records must exist on Cloud Storage.
  • The output Pub/Sub topic must exist.
  • The output BigQuery dataset must exist.
  • If the BigQuery table exists, it must have a schema matching the proto data regardless of the createDisposition value.

Template parameters

  • protoSchemaPath: The Cloud Storage location of the self-contained proto schema file. For example, gs://path/to/my/file.pb. This file can be generated with the --descriptor_set_out flag of the protoc command. The --include_imports flag guarantees that the file is self-contained.
  • fullMessageName: The full proto message name. For example, package.name.MessageName, where package.name is the value provided for the package statement and not the java_package statement.
  • inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
  • outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
  • outputTableSpec: The BigQuery output table location. For example, my-project:my_dataset.my_table. Depending on the createDisposition specified, the output table might be created automatically using the input schema file.
  • preserveProtoFieldNames: Optional: Set to true to preserve the original proto field names in JSON. Set to false to use standard JSON names; for example, false changes field_name to fieldName. Default: false.
  • bigQueryTableSchemaPath: Optional: The Cloud Storage path to the BigQuery schema file. For example, gs://path/to/my/schema.json. If this is not provided, the schema is inferred from the proto schema.
  • pythonExternalTextTransformGcsPath: Optional: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.py.
  • pythonExternalTextTransformFunctionName: Optional: The name of the Python user-defined function (UDF) that you want to use.
  • udfOutputTopic: Optional: The Pub/Sub topic that receives UDF errors. For example, projects/<project-id>/topics/<topic-name>. If this is not provided, UDF errors are sent to the topic specified by outputTopic.
  • writeDisposition: Optional: The BigQuery WriteDisposition. One of WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
  • createDisposition: Optional: The BigQuery CreateDisposition. Either CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
  • useStorageWriteApi: Optional: If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
  • useStorageWriteApiAtLeastOnce: Optional: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • numStorageWriteApiStreams: Optional: When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
  • storageWriteApiTriggeringFrequencySec: Optional: When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
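
The dependencies among the Storage Write API parameters, and the fallback from udfOutputTopic to outputTopic, can be checked before a job is launched. The following is a minimal sketch and not part of the template: the helper names check_storage_write_params and effective_udf_error_topic are hypothetical, and the parameters are assumed to be held in a plain dict of strings keyed by the parameter names above.

```python
def _is_true(params, name):
    """Interpret a template parameter as a boolean; a missing value means false."""
    return str(params.get(name, "false")).lower() == "true"


def check_storage_write_params(params):
    """Return a list of problems with the Storage Write API settings.

    Hypothetical pre-launch check: with exactly-once semantics
    (useStorageWriteApi=true, useStorageWriteApiAtLeastOnce=false),
    the stream count and triggering frequency must both be set.
    """
    problems = []
    exactly_once = _is_true(params, "useStorageWriteApi") and not _is_true(
        params, "useStorageWriteApiAtLeastOnce"
    )
    if exactly_once:
        for name in ("numStorageWriteApiStreams",
                     "storageWriteApiTriggeringFrequencySec"):
            if name not in params:
                problems.append(f"{name} must be set for exactly-once writes")
    return problems


def effective_udf_error_topic(params):
    """UDF errors go to udfOutputTopic if given, otherwise to outputTopic."""
    return params.get("udfOutputTopic") or params["outputTopic"]
```

An empty list from check_storage_write_params means that the combination of Storage Write API parameters is consistent with the descriptions above.
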

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
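
A UDF that follows this specification might look like the following minimal sketch. The function name process and the name field are illustrative assumptions, not requirements of the template; pass whichever function name you choose in pythonExternalTextTransformFunctionName.

```python
import json


def process(value):
    """Transform one element.

    value: the message payload serialized as a JSON string.
    Returns a JSON string that must match the destination table schema.
    """
    record = json.loads(value)

    # Hypothetical transformation: normalize a string field called "name".
    if isinstance(record.get("name"), str):
        record["name"] = record["name"].strip().lower()

    return json.dumps(record)
```

An exception raised inside the function, for example by json.loads on malformed input, is a UDF error and is routed to the error topic described earlier.
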

Run the template

    Console

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the Pub/Sub Proto to BigQuery with Python UDF template.
    5. In the provided parameter fields, enter your parameter values.
    6. Click Run job.

    gcloud

    In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    protoSchemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Replace the following:

    • JOB_NAME: a unique job name of your choice
    • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use

    • SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
    • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
    • BIGQUERY_TABLE: the BigQuery output table name
    • UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
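
The gcloud invocation can also be assembled programmatically, which makes it easier to add the optional UDF parameters. The following is a sketch under stated assumptions: build_gcloud_command is a hypothetical helper, the parameter names come from the Template parameters section, process is a hypothetical UDF function name, and VERSION is left as a placeholder. The sketch only builds and prints the command; it does not run it.

```python
import shlex

TEMPLATE_NAME = "PubSub_Proto_to_BigQuery_Xlang"


def build_gcloud_command(job_name, region, version, parameters):
    """Assemble the argument list for `gcloud dataflow flex-template run`."""
    template_path = (
        f"gs://dataflow-templates-{region}/{version}/flex/{TEMPLATE_NAME}"
    )
    # Assumes no parameter value contains a comma, which gcloud would
    # otherwise treat as a separator between parameters.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--region={region}",
        f"--template-file-gcs-location={template_path}",
        f"--parameters={joined}",
    ]


command = build_gcloud_command(
    job_name="JOB_NAME",
    region="us-central1",
    version="VERSION",  # placeholder, replace with a real template version
    parameters={
        "protoSchemaPath": "gs://MyBucket/file.pb",
        "fullMessageName": "package.name.MessageName",
        "inputSubscription": "projects/<project>/subscriptions/<subscription>",
        "outputTableSpec": "my-project:my_dataset.my_table",
        "outputTopic": "projects/<project-id>/topics/<topic-name>",
        # Optional UDF parameters; the function name is hypothetical.
        "pythonExternalTextTransformGcsPath": "gs://my-bucket/my-udfs/my_file.py",
        "pythonExternalTextTransformFunctionName": "process",
    },
)

# Print a shell-quoted form of the command for inspection.
print(shlex.join(command))
```

Passing the list to subprocess.run would execute the command, provided that the gcloud CLI is installed and authenticated.
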

    API

    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "protoSchemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use

    • SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
    • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
    • BIGQUERY_TABLE: the BigQuery output table name
    • UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
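
The request URL and JSON body can be built in a few lines of Python. This is a minimal sketch rather than a complete client: build_launch_request is a hypothetical helper, the parameter names come from the Template parameters section, and the request is constructed but not sent. Sending it requires an OAuth 2.0 access token in the Authorization header, which is outside the scope of this sketch.

```python
import json

TEMPLATE_NAME = "PubSub_Proto_to_BigQuery_Xlang"


def build_launch_request(project_id, location, job_name, version, parameters):
    """Return the (url, body) pair for a flexTemplates:launch request."""
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
        f"/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            # The template bucket name ends with the job's region.
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                f"/flex/{TEMPLATE_NAME}"
            ),
            "parameters": dict(parameters),
        }
    }
    return url, json.dumps(body, indent=2)


url, body = build_launch_request(
    project_id="PROJECT_ID",
    location="us-central1",
    job_name="JOB_NAME",
    version="VERSION",  # placeholder, replace with a real template version
    parameters={
        "protoSchemaPath": "gs://MyBucket/file.pb",
        "fullMessageName": "package.name.MessageName",
        "inputSubscription": "projects/<project>/subscriptions/<subscription>",
        "outputTableSpec": "my-project:my_dataset.my_table",
        "outputTopic": "projects/<project-id>/topics/<topic-name>",
    },
)
print("POST", url)
print(body)
```
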

    What's next