Flink OutputTag 为什么需要加 "{}"

2022/6/7 23:21:36

本文主要是介绍Flink OutputTag 为什么需要加 "{}",对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Flink OutputTag 为什么需要加 "{}"

结论

先给出结论,OutputTag 可以加 也可以不加

// case 1   不加 {} ,运行时会报错
OutputTag<String> stringOutputTg = new OutputTag<String>("a");

// case 2   加 {} 就是 一个继承了 OutputTag 的 匿名类 
OutputTag<String> stringOutputTg = new OutputTag<String>("a"){};

// case 3   不加 也是没有问题
OutputTag<String> stringOutputTg = new OutputTag<String>("a", TypeInformation.of(String.class));

为什么case 1报错

其实 case 2 一个是继承了OutputTag 的匿名类的对象 ,case 1 是直接 OutputTag 的对象。问题就出在构造方法

    private final String id;

    private final TypeInformation<T> typeInfo;
    
    
    public OutputTag(String id) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;

        try {
            // 问题关键
            this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
        } catch (InvalidTypesException e) {
            throw new InvalidTypesException(
                    "Could not determine TypeInformation for the OutputTag type. "
                            + "The most common reason is forgetting to make the OutputTag an anonymous inner class. "
                            + "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.",
                    e);
        }
    }

case 1 的异常

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Could not determine TypeInformation for the OutputTag type. The most common reason is forgetting to make the OutputTag an anonymous inner class. It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.
	at org.apache.flink.util.OutputTag.<init>(OutputTag.java:68)
	at com.chouc.flink.OutputTg.main(OutputTg.java:52)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.util.OutputTag could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1371)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:787)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:774)
	at org.apache.flink.util.OutputTag.<init>(OutputTag.java:66)
	... 1 more

导致错误的代码

private static Type getParameterType(
        Class<?> baseClass, List<Type> typeHierarchy, Class<?> clazz, int pos) {
    if (typeHierarchy != null) {
        typeHierarchy.add(clazz);
    }
    Type[] interfaceTypes = clazz.getGenericInterfaces();

    // search in interfaces for base class
    for (Type t : interfaceTypes) {
        Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
        if (parameter != null) {
            return parameter;
        }
    }

	// 关键代码
    // search in superclass for base class
    Type t = clazz.getGenericSuperclass();
    Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
    if (parameter != null) {
        return parameter;
    }

    throw new InvalidTypesException(
            "The types of the interface "
                    + baseClass.getName()
                    + " could not be inferred. "
                    + "Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
}

原因

typeInfo 是一个带泛型的类,在构建的时候必须创建好对象,而这个泛型就是当前class 的泛型。如果在之传入id 不传入 typeInfo 的时候,就需要获取当前类的泛型,但是当前类是无法读到当前类的泛型,只能读 super 类的泛型。通过方法:java.lang.Class#getGenericSuperclass

例子

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class OutputTg <T> {
    String a;
    T b;

    public OutputTg(String a) throws InstantiationException, IllegalAccessException {
        this.a = a;
        System.out.println(this);

        Type genericSuperclass = this.getClass().getGenericSuperclass();
        System.out.println(genericSuperclass);

        ParameterizedType baseClassChild = (ParameterizedType) genericSuperclass;
        System.out.println(baseClassChild);

        Type actualTypeArgument = baseClassChild.getActualTypeArguments()[0];
        System.out.println(actualTypeArgument);

        Class<? extends Type> bClass = (Class<? extends Type>) actualTypeArgument;
        System.out.println(bClass);

        this.b = (T) bClass.newInstance();

    }


    public static void main(String[] args) {


        OutputTg<String> stringOutputTg = null;
        try {
            // error
//            stringOutputTg = new OutputTg<String>("a");

            // 正常
            stringOutputTg = new OutputTg<String>("a") {};
        } catch (InstantiationException e) {
            throw new RuntimeException(e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        System.out.println(stringOutputTg.a);
        System.out.println(stringOutputTg.b);


    }

}

为什么case3 可以

    private final String id;

    private final TypeInformation<T> typeInfo;
    
    public OutputTag(String id, TypeInformation<T> typeInfo) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;
        this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
    }

看了上面的原因,case 2 和 case 3 就很清晰,case 2 是继承了 OutputTg ,可以直接通过 java.lang.Class#**getGenericSuperclass 读到泛型类型。

而case 3 不用获取,用户直接传入更加方便简洁。



这篇关于Flink OutputTag 为什么需要加 "{}"的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程