FLink18--全窗口聚合方式2 ProcessWindowApp
2022/3/27 23:26:21
本文主要是介绍FLink18--全窗口聚合方式2 ProcessWindowApp,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、依赖
二、代码
package net.xdclass.class11; import java.util.List; import java.util.stream.Collectors; import org.apache.commons.collections.IteratorUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; /** * 推荐本方法 * 全量聚合方法2 process(new ProcessWindowFunction(){}) * !!!WindowFunction后面可能废弃,用processWindowFunction更好,有打开关闭功能 * 全窗口函数,自定义窗口计算,适用于复杂场景 * @desc 窗口计算,全窗口函数,可以拿到整个窗口的数据做计算 * @menu */ public class FLink18ProcessWindowApp { public static void main(String[] args) throws Exception{ //WebUi方式运行 // final StreamExecutionEnvironment env = // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置运行模式为流批一体 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //并行度 env.setParallelism(1); //设置为自定义source // DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDs = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder videoOrder) throws Exception { return videoOrder.getTitle(); } }); //全窗口函数,可以拿到整个窗口的数据做计算 SingleOutputStreamOperator<VideoOrder> sumAllWindowDs = keyByDs .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<VideoOrder> iterable, Collector<VideoOrder> output) throws Exception { List<VideoOrder> list = IteratorUtils.toList(iterable.iterator()); if (list.size() <= 0) { return; } int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue(); //新建一个返回结果对象,数据聚合后发送出去 VideoOrder videoOrder = new VideoOrder(); videoOrder.setMoney(total); videoOrder.setTitle(list.get(0).getTitle()); videoOrder.setCreateTime(list.get(0).getCreateTime()); //获取窗口开始结束时间,还可以获取很多信息 System.out.println("窗口开始时间"+context.window().getStart()+"窗口结束时间"+context.window().getEnd()); output.collect(videoOrder); } }); sumAllWindowDs.print(); //DataStream需要调用execute,可以取个名称 env.execute("Sailing Window job"); } }
这篇关于FLink18--全窗口聚合方式2 ProcessWindowApp的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15PingCAP 黄东旭参与 CCF 秀湖会议,共探开源教育未来
- 2024-05-13PingCAP 戴涛:构建面向未来的金融核心系统
- 2024-05-09flutter3.x_macos桌面os实战
- 2024-05-09Rust中的并发性:Sync 和 Send Traits
- 2024-05-08使用Ollama和OpenWebUI在CPU上玩转Meta Llama3-8B
- 2024-05-08完工标准(DoD)与验收条件(AC)究竟有什么不同?
- 2024-05-084万 star 的 NocoDB 在 sealos 上一键起,轻松把数据库编程智能表格
- 2024-05-08Mac 版Stable Diffusion WebUI的安装
- 2024-05-08解锁CodeGeeX智能问答中3项独有的隐藏技能
- 2024-05-08RAG算法优化+新增代码仓库支持,CodeGeeX的@repo功能效果提升