Format received events

A pipeline connects a bus to a target destination and routes event messages to that destination. You can configure a pipeline to expect event data in a specific format, or to convert event data from one supported format to another before events are delivered to a destination. For example, you might need to route events to an endpoint that only accepts Avro data.

Supported formats

The following format conversions are supported:

  • Avro to JSON
  • Avro to Protobuf
  • JSON to Avro
  • JSON to Protobuf
  • Protobuf to Avro
  • Protobuf to JSON

Note the following:

  • When you convert the format of events, only the event payload is converted and not the entire event message.

  • If an inbound data format is specified for a pipeline, all events must match that format. Any events that don't match the expected format are treated as persistent errors.

  • If an inbound data format is not specified for a pipeline, an outbound format can't be set.

  • Any data transformation that is configured for a pipeline is applied before the event format is converted for a specific destination.

  • Events are always delivered in a CloudEvents format using an HTTP request in binary content mode unless you specify a message binding.

  • JSON schemas are detected dynamically. For Protobuf schema definitions, you can define only one top-level type; import statements that refer to other types are not supported. Schema definitions without a syntax identifier default to proto2. Note that there is a schema size limit. For a conforming schema definition, see the sketch after this list.
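
For example, the following Protobuf schema definition satisfies these constraints: it declares a syntax identifier and contains a single top-level message with no import statements. The message and field names are illustrative only:

  syntax = "proto3";

  // A single top-level type; imports of other types are not supported.
  message Alert {
    string name = 1;
    string severity = 2;
  }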

Configure a pipeline to format events

You can configure a pipeline to expect event data in a specific format, or to convert event data from one format to another, in the Google Cloud console or by using the gcloud CLI.

Console

  1. In the Google Cloud console, go to the Eventarc > Pipelines page.

  2. You can create a new pipeline or, to update an existing pipeline, click its name.

    Note that updating a pipeline might take more than 10 minutes.

  3. On the Pipeline details page, click Edit.

  4. In the Event mediation pane, do the following:

    1. Select the Apply a transformation checkbox.
    2. In the Inbound format list, select the applicable format.

      Note that if an inbound data format is specified for a pipeline, all events must match that format. Any events that don't match the expected format are treated as persistent errors.

    3. For Avro or Protobuf formats, you must specify an inbound schema. Optionally, you can upload an inbound schema instead of specifying it directly.

    4. In the CEL expression field, write a transformation expression. For an example, see the sketch after these steps.

    5. Click Continue.

  5. In the Destination pane, do the following:

    1. If applicable, in the Outbound format list, select a format.

      Note that if an inbound data format is not specified for a pipeline, an outbound format can't be set.

    2. Optional: Apply a Message binding. For more information, see Message binding.

  6. Click Save.
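
The transformation expression in the Event mediation pane is written in CEL and is applied to the event payload before any format conversion. The following one-line sketch assumes that the expression can reference the event as a message variable exposing a setField helper; the variable, helper, field name, and value are illustrative assumptions, not a definitive API:

  // Hypothetical example: overwrite the payload's severity field.
  message.setField("severity", "HIGH")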

gcloud

  1. Open a terminal.

  2. Create a new pipeline, or update an existing one by using the gcloud beta eventarc pipelines update command:

    Note that updating a pipeline might take more than 10 minutes.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    Replace the following:

    • PIPELINE_NAME: the ID of the pipeline or a fully qualified name
    • REGION: a supported Eventarc Advanced location

      Alternatively, you can set the gcloud CLI location property:

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG: an input data format flag that can be one of the following:

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      Note that if an input data format is specified for a pipeline, all events must match that format. Any events that don't match the expected format are treated as persistent errors.

    • OUTPUT_PAYLOAD_KEY: an output data format key that can be one of the following:

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      Note that if you set an output data format key, you must also specify an input data format flag.

    Examples:

    The following example uses an --input-payload-format-protobuf-schema-definition flag to specify that the pipeline should expect events in a Protobuf data format with a specific schema:

    gcloud beta eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition='
          syntax = "proto3";
          message schema {
            string name = 1;
            string severity = 2;
          }
        '

    The following example uses an output_payload_format_avro_schema_definition key and an --input-payload-format-avro-schema-definition flag to create a pipeline that expects events in an Avro format and outputs them in the same format:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    The following example uses an output_payload_format_protobuf_schema_definition key and an --input-payload-format-avro-schema-definition flag to update a pipeline and convert its event data from Avro to Protobuf using schema definitions:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition='
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" }
          ]
        }
        '
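
    The following example uses the --input-payload-format-json flag and an output_payload_format_avro_schema_definition key to update a pipeline so that it accepts JSON events and converts them to Avro. This is a sketch only: it assumes that --input-payload-format-json can be passed without a schema value because JSON schemas are detected dynamically, and it reuses the illustrative Avro schema from the earlier create example:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --input-payload-format-json \
        --destinations=output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'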