Dataflow SQL user-defined functions

Dataflow SQL supports user-defined functions (UDFs) written in Java or SQL. These functions accept columns of input and perform actions, returning the result of those actions as a value.

Java UDFs

Java UDFs can be either scalar functions or aggregate functions. A scalar UDF takes zero or more scalar parameters and returns a single scalar value. An aggregate UDF accepts a set of values, aggregates the results, and returns a scalar value.

Syntax

CREATE [AGGREGATE] FUNCTION
    function_name
    ([named_parameter[, ...]])
  RETURNS data_type
  LANGUAGE java
  OPTIONS (path='jar_file_uri')
named_parameter:
  param_name param_type

This syntax consists of the following components:

  • CREATE { FUNCTION | AGGREGATE FUNCTION }. Creates a function. To create a scalar function, use CREATE FUNCTION. To create an aggregate function, use CREATE AGGREGATE FUNCTION.

  • function_name. Specifies the name of the function.

  • named_parameter. Consists of a param_name and param_type pair. The value of param_type is a Dataflow SQL data type.

    Scalar functions can have multiple parameters, separated by commas. Aggregate functions can accept a single parameter.

  • RETURNS data_type. Specifies the data type that the function returns.

  • OPTIONS (path = jar_file_uri). Specifies the Cloud Storage URI of the JAR file that implements the function.

The CREATE FUNCTION or CREATE AGGREGATE FUNCTION statement defines the signature of the function. The implementation is provided as a JAR file. Upload the JAR file to Cloud Storage and use the path option to specify the Cloud Storage URI of the JAR file.

To invoke the function, include a SELECT statement after the CREATE FUNCTION or CREATE AGGREGATE FUNCTION statement. Separate multiple statements in the query using a semicolon. The last statement must be a SELECT statement, and you can use only one SELECT statement.

Java UDF example

The following example defines a Java UDF and calls the function in a SELECT statement:

CREATE FUNCTION fun (a INT64)
  RETURNS INT64
  LANGUAGE java
  OPTIONS (
    path='gs://my-bucket/udf.jar'
  );

SELECT fun(1) as b;

Implementing a Java UDF

Scalar functions

To create a scalar UDF, extend the ScalarFn class. Your subclass must contain exactly one method annotated with @ApplyMethod. The method parameters and return type must match the data types that you declared in the CREATE FUNCTION statement. For more information, see Type mappings in this document.

The following example implements a UDF that increments a number and returns the result.

public class IncrementFn extends ScalarFn {
  @ApplyMethod
  public Long increment(Long i) {
    return i + 1;
  }
}

Aggregate functions

To create an aggregate UDF, create a class that implements the AggregateFn interface.

The methods on this interface have the equivalent semantics as methods on the Combine.CombineFn class in the Apache Beam SDK for Java.

UDF provider

In your JAR file, include exactly one public class that implements the UdfProvider interface. The userDefinedScalarFunctions and userDefinedAggregateFunctions methods on this interface map function names to either scalar or aggregate functions. At runtime, Dataflow calls these methods to discover which function to invoke.

The following example shows an implementation of UdfProvider for a scalar function named helloWorld.

@AutoService(UdfProvider.class)
public class ExampleUdfProvider implements UdfProvider {
  @Override
  public Map<String, ScalarFn> userDefinedScalarFunctions() {
    return ImmutableMap.of("helloWorld", new HelloWorldFn());
  }

  public static class HelloWorldFn extends ScalarFn {
    @ApplyMethod
    public String helloWorld() {
      return "Hello world!";
    }
  }
}

Type mappings

When a Java UDF is invoked, Dataflow converts the function parameters to Java types and converts the return value back to a Dataflow SQL type. The following table shows how the types are mapped.

SQL data type Java class
ARRAY java.util.List
BOOL java.lang.Boolean
BYTES byte[]
DATE java.util.Date
FLOAT64 java.lang.Double
INT64 java.lang.Long
NUMERIC java.math.BigDecimal
STRING java.lang.String
TIMESTAMP java.sql.Timestamp

SQL UDFs

A SQL UDF lets you create a function by using another SQL expression.

Syntax

CREATE FUNCTION
    function_name
    ([named_parameter[, ...]])
  [RETURNS data_type]
  LANGUAGE sql
  AS (sql_expression)
named_parameter:
  param_name param_type

This syntax consists of the following components:

  • CREATE FUNCTION. Creates a function.

  • function_name. Specifies the name of the function.

  • named_parameter. Consists of a comma-separated param_name and param_type pair. The value of param_type is a Dataflow SQL data type.

  • [RETURNS data_type]. Specifies the data type that the function returns. For SQL UDFs, the RETURNS clause is optional.

  • sql_expression. Specifies the SQL expression that defines the function.

To invoke the function, include a SELECT statement after the CREATE FUNCTION statement. Separate multiple statements in the query using a semicolon. The last statement must be a SELECT statement, and you can use only one SELECT statement.

SQL UDF example

The following example creates a function named addFourAndMultiply and calls the function on a list of numbers.

CREATE FUNCTION addFourAndMultiply(x INT64, y INT64) AS ((x + 4) * y);

WITH numbers AS
  (SELECT 1 as val
  UNION ALL
  SELECT 3 as val
  UNION ALL
  SELECT 4 as val
  UNION ALL
  SELECT 5 as val)
SELECT val, addFourAndMultiply(val, 2) AS result
FROM numbers;