This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Protobuf Format #
Format: Serialization Schema Format: Deserialization Schema
The Protocol Buffers (Protobuf) format allows you to read and write Protobuf data, based on Protobuf-generated classes.
Dependencies #
To use the Protobuf format, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Only available for stable releases. |
How to create a table with Protobuf format #
The following example creates a table using the Kafka connector and the Protobuf format.
Below is the proto definition file.
syntax = "proto2";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated InnerMessageTest value_arr = 7;
    optional Corpus corpus_int = 8;
    optional Corpus corpus_str = 9;

    message InnerMessageTest {
        optional int64 v1 = 1;
        optional int32 v2 = 2;
    }

    enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 7;
    }
}
- Use the protoc command to compile the .proto file to Java classes.
- Then compile and package the classes (there is no need to package proto-java into the jar).
- Finally, provide the jar in your classpath, e.g. pass it using -j in sql-client.
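The steps above can be sketched as follows. This is a minimal sketch, not a verified recipe: the file name simple_test.proto, the output directory, and the jar path are placeholders, and loading the jar with ADD JAR from inside a running SQL Client session is an assumption that this page does not confirm for the Protobuf format. The -j option is the route the page describes.

```sql
-- Steps 1 and 2 run in a shell, outside SQL (shown as comments; all paths are placeholders):
--   protoc --java_out=src/main/java simple_test.proto
--   (compile and package the generated classes into a jar with your build tool)
--
-- Step 3, option A: start the SQL Client with the jar on its classpath:
--   ./bin/sql-client.sh -j /path/to/simple-test-proto.jar
--
-- Step 3, option B (assumption): add the jar from inside a running session.
ADD JAR '/path/to/simple-test-proto.jar';
SHOW JARS;
```

With the generated classes available, the table can be declared as follows.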
CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
)
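Once the table exists, it can be queried like any other table. The queries below are a sketch against the simple_test table defined above; the filter value and the selected columns are arbitrary illustrations, not part of the original example.

```sql
-- Read scalar columns. corpus_int and corpus_str both come from Corpus enum fields,
-- one declared as INT (enum number) and one as STRING (enum name).
SELECT uid, name, price, corpus_int, corpus_str
FROM simple_test
WHERE category_type = 1;

-- Expand the repeated InnerMessageTest field into one output row per element.
SELECT s.uid, t.v1, t.v2
FROM simple_test AS s
CROSS JOIN UNNEST(s.value_arr) AS t (v1, v2);
```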
Format Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
format | required | no | (none) | String | Specify what format to use; here it should be 'protobuf'. |
protobuf.message-class-name | required | no | (none) | String | The full name of a Protobuf generated class. The name must match the message name in the proto definition file. $ is supported for inner class names, like 'com.example.OuterClass$MessageClass'. |
protobuf.ignore-parse-errors | optional | no | false | Boolean | Optional flag to skip rows with parse errors instead of failing. |
protobuf.read-default-values | optional | yes | false | Boolean | This option only takes effect if the generated class uses proto2 syntax. If set to true, the format reads empty values as the default values defined in the proto file. If set to false, the format generates null values if the data element does not exist in the binary Protobuf message. If the proto syntax is proto3, this value is forcibly set to true, because proto3's standard is to use default values. |
protobuf.write-null-string-literal | optional | no | "" | String | When serializing to Protobuf data, specifies the string literal to write in place of null values in Protobuf arrays and maps. |
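As an illustration of the read-side options, the sketch below declares a second table over the same com.example.SimpleTest class. The table name simple_test_strict is hypothetical; the option names and their meanings are taken from the table above.

```sql
CREATE TABLE simple_test_strict (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  -- Fail on a record that cannot be parsed instead of silently skipping it.
  'protobuf.ignore-parse-errors' = 'false',
  -- SimpleTest uses proto2, so this option takes effect: a field that is absent
  -- from the binary message is read as its proto default value rather than NULL.
  'protobuf.read-default-values' = 'true'
);
```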
Data Type Mapping #
The following table lists the type mapping from Flink type to Protobuf type.
Flink SQL type | Protobuf type | Description |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | bool | |
BINARY / VARBINARY | bytes | |
INT | int32 | |
BIGINT | int64 | |
FLOAT | float | |
DOUBLE | double | |
ARRAY | repeated | Elements cannot be null. The string default value can be specified by protobuf.write-null-string-literal. |
MAP | map | Keys or values cannot be null. The string default value can be specified by protobuf.write-null-string-literal. |
ROW | message | |
VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | A Protobuf enum value can be mapped to a string or a number in the Flink row, depending on the declared Flink type. |
ROW<seconds BIGINT, nanos INT> | google.protobuf.Timestamp | The google.protobuf.Timestamp type can be mapped to seconds and fractions of a second at nanosecond resolution in UTC epoch time, using this row type together with the corresponding Protobuf definition. |
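The timestamp mapping in the last row can be sketched as follows. The EventTest message, the event_test table, the events topic, and the event_ts column are all hypothetical names introduced for illustration, and the conversion to a SQL timestamp is one possible approach, not something this page prescribes.

```sql
-- Hypothetical proto definition assumed by this sketch:
--   import "google/protobuf/timestamp.proto";
--   message EventTest {
--     optional int64 uid = 1;
--     optional google.protobuf.Timestamp event_time = 2;
--   }
CREATE TABLE event_test (
  uid BIGINT,
  event_time ROW<seconds BIGINT, nanos INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.EventTest'
);

-- Combine seconds and nanos into epoch milliseconds, then into a timestamp.
-- Sub-millisecond precision is dropped by the integer division.
-- Field names are quoted with backticks to avoid any clash with SQL keywords.
SELECT
  e.uid,
  TO_TIMESTAMP_LTZ(e.event_time.`seconds` * 1000 + e.event_time.`nanos` / 1000000, 3) AS event_ts
FROM event_test AS e;
```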
Null Values #
Because Protobuf does not permit null values in maps and arrays, the format generates default values for them when converting from Flink rows to Protobuf.
Protobuf Data Type | Default Value |
---|---|
int32 / int64 / float / double | 0 |
string | "" |
bool | false |
enum | first element of enum |
bytes | ByteString.EMPTY |
message | MESSAGE.getDefaultInstance() |
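A small write-side sketch of this behavior follows. The TagsTest message, the tags_test table and topic, and the literal 'N/A' are hypothetical; the outcomes stated in the comments are what the table above implies, not verified output.

```sql
-- Hypothetical proto definition assumed by this sketch:
--   message TagsTest {
--     optional int64 uid = 1;
--     repeated string tags = 2;
--     repeated int32 scores = 3;
--   }
CREATE TABLE tags_test (
  uid BIGINT,
  tags ARRAY<STRING>,
  scores ARRAY<INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'tags_test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TagsTest',
  'protobuf.write-null-string-literal' = 'N/A'
);

-- The NULL elements cannot be stored in Protobuf as they are.
-- Following the table above, the NULL in tags is expected to be written as 'N/A'
-- (the configured string literal) and the NULL in scores as 0.
INSERT INTO tags_test
SELECT
  CAST(1 AS BIGINT),
  ARRAY['a', CAST(NULL AS STRING)],
  ARRAY[10, CAST(NULL AS INT)];
```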
OneOf field #
During serialization, there is no guarantee that the Flink fields belonging to the same one-of group contain at most one valid value. Each field is set in the order of the Flink schema, so within a one-of group a field that appears later in the schema overrides a field that appears earlier.
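The following sketch shows the situation described above. The ContactTest message, the contact_test table and topic, and the sample values are hypothetical; the stated outcome follows from fields being set in schema order, combined with the Protobuf rule that setting one member of a oneof clears the others.

```sql
-- Hypothetical proto definition assumed by this sketch:
--   message ContactTest {
--     optional int64 uid = 1;
--     oneof contact {
--       string email = 2;
--       string phone = 3;
--     }
--   }
CREATE TABLE contact_test (
  uid BIGINT,
  email STRING,
  phone STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'contact_test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.ContactTest'
);

-- Both members of the oneof group carry a value here.
-- email is set first and phone second, so the written message is expected
-- to contain only phone. To keep email, write NULL for phone instead.
INSERT INTO contact_test
SELECT CAST(1 AS BIGINT), 'a@example.com', '555-0100';
```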
You can refer to Language Guide (proto2) or Language Guide (proto3) for more information about Protobuf types.