Flink SQL learn
2021/4/27 19:25:37
本文主要介绍 Flink SQL 的入门学习过程,对大家解决相关编程问题具有一定的参考价值,需要的程序员可以跟着一起学习。
1. 搭建测试环境
1.1 下载并启动docker-compose容器
# Containers defined in this Docker Compose file:
#   DataGen:       data generator. Once the container is up it starts producing
#                  user-behavior records and sends them to the Kafka cluster.
#                  By default it emits 1000 records per second for about 3 hours.
#                  The rate can be tuned through the `speedup` parameter of
#                  `datagen` in docker-compose.yml (takes effect only after
#                  docker compose is restarted).
#   MySQL:         MySQL 5.7 with a pre-created category table (`category`),
#                  pre-filled with the sub-category -> top-level-category
#                  mapping; used later as a dimension table.
#   Kafka:         mainly used as the data source; DataGen feeds its data into
#                  this container automatically.
#   Zookeeper:     dependency of the Kafka container.
#   Elasticsearch: mainly stores the data produced by Flink SQL.
#   Kibana:        visualizes the data held in Elasticsearch.

# Fetch the demo's compose file into a dedicated working directory.
DEMO_DIR=/data/flink/flink-demo
mkdir -p "$DEMO_DIR"
cd "$DEMO_DIR"
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

# Lifecycle commands (listed in the tutorial's order; run individually as needed).
# Start:
docker-compose up -d
# Stop and remove:
docker-compose down
# Restart:
docker-compose restart

# Look at the Kafka test data (first 10 messages of the user_behavior topic).
docker-compose exec kafka bash -c \
  'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
1.2 下载安装 Flink 本地集群
# Install a local Flink 1.10.3 cluster under /data/flink, add the SQL connector
# jars, raise the task-slot count, then (re)start the cluster and the SQL CLI.

# Release index: https://downloads.apache.org/flink/
# NOTE(review): downloads.apache.org only carries current releases; older
# versions such as 1.10.3 are expected under archive.apache.org -- confirm the
# URL resolves before running.
mkdir -p /data/flink
cd /data/flink
wget "https://archive.apache.org/dist/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz"

# Unpack straight from the .tgz. The destination must be given with -C; a bare
# trailing path is interpreted by tar as a member name to extract.
# Assumes the archive's top-level directory is flink-1.10.3/ -- TODO confirm.
tar -xzvf flink-1.10.3-bin-scala_2.11.tgz -C /data/flink

# Version-independent path used by all later commands. The link target is the
# directory that was just unpacked.
ln -sfn /data/flink/flink-1.10.3 /data/flink/flink

# Download the Flink SQL connector/format jars into Flink's lib directory.
# Artifact index: https://repo1.maven.org/maven2/org/apache/flink/
cd /data/flink/flink/lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.3/flink-json-1.10.3.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.3/flink-sql-connector-kafka_2.11-1.10.3.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.3/flink-sql-connector-elasticsearch6_2.11-1.10.3.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.3/flink-jdbc_2.11-1.10.3.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

# Change the concurrency setting: taskmanager.numberOfTaskSlots = 10.
# Non-interactive replacement for editing the file by hand in vi.
# Assumes the key is present and uncommented in flink-conf.yaml; if the sed
# matches nothing, add the line manually.
sed -i 's/^taskmanager\.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: 10/' \
    /data/flink/flink/conf/flink-conf.yaml

# The bin/ scripts are referenced relative to the Flink home, so switch there
# first (the previous step left the working directory in lib/).
cd /data/flink/flink

# (Re)start Flink.
bin/stop-cluster.sh
bin/start-cluster.sh

# Start the SQL CLI.
bin/sql-client.sh embedded
2. 创建实时任务
-- Real-time job definitions, part 1: Kafka source table, hourly purchase-count
-- sink and the cumulative-UV sink table.

-- Work inside a dedicated database. It is created and selected BEFORE any
-- table is defined; switching databases after creating user_behavior would
-- leave the table behind in the previous database and make the statements
-- below unable to resolve it.
SHOW DATABASES;
CREATE DATABASE IF NOT EXISTS demo;
USE demo;

