flink 流的合并
2022/6/17 23:28:30
本文主要是介绍flink 流的合并,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
flink 流的合并操作
- union
union只能合并类型相同的数据,合并的结果仍然是DataStream,结果操作与未合并之前一致。
public static void main(String[] args) throws Exception { //流的合并操作 union 只能合并类型相同的流 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> ds1 = env.fromElements("night", "Jim", "Mary"); DataStreamSource<String> ds2 = env.fromElements("四川", "北京", "上海"); DataStream<String> union = ds1.union(ds2); union.print(); env.execute(); } 11> 北京 9> Mary 12> 上海 8> Jim 7> night 10> 四川
- connect
connect可以连接不同类型的流,后续的处理api也有类似的不同,下列是一个tuple2与Long类型的流合并的结果,做了一个keyBy之后,在map的操作,map的实现接口是CoMapFunction
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<String, String>> ds1 = env.fromElements(Tuple2.of("四川", "成都"), Tuple2.of("北京", "朝阳"), Tuple2.of("广东", "深圳"),Tuple2.of("四川", "成都")); DataStreamSource<Long> ds2 = env.fromElements(1L, 2L, 3L,2L); ConnectedStreams<Tuple2<String, String>, Long> connect = ds1.connect(ds2); connect.keyBy(data -> data.f0,data -> data).map(new CoMapFunction<Tuple2<String, String>, Long, String>() { // @Override public String map1(Tuple2<String, String> stringStringTuple2) throws Exception { return "this is tuple" + stringStringTuple2; } @Override public String map2(Long aLong) throws Exception { return "this is number" + aLong; } }).print(); env.execute(); 6> this is tuple(广东,深圳) 7> this is tuple(北京,朝阳) 15> this is number3 16> this is tuple(四川,成都) 11> this is number1 16> this is number2 16> this is tuple(四川,成都) 16> this is number2
这篇关于flink 流的合并的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?