Things About Flink (Part 2): Setting Up a Flink Development Environment
Published: 2019-04-26


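IDEA development environment

1. Install the Java environment

See the previous article.

2. Install Maven

See the referenced blog post.

3. Configure IDEA

See the referenced blog post.

4. Configure the pom file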

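<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flink</groupId>
    <artifactId>flink-dev</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>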

5. Code example

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: qincf
 * Date: 2018/11/02
 * Desc: Uses Flink to count words in real time over a time window and prints the result.
 *       First run nc -l 9000 on the target host 1.1.1.1.
 */
public class StreamingWindowWordCount {
    public static void main(String[] args) throws Exception {
        // Port of the socket to read from
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument specified, using default value 9000");
            port = 9000;
        }

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");

        // Compute the counts
        DataStream<WordWithCount> windowCount = text
                // Flatten each line into <word, count> records
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                // Group records that share the same word
                .keyBy("word")
                // Window size (2 s) and slide interval (1 s)
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        // Get the execution plan as JSON for visualization
        System.out.println(env.getExecutionPlan());

        // Print the results to the console with a parallelism of 1
        windowCount.print().setParallelism(1);

        // Note: Flink is lazily evaluated, so execute() must be called for the code above to run
        env.execute("streaming word count");
    }

    /**
     * Holds a word and the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
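To make the flatMap, keyBy and sum steps concrete, here is a minimal plain-Java sketch of what they compute for the lines that fall into a single window. It does not use Flink at all; the class name WordCountSketch and the method countWords are hypothetical names chosen for illustration, and only the split("\\s") call is taken from the program above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of the Flink job.
public class WordCountSketch {

    /** Splits each line on whitespace and sums a count of 1 per word. */
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Same splitting rule as the flatMap in the job
            for (String word : line.split("\\s")) {
                // Equivalent of emitting (word, 1) and summing per key
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {a=2, b=1, c=1, d=2}
        System.out.println(countWords(Arrays.asList("a a b", "c d d")));
    }
}
```

One detail worth knowing: split("\\s") splits on every single whitespace character, so two consecutive spaces produce an empty string that is counted as a word. Using "\\s+" instead would avoid that, if it matters for your input.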

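6. Test steps

First, on host 1.1.1.1, use the nc command to simulate sending data:

nc -l 9000

Then run the StreamingWindowWordCount program in IDEA.

Type some characters on the host:

[root@data01]# nc -l 9000
a
a
b
c
d
d

Once the program is running, IDEA prints the results: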

E:\tools\Java\bin\java.exe "-javaagent:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\lib\idea_rt.jar=61830:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath E:\tools\Java\jre\lib\charsets.jar;...;E:\tools\Maven-Repository\com\alibaba\fastjson\1.2.22\fastjson-1.2.22.jar StreamingWindowWordCount
No port argument specified, using default value 9000
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]}]}
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}

As you can see, the output contains the word count results.
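The same word shows up several times in that output because timeWindow(Time.seconds(2), Time.seconds(1)) defines sliding windows: with a 2-second size and a 1-second slide, every element belongs to two overlapping windows, and each window emits its own count. The following plain-Java sketch shows how an element's timestamp maps to window start times. It is only an illustration under the assumption of non-negative timestamps and no window offset; SlidingWindowSketch and windowStarts are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not Flink's implementation.
public class SlidingWindowSketch {

    /**
     * Returns the start time (ms) of every sliding window that contains
     * the given timestamp, newest window first.
     * Assumes timestampMs >= 0 and a window offset of 0.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that has already begun
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards while the window [start, start + size) still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 1500 ms, size 2000 ms, slide 1000 ms
        // prints [1000, 0], i.e. windows [1000, 3000) and [0, 2000)
        System.out.println(windowStarts(1500L, 2000L, 1000L));
    }
}
```

Since the job uses processing time, the actual window an input lands in depends on when it reaches the operator, so the exact order and counts in the console output will differ from run to run.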

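Look closely and you will also see a JSON string in the output. What is it?
The code includes a statement that prints the execution plan:

// Get the execution plan as JSON for visualization
System.out.println(env.getExecutionPlan());

Flink can visualize the execution plan, similar to Spark's DAG graph. Paste the JSON into the plan visualizer to see the execution plan graph: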

[Figure: execution plan graph]

Reposted from: http://lnucf.baihongyu.com/
