0%

原文链接

什么是状态

当在一个数据流中,很多算子看起来都是每次只独立处理一个事件的时候(比如event parser),有一些算子能够通过其他事件记住一些信息(比如窗口操作相关的算子)
,这些算子就是有状态的
下面是一些例子

  • 当一个应用检索确定的事件模式时,Flink状态会把所有已发生的事件存储到一个队列中
  • 当按 天/时/分聚合事件时,Flink的状态会保存那些等待聚合的事件
  • 当使用流式数据训练机器学习模型时,Flink状态会存储当前模型版本的参数
  • 当需要管理历史数据的时候,Flink状态能提供高效的历史的数据访问

Flink 需要感知状态,这样才能使用CheckPoint 和SavePoint 进行容错处理

基本命令

1
2
3
4
hexo g: 重新生成
hexo clean: 清理信息
hexo d: 部署到git,千万不要用push
hexo new xxx:创建一篇新的文章

前言

运行Flink需要环境,为了快速开始编码学习,我们直接使用Flink本地部署+IDE搭建开发环境

  1. 搭建Flink环境
  2. 开发&运行Flink程序

Idea 直接运行式

  1. 下载Flink
  2. 创建项目
    1
    2
    3
    4
    5
    6
    7
    8
    9
    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.12.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
  3. 用Idea 打开创建好的项目
  4. 在Idea 打开配置File-Project Structure-Libraries-±–java,添加第一步解压出来的Flink包里的lib 和 opt 包
  5. 随便写点啥,运行吧,然后就能在控制台打印出来了
  6. 特别注意
     1. 不要引入Spring Boot,不要仗着Idea是高级版就无脑创建Spring Boot项目
     2. 没有主包的,create 一个,然后mark一下
    

提交jar式

搭建运行环境

搭建本地Flink 集群

  1. 创建项目

    1
    2
    3
    4
    5
    6
    7
    8
    9
    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.12.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
  2. 使用Idea 打开

  3. 配置模块下的pom 打包插件如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
    <archive>
    <manifest>
    <!--设置程序的入口类-->
    <mainClass>aaa.bbb.ccc</mainClass>
    <addClasspath>true</addClasspath>
    <classpathPrefix>lib/</classpathPrefix>
    </manifest>
    </archive>
    <classesDirectory>
    </classesDirectory>
    </configuration>
    </plugin>
    </plugins>
    </build>
  4. 提交到Flink

    1
    /Users/lonie/open_source/flink-1.12.1/bin/flink run  target/demo-0.0.1-SNAPSHOT.jar
  5. 查看TaskManager 的日志即可