
Protobuf Format #

Format: Serialization Schema
Format: Deserialization Schema

The Protocol Buffers (Protobuf) format allows you to read and write Protobuf data, based on Protobuf-generated classes.

Dependencies #

To use the Protobuf format, the following dependencies are required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Only available for stable releases.

How to create a table with Protobuf format #

The following example creates a table using the Kafka connector and the Protobuf format.

Below is the proto definition file.

syntax = "proto2";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated InnerMessageTest value_arr = 7;
    optional Corpus corpus_int = 8;
    optional Corpus corpus_str = 9;

    message InnerMessageTest {
        optional int64 v1 = 1;
        optional int32 v2 = 2;
    }

    enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 7;
    }
}

  1. Use the protoc command to compile the .proto file to Java classes.
  2. Then compile and package the classes (there is no need to package protobuf-java into the JAR).
  3. Finally, provide the JAR on your classpath, e.g. pass it using -j in sql-client.

CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BYTES,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'protobuf',
 'protobuf.message-class-name' = 'com.example.SimpleTest',
 'protobuf.ignore-parse-errors' = 'true'
)
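
Once the table exists, it can be queried like any other Flink table. The query below is a minimal sketch rather than part of the original example: it assumes the simple_test table defined above, and the aliases t and e are illustrative. It uses CROSS JOIN UNNEST to expand the repeated field value_arr into one row per element, and it reads the same enum type through both an INT column and a STRING column.

```sql
-- Expand each element of the repeated field value_arr into its own row.
-- Assumes the simple_test table defined above; the aliases t and e are illustrative.
SELECT
  t.uid,
  t.name,
  t.corpus_str,  -- enum read as a string because the column is declared STRING
  e.v1,
  e.v2
FROM simple_test AS t
CROSS JOIN UNNEST(t.value_arr) AS e (v1, v2)
WHERE t.corpus_int = 1;  -- enum read as a number because the column is declared INT (WEB = 1)
```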

Format Options #

Option | Required | Forwarded | Default | Type | Description
format | required | no | (none) | String | Specifies which format to use; here it must be 'protobuf'.
protobuf.message-class-name | required | no | (none) | String | The full name of a Protobuf generated class. The name must match the message name in the proto definition file. $ is supported for inner class names, e.g. 'com.example.OuterClass$MessageClass'.
protobuf.ignore-parse-errors | optional | no | false | Boolean | Skip rows with parse errors instead of failing.
protobuf.read-default-values | optional | yes | false | Boolean | Only takes effect when the generated class uses proto2 syntax. If set to true, the format reads empty values as the default values defined in the proto file. If set to false, the format generates null values when the data element does not exist in the binary Protobuf message. If the proto syntax is proto3, this value is forced to true, because proto3 always uses default values.
protobuf.write-null-string-literal | optional | no | "" | String | When serializing to Protobuf data, the string literal to write in place of null values in Protobuf arrays and maps.
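
To illustrate protobuf.read-default-values, the sketch below declares a second source table over the same SimpleTest class with the option enabled. The table name simple_test_with_defaults is hypothetical; the connector settings are copied from the example above. The comments describe the behaviour stated in the option table, not output observed from a running job.

```sql
-- Hypothetical second table over the same generated class com.example.SimpleTest.
CREATE TABLE simple_test_with_defaults (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BYTES,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  -- 'false' (the default): a field missing from the binary message is read as NULL.
  -- 'true': the field is read as its proto default; for a proto2 field with no
  -- explicit default that is 0 for uid and the empty string for name.
  'protobuf.read-default-values' = 'true'
);
```

With the option left at false, a filter such as `WHERE name IS NULL` can distinguish a missing field from an empty one; with true, that distinction is no longer visible in the table.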

Data Type Mapping #

The following table lists the type mapping from Flink type to Protobuf type.

Flink SQL type | Protobuf type | Description
CHAR / VARCHAR / STRING | string
BOOLEAN | bool
BINARY / VARBINARY | bytes
INT | int32
BIGINT | int64
FLOAT | float
DOUBLE | double
ARRAY | repeated | Elements cannot be null; the default for null string elements can be set with protobuf.write-null-string-literal.
MAP | map | Keys and values cannot be null; the default for null strings can be set with protobuf.write-null-string-literal.
ROW | message
VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | A Protobuf enum value can be mapped to either a string or a number in the Flink row, depending on the declared column type.
ROW<seconds BIGINT, nanos INT> | google.protobuf.Timestamp | A google.protobuf.Timestamp can be mapped to a row holding seconds and fractions of a second at nanosecond resolution, in UTC epoch time, mirroring the Protobuf definition.
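
The timestamp mapping can be sketched as follows. The Event message, the events table, and the topic name are hypothetical and only serve to illustrate the ROW<seconds BIGINT, nanos INT> shape; the class name assumes the same java_package and java_multiple_files settings as the example proto file. The field names are quoted with backticks purely as a precaution against keyword clashes.

```sql
-- Hypothetical proto2 definition (not part of the original example):
--   import "google/protobuf/timestamp.proto";
--   message Event {
--     optional int64 id = 1;
--     optional google.protobuf.Timestamp ts = 2;
--   }
CREATE TABLE events (
  id BIGINT,
  ts ROW<`seconds` BIGINT, `nanos` INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.Event'
);

-- Convert the epoch seconds to a TIMESTAMP_LTZ with second precision;
-- the nanosecond part is kept as a separate column in this sketch.
SELECT
  e.id,
  TO_TIMESTAMP_LTZ(e.ts.`seconds`, 0) AS event_time,
  e.ts.`nanos` AS nanos_of_second
FROM events AS e;
```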

Null Values #

Because Protobuf does not permit null values in maps and arrays, default values are generated automatically when converting Flink rows to Protobuf.

Protobuf data type | Default value
int32 / int64 / float / double | 0
string | ""
bool | false
enum | first value defined in the enum
bytes | ByteString.EMPTY
message | MESSAGE.getDefaultInstance()
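
As a sketch of how these defaults come into play, the example below writes arrays and a map that contain NULL entries. The NullDemo message, the null_demo_sink table, the topic name, and the literal 'N/A' are all hypothetical; the class name assumes the same java_package and java_multiple_files settings as the example proto file. The comments state what the table above implies, not output captured from a running job.

```sql
-- Hypothetical proto2 definition (not part of the original example):
--   message NullDemo {
--     repeated string tags = 1;
--     repeated int64 scores = 2;
--     map<string, string> attrs = 3;
--   }
CREATE TABLE null_demo_sink (
  tags   ARRAY<STRING>,
  scores ARRAY<BIGINT>,
  attrs  MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'null_demo',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.NullDemo',
  -- Written in place of NULL strings inside arrays and maps (default is "").
  'protobuf.write-null-string-literal' = 'N/A'
);

-- Per the default-value table, the NULL in scores is expected to be written as 0,
-- and the NULL strings in tags and attrs as the configured literal 'N/A'.
INSERT INTO null_demo_sink
SELECT
  ARRAY['a', CAST(NULL AS STRING)],
  ARRAY[CAST(1 AS BIGINT), CAST(NULL AS BIGINT)],
  MAP['k', CAST(NULL AS STRING)];
```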

OneOf field #

During serialization, there is no guarantee that the Flink fields belonging to the same oneof group contain at most one valid value. Fields are set in the order of the Flink schema, so within a oneof group a field that appears later in the schema overrides one that appears earlier.
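
The ordering rule can be illustrated with a small sketch. The Contact message, the contact_sink table, and the topic name are hypothetical; the class name assumes the same java_package and java_multiple_files settings as the example proto file. The comment on the INSERT restates the rule described above and is not output captured from a running job.

```sql
-- Hypothetical proto2 definition (not part of the original example):
--   message Contact {
--     optional int64 id = 1;
--     oneof channel {
--       string email = 2;
--       string phone = 3;
--     }
--   }
CREATE TABLE contact_sink (
  id BIGINT,
  email STRING,  -- first member of the oneof group in the Flink schema
  phone STRING   -- declared after email, so it is set last
) WITH (
  'connector' = 'kafka',
  'topic' = 'contacts',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.Contact'
);

-- Both oneof members are non-null in this row. Since phone is set after email,
-- the serialized message is expected to carry phone only.
INSERT INTO contact_sink
SELECT CAST(1 AS BIGINT), 'a@example.com', '555-0100';
```

To avoid depending on column order, make sure that at most one column of a oneof group is non-null in each row.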

Refer to the Language Guide (proto2) or the Language Guide (proto3) for more information about Protobuf types.