网站首页 > 技术文章 正文
flink sql方式读取kafka实时流数据
安装Flink
# 下载 Flink
cd /ops/app/flink
wget https://archive.apache.org/dist/flink/flink-1.19.2/flink-1.19.2-bin-scala_2.12.tgz
tar -xzf flink-1.19.2-bin-scala_2.12.tgz
mv flink-1.19.2 /ops/app/flink
# 添加依赖包
cd /ops/app/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.19/flink-connector-kafka-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
wget https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar
环境变量
vim ~/.bashrc
export FLINK_HOME=/ops/app/flink # 或你的实际解压路径
export PATH=$PATH:$FLINK_HOME/bin
source ~/.bashrc
启动
/ops/app/flink/bin/start-cluster.sh
验证是否运行成功, 应看到
StandaloneSessionClusterEntrypoint 进程
jps
验证flink命令
flink --version
浏览器访问
http://localhost:8081/
如果访问不上,需要检查防火墙或者端口是否被占用。还有一个可能的原因,需要修改 conf 目录下的 config.yaml 配置,将 localhost 改为 0.0.0.0。端口冲突可以修改 rest.port 配置,默认是8081
rest:
bind-address: 0.0.0.0
# port: 8081
sql-client
SQL 客户端旨在提供一种简单的方法来编写、调试和提交表程序到 Flink 集群,而无需任何一行 Java 或 Scala 代码。
# 启动
bin/sql-client.sh
简单查询
在客户端输入一个测试sql。
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';
SELECT
name,
COUNT(*) AS cnt
FROM
(VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;
结果如下:
kafka sql connectors
进入sql-clients之后,使用 Flink SQL 创建 kafka 表和 MySQL 表
SET 'sql-client.execution.result-mode' = 'tableau';
-- 创建 Kafka 数据源表(JSON 格式)
CREATE TABLE KafkaSource (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- 从 Kafka 消息元数据提取时间戳
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-consumer-group',
'scan.startup.mode' = 'earliest-offset', -- 从最早位点开始消费
'format' = 'json' -- 指定 JSON 格式
);
-- 创建 MySQL Sink 表(结构参考前文)
CREATE TABLE MysqlSink (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP,
PRIMARY KEY (`user_id`, `item_id`, `ts`) NOT ENFORCED -- 主键需与 MySQL 表一致
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<ip>:3306/<database>?useSSL=false&serverTimezone=UTC',
'table-name' = 'test_flink', -- MySQL 表名
'username' = '<username>', -- 修改为你的数据库用户名
'password' = '<password>', -- 修改为你的数据库密码
'sink.buffer-flush.interval' = '1s' -- 写入批次间隔(可选调优)
);
创建test-topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
启动kafka生产者,发送消息
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
消息内容,json格式的数据最好做个压缩
{"user_id": 1001,"item_id": 2001,"behavior": "click"}
{"user_id": 1002,"item_id": 2002,"behavior": "select"}
sql-clients中,查询KafkaSource
SELECT user_id, item_id, behavior, ts
FROM KafkaSource;
查询结果如下
sql-clients中,将Kafka流数据插入到MysqlSink表
INSERT INTO MysqlSink
SELECT user_id, item_id, behavior, ts
FROM KafkaSource;
猜你喜欢
- 2025-07-28 大模型如何赋能Web渗透测试?(大模型如何赋能web渗透测试工具)
- 2025-07-28 动态切换数据库连接方案(动态切换数据库连接方案是什么)
- 2025-07-28 Apache Tika(apache tika工作流程)
- 2025-07-28 java -jar命令启动SpringBoot应用原理分析
- 2025-07-28 深度探索 Spring Boot3 配置管理:从基础到高级实战
- 2025-07-28 小程序源码交付标准详解:必备内容与注意事项
- 2025-07-28 记录程序第一天挖漏洞的过程(漏洞挖掘过程)
- 2025-07-28 springboot从入门到实战开源的全链路追踪系统介绍及实践!
- 2025-07-28 从原理到落地:MCP在Spring AI中的工程实践
- 2025-07-28 刚搭完HBase集群,Phoenix一启动,HBase就全崩了,是什么原因?
- 最近发表
- 标签列表
-
- axure 注册码 (25)
- mutex_lock (30)
- oracleclient (27)
- nfs (25)
- springbatch (28)
- oracle数据库备份 (25)
- dir (26)
- connectionstring属性尚未初始化 (23)
- output (32)
- panel滚动条 (28)
- centos 5 4 (23)
- sql学习 (33)
- c 数组 (33)
- pascal语言教程 (23)
- ppt 教程 (35)
- java7 (24)
- 自适应网站制作 (32)
- server服务自动停止 (25)
- 超链接去掉下划线 (34)
- 什么是堆栈 (22)
- map entry (25)
- ubuntu装qq (25)
- outputstreamwriter (26)
- fill_parent (22)
- mssqlserver jar (30)