windows
下载
要想运行flink,前提必须安装 JDK1.8x 官网下载地址:
https://flink.apache.org/zh/downloads.html
。或者百度云下载:链接 提取码:jw3e
解压
直接解压下载下来的文件(以网盘下载的为例),然后重命名为 flink 即可。
启动
直接进入 ${flink-home}/bin
目录,双击 start-cluster.bat
即启动了。
访问
在浏览器中输入:http://localhost:8081/
即可访问。
命令查看运行状态
因为Flink依赖于Java环境,所以可以通过查看 JVM 的进程,来查看所有使用 Java 环境的进程: 输入 jps
即可。
linux
部署任务
方式一,dashboard 上传jar包
8081界面,上传jar包后,点击该任务后,输入相关的运行参数配置,如果看到任务直接失败了,那八成原因是当前任务所需的可用 task Slots
不够,那怎么办呢?很简单,修改配置文件。进入 flink_home 目录,编辑文件 vim conf/flink-conf.yaml
,将 taskmanager.numberOfTaskSlots
的值设置大一点,比如4,然后再重新上传jar包,启动任务即可。
问题:那么
taskmanager.numberOfTaskSlots
设置该遵循什么原则呢?或者说怎么设置比较好呢?
方式二,通过手动输入命令启动任务
./bin/flink run -c com.example.wc.WordCountDataStream -p 3 /opt/apps/project/flink/flink-quickstart-1.0-SNAPSHOT.jar --host 192.168.80.100 --port 7777
-c:表示运行哪一个类;-p:表示并行度(parallelism);一定要手动指明jar包的位置。
查看所有提交的Job
./bin/flink list -a
取消某个Job
./bin/flink cancel 7c0c4254b1e25f0473ce4a9199287744
Flink部署
Standalone模式
Yarn模式
以yarn模式部署Flink任务时,要求Flink是有Hadoop支持的版本,Hadoop环境需要保证版本在2.2以上,并且集群中安装有HDFS服务。
Kubernetes部署
运行demo
flink提供两种任务运行入口:
flink.bat
脚本启动任务程序- 页面上传任务程序包运行
通过脚本启动任务
flink.bat run ../example/batch/WordCount.jar
通过页面上传任务包运行
上传任务程序包
运行任务程序包
任务管理监控
JobManager
①JobManager控制一个应用程序执行的主进程,每个应用程序都会被一个不同的JobManager所控制执行, ②JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraphy),逻辑数据流图,和打包了所有的类库和其它资源的JAR包。 ③JobManager会把JobGraphy转换成一个物理层面的数据流图,这个图被叫做 执行图,包含了所有可以并发执行的任务。 ④JobManager会向 resourceManager 申请执行任务所必要的资源,即 taskManager 上的 slot(插槽)。一旦它获取到了足够的资源,就会将执行图分发到真正运行他们的 TaskManager上。(也就是说真正执行任务的是谁?是taskManager)。而在运行过程中,JobManager会负责所有需要中央协调资源的操作,比如说检查点(checkPoint)的协调。
TaskManager
- Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
- 启动之后,TaskManager会向资源管理器注册它的插槽,收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
- 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
任务提交流程
思考
- 怎样实现并行计算?设置并行任务,分配到slot上执行,不同的任务,不同的slot执行,多线程执行。
- 并行的任务,需要占用多少slot?跟当前所有任务中最大任务的并行度相关。
- 一个流处理程序,到底包含多少个任务?
并行度(Parallelism)
- 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。 一般情况下,一个Stream的并行度,可以认为就是其所有算子中最大的并行度。
FLink 读取、写入 MySQL 数据
下面这个示例实现的功能:
- 生成随机的用户数据(姓名,昵称,密码,邮箱等字段),并保存到 user.txt 文本中
- 采用Flink,读取该txt文本文件作为数据输入流,处理映射,将数据保存到 MySQL 数据表中
- 通过Flink,从 MySQL 中再读取该表,将数据打印输出在控制台。
其中,会用到几个工具类,我都放到下面了。
主任务 FLink2Mysql
package com.example.tool;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author 030
* @date 10:22 2021/11/20
* @description
*/
public class Flink2Mysql {
/*基本属性*/
private final static String username = "root";
private final static String password = "123456";
private final static String driverName = "com.mysql.cj.jdbc.Driver";
private final static String dbURL = "jdbc:mysql://192.168.80.100:3306/flink-test";
// 文件
private final static String filePath = "E:\\github-project\\flink-quickstart\\src\\main\\resources\\user.txt";
@Test
public void test() throws Exception {
// 1. 写入随机数据到 txt 中
saveToTxt();
// 2. 将 txt 数据写入到 MySQL 中
writeToDB();
// 从 MySQL 中读取 user 数据
readDBToFlink();
}
/**
* 从 MySQL 中读取 user 数据到 Flink 中
*/
private static void readDBToFlink() throws Exception {
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 预定义数据库字段
TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.LONG_TYPE_INFO, // 主键id是bigint,对应的Java数据类型是Long
BasicTypeInfo.STRING_TYPE_INFO, // username
BasicTypeInfo.STRING_TYPE_INFO, // nickname
BasicTypeInfo.STRING_TYPE_INFO, // password
BasicTypeInfo.INT_TYPE_INFO, // status
BasicTypeInfo.STRING_TYPE_INFO, // email
BasicTypeInfo.STRING_TYPE_INFO, // userface
BasicTypeInfo.STRING_TYPE_INFO // regTime
};
// 构造 RowTypeInfo
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// 从table创建一个输入数据流
DataSet<Row> readTable = env.createInput(
JDBCInputFormat
.buildJDBCInputFormat()
.setDrivername(driverName)
.setDBUrl(dbURL)
.setUsername(username)
.setPassword(password)
.setQuery("select * from user")
.setRowTypeInfo(rowTypeInfo)
.finish()
);
// 读取时提供的自定义实现MapFunction
DataSet<Tuple8<Long, String, String, String, Integer, String, String, String>> dataSet = readTable.map(new MyMapFunctionReadImpl());
// 输出
System.out.println("\033[32;4m" + "===================================从MySQL中读取数据 start=======================================" + "\033[0m");
dataSet.print();
System.out.println("\033[32;4m" + "=====================================从MySQL中读取数据 end==========================================" + "\033[0m");
}
/**
* 从文件中读取数据并写入数据到 MySQL 中
*/
private static void writeToDB() throws Exception {
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/*
* 这里有一个巨坑:当使用 readCsvFile()读取文件时,csv文件中的数据默认使用逗号作为分隔符,并且分隔符之后不可以有空格
* 由上可知,不管你的 1.txt 文件中使用什么作为分隔符都无所谓,但在分隔符之后必须不能有空格
* */
DataSet<Row> write = env
.readCsvFile(filePath)
.fieldDelimiter(",")
.types(String.class, String.class, String.class, Integer.class, String.class, String.class, String.class) // 打散txt数据流之后有7个属性
.map(new MyMapFunctionWriteImpl()); // 写入时提供的自定义实现MapFunction
// 写入 db
write.output(
JDBCOutputFormat
.buildJDBCOutputFormat()
.setDrivername(driverName)
.setDBUrl(dbURL)
.setUsername(username)
.setPassword(password)
.setQuery("insert into user(username, nickname, password, status, email, userface, regTime) values (?, ?, ?, ?, ?, ?, ?)")
.finish()
);
System.out.println("\033[32;4m" + "------------------------------------------从txt文本写入数据到MySQL中 start---------------------------" + "\033[0m");
// 执行
env.execute();
System.out.println("\033[32;4m" + "---------------------------------------------从txt文本写入数据到MySQL中 end------------------------------" + "\033[0m");
}
/**
* 随机生成汉字,输出到文件 1.txt 中
*/
private static void saveToTxt() throws Exception {
// 默认生成 3 位吧
String username, nickname, password, email, userface, regTime, status;
FileOutputStream fis = new FileOutputStream(new File(filePath));
// 就写 50 行吧
for (int i = 0; i < 50; i++) {
// username 3位名字
username = RandomUserDataUtil.getChineseName() + ",";
// nickname 5位汉字
nickname = ChineseTool.getRandomChinese(5) + ",";
// password 密码长度设置 12 位
password = PWDTool.getRandomPwd(12) + ",";
// status
status = "1" + ",";
// email
email = RandomUserDataUtil.getEmail(9, 12) + ",";
// userface 以随机图片为例
userface = RandomUserDataUtil.getImg() + ",";
// regTime
SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日");
regTime = sdf.format(new Date());
// 写入数据到 txt文件,注意写入顺序必须和SQL中的字段顺序一致,id 是自增长的,不需要生成
// id username nickname password status email userface regTime
fis.write(username.getBytes());
fis.write(nickname.getBytes());
fis.write(password.getBytes());
fis.write(status.getBytes());
fis.write(email.getBytes());
fis.write(userface.getBytes());
fis.write(regTime.getBytes());
// 写完一行后,写入换行符 参考: https://blog.51cto.com/u_11585002/2713634
// String newLine = System.getProperty("line.separator");
// fis.write(newLine.getBytes());
fis.write("\r\n".getBytes());
}
fis.close();
}
/**
* 自定义实现 MapFunction 的业务逻辑
* 注意,函数式接口的用法,第一个参数 是输入类型,第二个参数是输出类型,即返回类型值
*/
private static class MyMapFunctionWriteImpl implements MapFunction<Tuple7<String, String, String, Integer, String, String, String>, Row> {
// 输入 Tuple2<String, Integer>, 输出 Row
// 由于我们这里的 UserData 总共有8个字段,但是id字段是自增长的,因此在写入MySQL时只需要7元组
// int id, String username, nickname, password, email, userface, regTime, int status;
// 对应的7元组分别是:必须和sql创建时的字段顺序一致
// String, String, String, String, Integer, String, String
@Override
public Row map(Tuple7<String, String, String, Integer, String, String, String> value) {
Row row = new Row(7);
row.setField(0, value.f0.getBytes(StandardCharsets.UTF_8));
row.setField(1, value.f1.getBytes(StandardCharsets.UTF_8));
row.setField(2, value.f2.getBytes(StandardCharsets.UTF_8));
row.setField(3, value.f3.byteValue());
row.setField(4, value.f4.getBytes(StandardCharsets.UTF_8));
row.setField(5, value.f5.getBytes(StandardCharsets.UTF_8));
row.setField(6, value.f6.getBytes(StandardCharsets.UTF_8));
return row;
}
}
/**
* 自定义实现 MapFunction 的业务逻辑
*/
private static class MyMapFunctionReadImpl implements MapFunction<Row, Tuple8<Long, String, String, String, Integer, String, String, String>> {
// 在从MySQL中读取数据时,User 表里面总共是8个字段,这时就是一个8元组,需要将主键id字段加上
// 对应的8元组分别是:注意这个对应顺序关系必须和数据库一样(即第一个什么数据类型,第二个什么类型等)
// Long, String, String, String, Integer, String, String db和java数据对应类型参看 Alibaba Java开发手册
@Override
public Tuple8<Long, String, String, String, Integer, String, String, String> map(Row row) {
Long id = Long.valueOf(row.getField(0).toString()); // id
String username = row.getField(1).toString(); // username
String nickname = row.getField(2).toString(); // nickname
String password = row.getField(3).toString(); // password
Integer status = Integer.valueOf(row.getField(4).toString()); // status
String email = row.getField(5).toString(); // email
String userface = row.getField(6).toString(); // userface
String regTime = row.getField(7).toString(); // regTime
// 封装读取的返回结果
return new Tuple8<>(id, username, nickname, password, status, email, userface, regTime);
}
}
}
需要注意的点,都在代码中有标注。
随机密码工具类
package com.example.tool;
import org.junit.Test;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* @author 030
* @date 10:56 2021/11/20
* @description
*/
public class PWDTool {
// 不允许创建实例对象
private PWDTool() {
}
/**
* 生成随机密码工具类
* 密码要求:
* 必须要包含大写字符、小写字符、数字、特殊符号,密码长度在 8-20 位。
*/
private static final String lowStr = "abcdefghijklmnopqrstuvwxyz";
private static final String specialStr = "~!@#$%^&*()_+/-=[]{};:'<>?.";
private static final String numStr = "0123456789";
// 指定长度,随机生成复杂密码
/**
* 提供对外的接口:生成指定长度的复杂密码
*
* @param pwdLength
* @return
*/
public static String getRandomPwd(int pwdLength) {
if (pwdLength > 20 || pwdLength < 8) {
System.out.println("密码长度不满足要求");
return "";
}
// 先满足必须条件:大写字符,小写字符,数字,特殊符号各一个
List<Character> list = new ArrayList<>(pwdLength);
list.add(getLowChar());
list.add(getUpperChar());
list.add(getNumChar());
list.add(getSpecialChar());
// 从4-密码位数,随机生成
for (int i = 4; i < pwdLength; i++) {
SecureRandom random = new SecureRandom();
// 产生0-4之间的随机int值(包含0,不包含4)
int funNum = random.nextInt(4);
list.add(getRandomChar(funNum));
}
// 打乱字符排列顺序
Collections.shuffle(list);
// 将 List 转为 String 字符串输出
StringBuilder stringBuilder = new StringBuilder(list.size());
for (Character c : list) {
stringBuilder.append(c);
}
return stringBuilder.toString();
}
// 随机获取字符串字符
private static char getRandomChar(String str) {
SecureRandom random = new SecureRandom();
return str.charAt(random.nextInt(str.length()));
}
// 随机获取小写字符
private static char getLowChar() {
return getRandomChar(lowStr);
}
// 随机获取大写字符
private static char getUpperChar() {
return Character.toUpperCase(getLowChar());
}
// 随机获取数字字符
private static char getNumChar() {
return getRandomChar(numStr);
}
// 随机获取特殊字符
private static char getSpecialChar() {
return getRandomChar(specialStr);
}
//指定调用字符函数
private static char getRandomChar(int funNum) {
switch (funNum) {
case 0:
return getLowChar();
case 1:
return getUpperChar();
case 2:
return getNumChar();
default:
return getSpecialChar();
}
}
// 测试
@Test
public void pwdTest() {
for (int i = 0; i < 10; i++) {
System.out.println(getRandomPwd(10));
}
}
}
生成随机用户数据工具类
package com.example.tool;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.Test;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* 参考:代码先锋网: https://www.codeleading.com/article/38615255660/
* @author 030
* @date 11:14 2021/11/20
* @description 随机中文姓, 名
*/
public class RandomUserDataUtil {
//百家姓
private final static String[] firstNames = {
"赵", "钱", "孙", "李", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许",
"何", "吕", "施", "张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎",
"鲁", "韦", "昌", "马", "苗", "凤", "花", "方", "俞", "任", "袁", "柳", "酆", "鲍", "史", "唐", "费", "廉", "岑", "薛", "雷", "贺", "倪", "汤", "滕", "殷",
"罗", "毕", "