海量编程文章、技术教程与实战案例

网站首页 > 技术文章 正文

flink sql方式读取kafka实时流数据

yimeika 2025-07-28 22:52:47 技术文章 5 ℃

flink sql方式读取kafka实时流数据

安装Flink

# 下载 Flink
cd /ops/app/flink
wget https://archive.apache.org/dist/flink/flink-1.19.2/flink-1.19.2-bin-scala_2.12.tgz
tar -xzf flink-1.19.2-bin-scala_2.12.tgz
mv flink-1.19.2 /ops/app/flink

# 添加依赖包
cd /ops/app/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.19/flink-connector-kafka-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
wget https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar

环境变量

vim ~/.bashrc

export FLINK_HOME=/ops/app/flink  # 或你的实际解压路径
export PATH=$PATH:$FLINK_HOME/bin

source ~/.bashrc

启动

/ops/app/flink/bin/start-cluster.sh

验证是否运行成功, 应看到
StandaloneSessionClusterEntrypoint 进程

jps

验证flink命令

flink --version

浏览器访问

http://localhost:8081/

如果访问不上,需要检查防火墙或者端口是否被占用。还有一个可能的原因,需要修改 conf 目录下的 config.yaml 配置,将 localhost 改为 0.0.0.0。端口冲突可以修改 rest.port 配置,默认是8081

rest:
 bind-address: 0.0.0.0
 # port: 8081

sql-client

SQL 客户端旨在提供一种简单的方法来编写、调试和提交表程序到 Flink 集群,而无需任何一行 Java 或 Scala 代码。

# 启动
bin/sql-client.sh

简单查询

在客户端输入一个测试sql。

SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';

SELECT
  name,
  COUNT(*) AS cnt
FROM
  (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;

结果如下:

kafka sql connectors

进入sql-clients之后,使用 Flink SQL 创建 kafka 表和 MySQL 表

SET 'sql-client.execution.result-mode' = 'tableau';

-- 创建 Kafka 数据源表(JSON 格式)
CREATE TABLE KafkaSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  -- 从 Kafka 消息元数据提取时间戳
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-consumer-group',
  'scan.startup.mode' = 'earliest-offset',        -- 从最早位点开始消费
  'format' = 'json'                               -- 指定 JSON 格式
);

-- 创建 MySQL Sink 表(结构参考前文)
CREATE TABLE MysqlSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP,
  PRIMARY KEY (`user_id`, `item_id`, `ts`) NOT ENFORCED  -- 主键需与 MySQL 表一致
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<ip>:3306/<database>?useSSL=false&serverTimezone=UTC',
  'table-name' = 'test_flink',                -- MySQL 表名
  'username' = '<username>',                -- 修改为你的数据库用户名
  'password' = '<password>',          -- 修改为你的数据库密码
  'sink.buffer-flush.interval' = '1s'           -- 写入批次间隔(可选调优)
);

创建test-topic

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

启动kafka生产者,发送消息

bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic

消息内容,json格式的数据最好做个压缩

{"user_id": 1001,"item_id": 2001,"behavior": "click"}
{"user_id": 1002,"item_id": 2002,"behavior": "select"}

sql-clients中,查询KafkaSource

SELECT user_id, item_id, behavior, ts
FROM KafkaSource;

查询结果如下

sql-clients中,将Kafka流数据插入到MysqlSink表

INSERT INTO MysqlSink
SELECT user_id, item_id, behavior, ts
FROM KafkaSource;
最近发表
标签列表