package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.map

import org.apache.flink.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    // map: transforms each element one-to-one
    val dataSet2 = dataSet.map(_.toUpperCase + " concatenated")

    dataSet2.print()
  }
}
C A B D A C concatenated
D C A B C D concatenated
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmap

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    // flatMap: one element in, zero or more elements out
    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))

    dataSet2.print()
  }
}
C
A
B
D
A
C
D
C
A
B
C
D
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filter

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * filter: keeps only the elements for which the predicate returns true
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)

    dataSet2.print()
  }
}
C
A
B
D
A
C
D
C
A
B
C
D
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * reduce: folds all elements pairwise into a single value, here a global sum
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(3, 5, 8, 9)

    // 3 + 5 + 8 + 9
    val dataSet2 = dataSet.reduce((a, b) => {
      println(s"$a + $b = ${a + b}")
      a + b
    })

    dataSet2.print()
  }
}
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy + reduce: groups by key, then sums the elements within each group
 */
object ReduceGroupRun2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2))

    dataSet2.print()
  }
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFields

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy on a case-class field name, then reduce: sums the count per word
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")

    /**
     * After the map each word becomes WordCount(word, 1):
     * WordCount(a,1), WordCount(b,1), WordCount(c,1), WordCount(a,1), WordCount(c,1),
     * WordCount(d,1), WordCount(f,1), WordCount(g,1), WordCount(f,1)
     */
    val dataSet2 = dataSet.map(WordCount(_, 1))
      .groupBy("word")
      .reduce((x, y) => WordCount(x.word, x.count + y.count))

    dataSet2.print()
  }

  case class WordCount(word: String, count: Int)
}
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelector

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy with a key-selector function, then reduce: sums the count per word
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")

    /**
     * After the map each word becomes a (word, 1) tuple:
     * (a,1), (b,1), (c,1), (a,1), (c,1), (d,1), (f,1), (g,1), (f,1)
     */
    val dataSet2 = dataSet.map((_, 1))
      .groupBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    dataSet2.print()
  }
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
 * reduceGroup: all elements with the same key are handed to the function
 * together, as a single iterator
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataSet = env.fromElements("a","a","c","b","a")

    /**
     * Intermediate data after the map:
     * (a,1), (a,1), (c,1), (b,1), (a,1)
     */
    val result = dataSet.map((_, 1)).groupBy(0).reduceGroup(
      (in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      }
    )

    result.print()
  }
}
(a,3)
(b,1)
(c,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
 * combineGroup: like reduceGroup, but runs as a combiner that only
 * pre-aggregates within each partition; it does not guarantee a single
 * result per key (here parallelism is 1, so the result looks the same)
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataSet = env.fromElements("a","a","c","b","a")

    /**
     * Intermediate data after the map:
     * (a,1), (a,1), (c,1), (b,1), (a,1)
     */
    val result = dataSet.map((_, 1)).groupBy(0).combineGroup(
      (in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      }
    )

    result.print()
  }
}
(a,3)
(b,1)
(c,1)
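With the default parallelism (greater than 1), combineGroup may emit several partial results per key, so a final reduction is still needed. A minimal sketch of that two-step pattern, assuming a multi-partition environment (object name and data are illustrative, not from the original post):

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object CombineThenReduceSketch {

  def main(args: Array[String]): Unit = {

    // default parallelism: combineGroup only sees the elements of one partition
    val env = ExecutionEnvironment.getExecutionEnvironment

    val words = env.fromElements("a", "a", "c", "b", "a")

    // partial counts per partition (possibly several tuples per key)
    val partial = words.map((_, 1)).groupBy(0).combineGroup(
      (in, out: Collector[(String, Int)]) => {
        val elements = in.toList
        if (elements.nonEmpty)
          out.collect((elements.head._1, elements.map(_._2).sum))
      }
    )

    // a final groupBy + reduce merges the partial counts into one result per key
    val counts = partial.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2))

    counts.print()
  }
}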
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * sum without grouping: aggregates field 1 across the whole DataSet;
 * the non-aggregated field keeps an arbitrary value (here that of the last element)
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    // 3 + 1 + 5 + 1 + 1 + 1 + 1 + 1 + 1 = 15
    val dataSet2 = dataSet.sum(1)

    dataSet2.print()
  }
}
(f,15)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * max without grouping: the maximum of field 1 across the whole DataSet;
 * the non-aggregated field keeps an arbitrary value
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.max(1)

    dataSet2.print()
  }
}
(f,5)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * min without grouping: the minimum of field 1 across the whole DataSet;
 * the non-aggregated field keeps an arbitrary value
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.min(1)

    dataSet2.print()
  }
}
(f,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy + sum: groups by field 0, then sums field 1 within each group
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.groupBy(0).sum(1)

    dataSet2.print()
  }
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
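The grouped sum/max/min shortcuts are backed by the more general aggregate operator, which can chain several aggregations in one pass. A minimal sketch, assuming made-up three-field tuples for illustration:

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object AggregateSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a", 1, 10), ("a", 2, 20), ("b", 3, 30))

    // per key: sum of field 1 and, in the same pass, max of field 2
    val dataSet2 = dataSet.groupBy(0)
      .aggregate(Aggregations.SUM, 1)
      .and(Aggregations.MAX, 2)

    dataSet2.print()
  }
}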
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy + max: groups by field 0, then takes the maximum of field 1
 * within each group
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.groupBy(0).max(1)

    dataSet2.print()
  }
}
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * groupBy + min: groups by field 0, then takes the minimum of field 1
 * within each group
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.groupBy(0).min(1)

    dataSet2.print()
  }
}
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * distinct(1): deduplicates on field 1, keeping the first tuple seen
 * for each distinct value of that field
 */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    val dataSet2 = dataSet.distinct(1)

    dataSet2.print()
  }
}
(a,3)
(b,1)
(c,5)
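distinct can also deduplicate on the key field or on the complete tuple. A minimal sketch of the two other variants, reusing the same input data (object name is illustrative):

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object DistinctVariantsSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    // one tuple per distinct value of field 0 (the key)
    dataSet.distinct(0).print()

    // deduplication over the complete tuple
    dataSet.distinct().print()
  }
}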
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.join

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
    val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))

    // inner join on field 0 of both inputs; each matching pair becomes a tuple
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)

    dataSet3.print()
  }
}
((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunction

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
    val dataSet2 = env.fromElements(("g",1),("f",1))

    // inner join with a join function that merges each matching pair
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0) {
      (x, y) => (x._1, x._2 + y._2)
    }

    dataSet3.print()
  }
}
(f,3)
(g,6)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
    val dataSet2 = env.fromElements(("g",1),("f",1))

    // left outer join: every element of dataSet is kept; y is null
    // when the key has no match in dataSet2
    val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (y != null) {
          count = y._2
        }
        (x._1, x._2 + count)
      }
    }

    dataSet3.print()
  }
}
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
    val dataSet2 = env.fromElements(("g",1),("f",1))

    // right outer join: every element of dataSet2 is kept; x is null
    // when the key has no match in dataSet, so the key must come from y
    val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (x != null) {
          count = x._2
        }
        (y._1, y._2 + count)
      }
    }

    dataSet3.print()
  }
}
(f,3)
(g,6)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
    val dataSet2 = env.fromElements(("g",1),("f",1))

    // full outer join: elements of both inputs are kept; either side may
    // be null, so the key is taken from whichever side is present
    val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var countY = 0
        if (y != null) {
          countY = y._2
        }

        var countX = 0
        if (x != null) {
          countX = x._2
        }

        val key = if (x != null) x._1 else y._1
        (key, countX + countY)
      }
    }

    dataSet3.print()
  }
}
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",1),("g",1),("f",1))
    val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))

    // union: concatenates the two DataSets; duplicates are kept
    val dataSet3 = dataSet.union(dataSet2)

    dataSet3.print()
  }
}
(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)
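As the repeated (f,1) and (g,1) entries show, union keeps duplicates; chaining distinct() after it gives set-style union. A minimal sketch (object name is illustrative):

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object UnionDistinctSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a",1),("g",1),("f",1))
    val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))

    // union keeps duplicates; distinct() afterwards drops the repeated tuples
    val dataSet3 = dataSet.union(dataSet2).distinct()

    dataSet3.print()
  }
}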
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    // first(3): takes the first three elements of the DataSet
    val dataSet3 = dataSet.first(3)

    dataSet3.print()
  }
}
(a,3)
(b,1)
(c,5)
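first can also be applied per group; combined with sortGroup it yields a top-N per key. A minimal sketch, where the sort field and order are illustrative assumptions:

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object FirstPerGroupSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))

    // per key: sort the group by field 1 descending, then keep the largest element
    val topPerKey = dataSet.groupBy(0)
      .sortGroup(1, Order.DESCENDING)
      .first(1)

    topPerKey.print()
  }
}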
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",1),("g",1),("a",1))
    val dataSet2 = env.fromElements(("a",1),("f",1))

    // coGroup: for every key, both groups are handed to the function together;
    // a key missing on one side arrives as an empty iterable
    val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
      new CoGroupFunction[(String, Int), (String, Int), (String, Int)] {
        override def coGroup(first: lang.Iterable[(String, Int)],
                             second: lang.Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit = {
          println("============== begin")
          println("first")
          println(first)
          val iteratorFirst = first.iterator()
          while (iteratorFirst.hasNext()) {
            println(iteratorFirst.next())
          }

          println("second")
          println(second)
          val iteratorSecond = second.iterator()
          while (iteratorSecond.hasNext()) {
            println(iteratorSecond.next())
          }
          println("============== end")
        }
      }
    }

    dataSet3.print()
  }
}
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
============== end
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
============== end
============== begin
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
============== end
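Instead of implementing the Java CoGroupFunction interface, the Scala API also accepts a plain closure over the two iterators. A minimal sketch that counts both sides per key (object name and output shape are assumptions):

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object CoGroupClosureSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a",1),("g",1),("a",1))
    val dataSet2 = env.fromElements(("a",1),("f",1))

    // per key: emit (key, countLeft, countRight); one side may be empty,
    // but coGroup is only called for keys present on at least one side
    val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
      (first, second, out: Collector[(String, Int, Int)]) =>
        val left = first.toList
        val right = second.toList
        val key = (left ++ right).head._1
        out.collect((key, left.size, right.size))
    }

    dataSet3.print()
  }
}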
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",1),("g",1),("f",1))
    val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))

    // cross: the Cartesian product of the two DataSets (3 x 4 = 12 pairs)
    val dataSet3 = dataSet.cross(dataSet2)

    dataSet3.print()
  }
}
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
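Because cross materializes the full Cartesian product, the DataSet API also offers size-hinted variants. A minimal sketch; which side counts as "tiny" or "huge" here is an assumption for illustration:

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object CrossHintSketch {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(("a",1),("g",1),("f",1))
    val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))

    // hint that the right-hand side is small enough to broadcast
    val dataSet3 = dataSet.crossWithTiny(dataSet2)

    // or hint that the right-hand side is the huge one
    val dataSet4 = dataSet.crossWithHuge(dataSet2)

    dataSet3.print()
    dataSet4.print()
  }
}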
Reprinted from: http://auoso.baihongyu.com/