-- Kafka source table
CREATE TABLE user_behavior (
    user_id     BIGINT,
    item_id     BIGINT,
    category_id BIGINT,
    behavior    STRING,
    ts          TIMESTAMP(3),
    proctime AS PROCTIME(),                       -- computed column that yields a processing-time attribute
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- watermark on ts; ts becomes the event-time attribute
) WITH (
    'connector.type' = 'kafka',                                  -- use the kafka connector
    'connector.version' = 'universal',                           -- kafka version; universal supports 0.11 and above
    'connector.topic' = 'user_behavior',                         -- kafka topic
    'connector.startup-mode' = 'earliest-offset',                -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
    'format.type' = 'json'                                       -- source data format is json
);

-- Verify the definition
SHOW TABLES;
DESCRIBE user_behavior;
SELECT * FROM user_behavior LIMIT 10;  -- ad-hoc inspection only

-- Result display mode of the SQL CLI. These two are alternatives: issue the
-- one you want (when both are run, the later SET is the one in effect).
SET execution.result-mode=changelog;
SET execution.result-mode=table;

-- Elasticsearch result table: purchase count per hour
CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt     BIGINT
) WITH (
    'connector.type' = 'elasticsearch',            -- use the elasticsearch connector
    'connector.version' = '6',                     -- elasticsearch version; '6' supports es 6+ and 7+
    -- NOTE(review): the original pointed at a hard-coded private IP
    -- (http://10.8.60.127:9200); aligned with the localhost endpoints used by
    -- every other connector in this script. Adjust if Elasticsearch runs elsewhere.
    'connector.hosts' = 'http://localhost:9200',   -- elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',        -- elasticsearch index name
    'connector.document-type' = 'user_behavior',   -- elasticsearch document type
    'connector.bulk-flush.max-actions' = '1',      -- flush after every single record
    'format.type' = 'json',                        -- output data format is json
    'update-mode' = 'append'
);

-- Purchase count per hour (1-hour tumbling window on event time)
INSERT INTO buy_cnt_per_hour
SELECT
    HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day,
    COUNT(*) AS buy_cnt
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

-- Elasticsearch result table: cumulative unique visitors of the day, per 10 minutes
CREATE TABLE cumulative_uv (
    time_str STRING,
    uv       BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);
-- Real-time job definitions, part 2: cumulative-UV pipeline and top-category
-- ranking enriched through a MySQL dimension table.

-- Pre-processing view: for every incoming row, the running maximum of the
-- 10-minute time label and the running distinct-user count, both evaluated
-- over all rows seen so far in processing-time order.
CREATE VIEW uv_per_10min AS
SELECT
    MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0') OVER w AS time_str,
    COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (
    ORDER BY proctime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);

-- Aggregation job: keep the largest uv observed for each time label.
INSERT INTO cumulative_uv
SELECT
    time_str,
    MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

-- MySQL dimension table
CREATE TABLE category_dim (
    sub_category_id    BIGINT,  -- sub-category
    parent_category_id BIGINT   -- top-level category
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

-- Elasticsearch table for the top-level category ranking
CREATE TABLE top_category (
    category_name STRING,  -- category name
    buy_cnt       BIGINT   -- sales count
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

-- Enrichment view: attach a readable top-level category name to each behavior
-- row via a processing-time lookup join against the dimension table. Rows
-- without a matching category fall through to the ELSE branch.
CREATE VIEW rich_user_behavior AS
SELECT
    ub.user_id,
    ub.item_id,
    ub.behavior,
    CASE cat.parent_category_id
        WHEN 1 THEN '服饰鞋包'
        WHEN 2 THEN '家装家饰'
        WHEN 3 THEN '家电'
        WHEN 4 THEN '美妆'
        WHEN 5 THEN '母婴'
        WHEN 6 THEN '3c数码'
        WHEN 7 THEN '运动户外'
        WHEN 8 THEN '食品'
        ELSE '其他'
    END AS category_name
FROM user_behavior AS ub
LEFT JOIN category_dim FOR SYSTEM_TIME AS OF ub.proctime AS cat
    ON ub.category_id = cat.sub_category_id;

-- Purchase count per top-level category
INSERT INTO top_category
SELECT
    category_name,
    COUNT(*) AS buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
Kibana 可视化地址:http://10.8.60.127:5601
REF https://iteblog.blog.csdn.net/article/details/111465792 https://blog.csdn.net/weixin_42066446/article/details/113243977 https://blog.csdn.net/weixin_43039757/article/details/112850707 https://blog.csdn.net/wshl1234567/article/details/104512644/ https://mp.weixin.qq.com/s/pXJfxp0wxdlafFyg4tgiGg
这篇关于Flink SQL learn的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?