
Flink DataStream Type System: TypeInformation

2022-04-23 14:52:00 smartsi

The events processed by a Flink DataStream application exist as data objects. Data objects are passed into functions when they are invoked, and functions emit data objects as output. Flink therefore needs to be able to handle these objects internally: whenever objects are transmitted over the network, or read from and written to state backends, checkpoints, and savepoints, they must be serialized and deserialized. To do this efficiently, Flink needs detailed knowledge of the data types the application processes. Flink uses the concept of type information to represent data types and generates a specific serializer, deserializer, and comparator for each one.

In addition, Flink has a type extraction system that analyzes the input and return types of functions to obtain type information automatically, and from it the serializers and deserializers. In some cases, however, such as lambda functions or generic types, the type information must be provided explicitly for the application to work correctly or to perform well.

In this article we discuss the data types Flink supports, how to create type information for a data type, what to do when Flink cannot automatically infer a function's return type, and finally two scenarios in which type information must be specified explicitly.

1. Data Types

Flink supports all common Java and Scala data types and, unlike Hadoop, does not require implementing a dedicated interface (org.apache.hadoop.io.Writable); data types are identified automatically. The most commonly used types fall into the following categories, as shown in the figure below:

As the figure shows, Flink types can be divided into basic types, array types, composite types, auxiliary types, and generic types.

1.1 Basic Types

Flink supports all Java and Scala primitive types (and their boxed wrappers) as well as Void, String, Date, BigDecimal, BigInteger, and so on. For example, creating DataStream datasets from a given set of elements:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a dataset of type Integer
DataStream<Integer> integerElements = env.fromElements(1, 2, 3);
// Create a dataset of type String
DataStream<String> stringElements = env.fromElements("1", "2", "3");

1.2 Array Types

Array types come in two flavors:

  • Arrays of primitives: Java arrays of primitive types, supporting boolean, byte, short, int, long, float, etc.
  • Arrays of objects: Java arrays of object types, supporting String and other objects

For example, creating a DataStream dataset from a given set of elements:

int[] a = {1, 2};
int[] b = {3, 4};
DataStream<int[]> arrayElements = env.fromElements(a, b);

1.3 Composite Types

1.3.1 Java Tuples

Flink's Java API defines tuple classes (Tuple) for users. Tuples are composite data types consisting of a fixed number of strongly typed fields. The following code creates a dataset of tuple type:

DataStream<Tuple2<Integer, String>> tupleElements = env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

Flink provides efficient implementations of Java tuples with up to 25 fields; each arity has its own implementation, i.e. Tuple0 through Tuple25. If you need more fields than that, you can extend the Tuple class.
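As a quick illustration of working with tuples (using the public f0/f1 fields and the getField() accessor that Flink's tuple classes expose):

Tuple2<Integer, String> tuple = Tuple2.of(1, "a");
// Fields are exposed as public members f0, f1, ...
Integer id = tuple.f0;
String label = tuple.f1;
// ... or accessed by position via getField()
String alsoLabel = tuple.getField(1);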

1.3.2 Scala Case Classes and Tuples

Flink supports arbitrary Scala case classes and Scala tuples with up to 22 fields. Fields can be accessed by name or by position index, but null values cannot be stored. The following example defines a WordCount case class, creates an input dataset with fromElements, and then calls keyBy() to repartition the dataset by the word field.

// Define the WordCount case class
case class WordCount(word: String, count: Int)
// Create the input dataset with fromElements
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // partition by the word field
val keyStream2 = input.keyBy(0)      // or partition by position index

Creating a DataStream dataset from Scala tuples works much like case classes. Note that when accessing a field by name, you can use the tuple's default field names:

// Create a two-element dataset from Scala tuples
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1), ("c", 2))
// Access a field via the default field names; _1 is the tuple's first field
tupleStream.keyBy("_1")

1.3.3 Row Type

Row is a fixed-length, null-aware composite type that stores multiple values in a fixed field order. Each field may have a different type, and each field may be null. Because the types of Row fields cannot be inferred automatically, you must provide type information whenever you produce a Row. The following code creates a dataset of Row type:

DataStream<Row> rowElements = env.fromElements(Row.of(0, "a", 3.14));
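Since the field types cannot be inferred, the stream above falls back to generic handling; a minimal sketch of supplying the Row's type information explicitly with Types.ROW and the returns() hint covered later in this article:

DataStream<Row> rowElements = env
        .fromElements(Row.of(0, "a", 3.14))
        // Declare the type of each Row field explicitly
        .returns(Types.ROW(Types.INT, Types.STRING, Types.DOUBLE));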

1.3.4 POJO Types

Flink analyzes data types that do not fall into any of the categories above and tries to handle them as POJO types. Flink treats a type as a POJO if it satisfies all of the following conditions:

  • The POJO class must be public and defined stand-alone; it cannot be an inner class;
  • The POJO class must have a public no-argument constructor;
  • All fields in the POJO class must be public or be accessible through public getter and setter methods;
  • All field types in the POJO class must be supported by Flink.

For example, the following Java class is recognized by Flink as a POJO:

// (1) Must be public and defined stand-alone; cannot be an inner class
public class Person {
    // (4) Field types must be supported by Flink
    private String name;
    private int age;
    // (2) Must have a public no-argument constructor
    public Person() {
    }
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
    // (3) All fields must be public or have public getter and setter methods
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

Once the POJO class is defined, it can be used in the Flink environment. The following code uses fromElements to build a dataset of Person objects:

env.fromElements(new Person("Lucy", 18), new Person("Tom", 12))

1.4 Auxiliary Types

Flink also supports some special data types, such as Scala's List, Map, Either, Option, and Try types, Java's Either type, and Hadoop's Writable types. The following code creates a dataset of List type:

DataStream<ArrayList<Integer>> listElements = env.fromElements(
        Lists.newArrayList(1, 2), Lists.newArrayList(3, 4)
);

These types are not especially widely used, mainly because working with them is not as convenient and transparent as with POJO classes: fields cannot be accessed by position or name, and type hints via the Types class are often needed to help Flink infer the type information, as sketched below.
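A minimal sketch of such a hint, using Types.LIST to declare the element type of a List produced by a lambda (the data here is made up for illustration):

DataStream<List<Integer>> listStream = env.fromElements(1, 2, 3)
        .map(i -> Arrays.asList(i, i * i))
        // The lambda's generic return type is erased, so declare it explicitly
        .returns(Types.LIST(Types.INT));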

1.5 Generic Types

Types that Flink cannot handle specially are treated as generic types and handed to the Kryo serialization framework for serialization. Avoid falling back to Kryo where possible: as a general-purpose serialization framework, it is usually much less efficient. See the sketch below for one way to detect such fallbacks.
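One way to surface accidental Kryo fallbacks during development is the disableGenericTypes() switch on ExecutionConfig; a minimal sketch:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// After this call, any type that would fall back to Kryo fails fast
// with an exception instead of silently degrading performance.
env.getConfig().disableGenericTypes();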

2. TypeInformation

With so many data types, how are they represented inside Flink? In Flink, every concrete type corresponds to a concrete implementation class of TypeInformation. For example, IntegerTypeInfo, a subclass of BasicTypeInfo, corresponds to the Integer data type. The description of a data type is defined by TypeInformation; commonly used implementations include BasicTypeInfo, TupleTypeInfo, CaseClassTypeInfo, and PojoTypeInfo, as shown in the figure below:

TypeInformation provides the information the system needs to generate serializers and comparators. When an application is submitted for execution, Flink's type system tries to infer the TypeInformation for every data type it processes automatically: the type extractor analyzes the generic types and return types of functions to obtain the corresponding TypeInformation objects. Sometimes, however, the type extractor fails, or you may want to define your own types and tell Flink how to handle them efficiently. In those cases you need to generate the TypeInformation for the specific data type yourself.

Beyond describing a type, TypeInformation also supports serialization. Every TypeInformation provides a dedicated serializer for its data type. TypeInformation declares a createSerializer() method through which you can obtain that type's TypeSerializer for serializing and deserializing data:

// Simplified: a concrete TypeInformation implementation returns its serializer
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    return this.serializer;
}

For most data types Flink can generate the matching serializer automatically and serialize and deserialize datasets very efficiently, e.g. for BasicTypeInfo and WritableTypeInfo; for GenericTypeInfo types, however, Flink falls back to Kryo. Tuple, POJO, and case class types are composite types that may nest one or more other data types. In that case their serializers are composite as well: they delegate serialization of the embedded types to those types' serializers.
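A small sketch of obtaining a serializer from a TypeInformation by hand (the fresh ExecutionConfig here is purely for illustration):

TypeInformation<Tuple2<String, Integer>> info = Types.TUPLE(Types.STRING, Types.INT);
// The composite TupleTypeInfo builds a composite serializer that delegates
// the String and Integer fields to their own serializers.
TypeSerializer<Tuple2<String, Integer>> serializer = info.createSerializer(new ExecutionConfig());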

3. Explicitly Specifying TypeInformation

Most of the time Flink can infer types automatically, generate the correct TypeInformation, and pick suitable serializers and comparators. Flink's type extractor uses reflection to analyze function signatures and subclass information and derive the correct output type of a function. Sometimes, though, the necessary information cannot be extracted, for example when a function is defined with generics: JVM type erasure means Flink cannot easily recover the data type information from the dataset, and an exception like the following may be thrown:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ReturnsExample.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1236)
	at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
...

In addition, in some cases the TypeInformation Flink selects may not generate the most efficient serializers and deserializers. You may therefore need to provide the TypeInformation for your data types explicitly. Let's first look at how to create TypeInformation, then at how to specify it for a function.

3.1 Creating TypeInformation

3.1.1 The of() Method

For non-generic types, you can create the TypeInformation by passing the Class object directly to TypeInformation's of(Class<T> typeClass) method:

// Example 1: non-generic type, pass the Class object directly
DataStream<WordCount> result1 = env.fromElements("a b a")
        .flatMap((String value, Collector<WordCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordCount(word, 1));
            }
        })
        .returns(TypeInformation.of(WordCount.class));
result1.print("R1");

This approach works only for non-generic types. For a generic type, you can create its TypeInformation with the help of a TypeHint:

// Example 2: generic type, needs a TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result2.print("R2");

Complete example

3.1.2 TypeHint

For generic types, besides combining TypeInformation.of with a TypeHint as above, you can also use a TypeHint on its own to create the TypeInformation:

DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo());
result2.print("R2");

TypeHint works by creating an anonymous subclass that captures the generic information and preserves it until runtime, where the TypeExtractor can retrieve the saved actual type.
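A small illustration of that mechanism; the trailing {} is what creates the anonymous subclass:

// The {} makes this an anonymous subclass of TypeHint, pinning the generic
// parameter Tuple2<String, Integer> into class metadata that survives erasure.
TypeHint<Tuple2<String, Integer>> hint = new TypeHint<Tuple2<String, Integer>>() {};
TypeInformation<Tuple2<String, Integer>> info = hint.getTypeInfo();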

Complete example

3.1.3 Predefined Shortcuts

The BasicTypeInfo class, for example, defines shortcuts for common types such as String, Boolean, Byte, Short, Integer, Long, Float, Double, and Char, which can be used directly:
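For instance, using the predefined constants from BasicTypeInfo:

TypeInformation<String> stringInfo = BasicTypeInfo.STRING_TYPE_INFO;
TypeInformation<Integer> intInfo = BasicTypeInfo.INT_TYPE_INFO;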

If even BasicTypeInfo feels too verbose, Flink also provides a fully equivalent Types class (org.apache.flink.api.common.typeinfo.Types):
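The equivalent shortcuts in Types are correspondingly shorter:

TypeInformation<String> stringInfo = Types.STRING;
TypeInformation<Integer> intInfo = Types.INT;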

Types provides TypeInformation for common data types and is very convenient to use. For example:

// Example 1: Types.TUPLE
DataStream<Tuple2<String, Integer>> result1 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
result1.print("R1");

// Example 2: Types.POJO
DataStream<WordCount> result2 = env.fromElements("a b a")
        .flatMap((String value, Collector<WordCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordCount(word, 1));
            }
        })
        .returns(Types.POJO(WordCount.class));
result2.print("R2");

Complete example

3.2 Explicitly Providing Type Information

When Flink cannot automatically infer the type a function produces, we must explicitly provide a type information hint. As the examples above show, type information can be provided with returns; alternatively, a function can implement the ResultTypeQueryable interface.

3.2.1 returns

The first way is to use returns to attach a type information hint for the operator's return type. For non-generic types, passing the Class is enough; for generic types you need a TypeHint to provide the hint, as shown below:

// Example 1: non-generic type, pass the Class directly
DataStream<WordCount> result1 = env.fromElements("a b a")
        .flatMap((String value, Collector<WordCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordCount(word, 1));
            }
        })
        .returns(WordCount.class);
        .returns(WordCount.class);
result1.print("R1");

// Example 2: generic type, use a TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(new TypeHint<Tuple2<String, Integer>>() {});
result2.print("R2");

// Example 3: TypeInformation.of + TypeHint
DataStream<Tuple2<String, Integer>> result3 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result3.print("R3");

// Example 4: Types shortcuts
DataStream<Tuple2<String, Integer>> result4 = env.fromElements("a", "b", "a")
        .map(value -> Tuple2.of(value, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
result4.print("R4");

Complete example

3.2.2 ResultTypeQueryable

The second way is to implement the ResultTypeQueryable interface in the function to provide the return type's TypeInformation explicitly. The following is a MapFunction that declares its return type this way:

public static class ResultTypeMapFunction implements MapFunction<String, Stu>, ResultTypeQueryable<Stu> {
    @Override
    public Stu map(String value) throws Exception {
        String[] params = value.split(",");
        String name = params[0];
        int age = Integer.parseInt(params[1]);
        return new Stu(name, age);
    }

    @Override
    public TypeInformation<Stu> getProducedType() {
        return Types.POJO(Stu.class);
    }
}
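With getProducedType() implemented, no returns() hint is needed at the call site. A minimal usage sketch, assuming Stu is a POJO with a (String name, int age) constructor matching the map function above:

// Stu is assumed to be a POJO class like Person above
DataStream<Stu> stuStream = env
        .fromElements("Lucy,18", "Tom,12")
        .map(new ResultTypeMapFunction());
stuStream.print();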

4. Usage Scenarios

4.1 Converting a Table to a DataStream

When converting a Table to a DataStream, the Table does not know the data structure of the target DataStream, so the data type of the resulting DataStream must be specified explicitly:

// Convert to a POJO type
DataStream<WordCount> stream1 = tEnv.toAppendStream(table, Types.POJO(WordCount.class));

// Convert to a Row type
DataStream<Row> stream2 = tEnv.toAppendStream(table, Types.ROW(Types.STRING, Types.LONG));

4.2 Lambda Expressions and Generics

Because Java generics are subject to type erasure, Flink reconstructs as much type information as it can through reflection, for example from function signatures and subclass information, and performs simple type inference when a function's return type depends on its input type. But when not all generic type information can be reconstructed, you must use a type hint to tell the system the input and output parameter types of the function. The example below uses returns to specify the produced type:

env.fromElements(1, 2, 3)
  .map(i -> Tuple2.of(i, i*i))
  // Without this returns(...) hint an InvalidTypesException is thrown
  .returns(Types.TUPLE(Types.INT, Types.INT))
  .print();
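As the exception message quoted earlier suggests, an anonymous class is an alternative to the returns() hint, because the subclass retains the generic parameters that a lambda erases; a small sketch:

// The anonymous MapFunction subclass keeps Tuple2<Integer, Integer> in its
// class metadata, so the type extractor can recover it without a hint.
env.fromElements(1, 2, 3)
   .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
       @Override
       public Tuple2<Integer, Integer> map(Integer i) {
           return Tuple2.of(i, i * i);
       }
   })
   .print();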


Copyright notice: this article was written by [smartsi]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231450358024.html