Spark Transformation Operators
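
Transformation operators are lazy: each one builds a new RDD from an existing one, and nothing is actually computed until an action such as foreach or collect is triggered. The code below demonstrates the most common transformations (map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join and cogroup), first in Java and then in Scala.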

2019-02-15



Java Version
package com.huanfion.Spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
 
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
 
public class TransformationJava {
    public static void main(String[] args) {
        //map();
//        filter();
        //flatMap();
//        groupByKey();
//        reduceByKey();
//        sortByKey();
//        join();
        cogroup();
    }
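    // cogroup: for each key, gather the values from both RDDs into a pair of Iterables (all names, all scores)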
    public static void cogroup()
    {
        List<Tuple2<Integer,String>> stulist=Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer,Integer>> scorelist=Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(2,88),
                new Tuple2<>(2,97),
                new Tuple2<>(3,90),
                new Tuple2<>(3,50),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc=getsc();
        JavaPairRDD<Integer,String> sturdd=sc.parallelizePairs(stulist);
        JavaPairRDD<Integer,Integer> scorerdd=sc.parallelizePairs(scorelist);
 
        JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> cogroupvalue=sturdd.cogroup(scorerdd);
 
        cogroupvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1+":"+integerTuple2Tuple2._2._2);
            }
        });
    }
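    // join: inner join two pair RDDs by key, producing (key, (name, score)) tuples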
    public static void join()
    {
        List<Tuple2<Integer,String>> stulist=Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer,Integer>> scorelist=Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(3,90),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc=getsc();
        JavaPairRDD<Integer,String> sturdd=sc.parallelizePairs(stulist);
        JavaPairRDD<Integer,Integer> scorerdd=sc.parallelizePairs(scorelist);
 
        JavaPairRDD<Integer,Tuple2<String,Integer>> joinvalue=sturdd.join(scorerdd);
 
        joinvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1+":"+integerTuple2Tuple2._2._2);
            }
        });
    }
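    // sortByKey: sort the pair RDD by key; the false argument means descending order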
    public static void sortByKey(){
        List<Tuple2<Integer,String>> list=Arrays.asList(
                new Tuple2<>(91,"liuda"),
                new Tuple2<>(78,"lier"),
                new Tuple2<>(99,"zhangsan"),
                new Tuple2<>(76,"gousi"),
                new Tuple2<>(90,"lily"),
                new Tuple2<>(89,"lucy"));
        JavaPairRDD<Integer,String> rdd=getsc().parallelizePairs(list);
        JavaPairRDD<Integer,String> sortvalue=rdd.sortByKey(false);
        sortvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
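    // reduceByKey: merge the values of each key with the given function (here, sum the scores per class)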
    public static void reduceByKey(){
        List<Tuple2<String,Integer>> list=Arrays.asList(
                new Tuple2<>("class_1",91),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String,Integer> rdd=getsc().parallelizePairs(list);
        JavaPairRDD<String,Integer> reducevalues=rdd.reduceByKey(new Function2<Integer, Integer,Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        reducevalues.foreach(x->System.out.println(x._1+"--"+x._2));
    }
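    // groupByKey: collect all values sharing the same key into one Iterable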
    public static void groupByKey(){
        List<Tuple2<String,Integer>> list=Arrays.asList(
                new Tuple2<>("class_1",90),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String,Integer> rdd=getsc().parallelizePairs(list);
 
        JavaPairRDD<String,Iterable<Integer>> groupvalue=rdd.groupByKey();
        groupvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
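    // flatMap: map each element to zero or more elements and flatten the result (here, split each line into words)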
    public static void flatMap(){
        List list=Arrays.asList("Hadoop Hive","Hadoop Hbase");
        JavaRDD rdd=getsc().parallelize(list);
        JavaRDD flatMapValue=rdd.flatMap(new FlatMapFunction<String,String>() {
            @Override
            public Iterator<String> call(String value) throws Exception {
                return Arrays.asList(value.split(" ")).iterator();
            }
        });
        flatMapValue.foreach(x->System.out.println(x));
    }
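    // map: apply a function to every element (here, multiply each number by 10)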
    public static void map(){
        JavaSparkContext sc=getsc();
 
        List<Integer> list= Arrays.asList(1,2,3,4);
 
        JavaRDD<Integer> rdd=sc.parallelize(list);
 
        JavaRDD<Integer> count=rdd.map(new Function<Integer,Integer>() {
            @Override
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });
        count.foreach(x->System.out.println(x));
    }
 
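    // filter: keep only the elements for which the predicate returns true (here, the even numbers)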
    public static void filter(){
        JavaSparkContext sc=getsc();
 
        List<Integer> list= Arrays.asList(1,2,3,4,5,6,7,8,9,10);
 
        JavaRDD<Integer> rdd=sc.parallelize(list);
 
        JavaRDD<Integer> filterValue=rdd.filter(x->x%2==0);
 
        filterValue.foreach(x->System.out.println(x));
    }
 
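    // helper: build a JavaSparkContext that runs in local mode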
    public static JavaSparkContext getsc()
    {
        SparkConf sparkconf=new SparkConf().setAppName("Transformation").setMaster("local");
 
        JavaSparkContext sc=new JavaSparkContext(sparkconf);
 
        return sc;
    }
}
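
All of the Java examples run in local mode via setMaster("local"), using the small getsc helper to build the JavaSparkContext. Two points worth noting: foreach is an action that runs on the executors, so on a real cluster the println output shows up in the executor logs rather than on the driver; and the anonymous Function/Function2/FlatMapFunction classes can equally be written as Java 8 lambdas, as the foreach and filter calls above already do.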

 

Scala Version
package com.huanfion.Spark
 
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
 
object TransformationScala {
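  // helper: build a SparkContext that runs in local mode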
  def getsc():SparkContext={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")
 
    val sc = new SparkContext(sparkconf)
 
    sc
  }
  def main(args: Array[String]): Unit = {
    //filter()
    //flatMap()
//    groupByKey()
//    reduceByKey()
//    sortByKey()
    join()
//    cogroup()
  }
 
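  // filter: keep only the elements for which the predicate returns true (here, the even numbers)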
  def filter():Unit={
   val sc=getsc()
 
    val list= Array(1,2,3,4,5,6,7,8,9,10)
 
    val rdd=sc.parallelize(list)
 
    val count=rdd.filter(x=>x%2==0)
 
    count.foreach(x=>System.out.println(x))
  }
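  // map: apply a function to every element (here, multiply each number by 10)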
  def map():Unit={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")
 
    val sc = new SparkContext(sparkconf)
 
    val list= Array(1,2,3,4,5)
 
    val rdd=sc.parallelize(list)
 
    val count=rdd.map(x=>x*10)
 
    count.foreach(x=>System.out.println(x))
  }
 
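  // flatMap: map each element to zero or more elements and flatten the result (here, split each line into words)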
  def flatMap():Unit={
    val list=Array("Hadoop Hive", "Hadoop Hbase")
 
    val rdd=getsc().parallelize(list)
 
    val flatmapvalue=rdd.flatMap(x=>x.split(" "))
 
    flatmapvalue.foreach(x=>System.out.println(x))
  }
 
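  // groupByKey: collect all values sharing the same key into one Iterable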
  def groupByKey()= {
    val list=Array(
        Tuple2("class_1",90),
       Tuple2("class_2",78),
       Tuple2("class_1",99),
       Tuple2("class_2",76),
       Tuple2("class_2",90),
       Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)
 
    val groupvalue=rdd.groupByKey()
 
    groupvalue.foreach(x=>{
      System.out.println(x._1)
      x._2.foreach(y=>System.out.println(y))
    })
  }
 
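  // reduceByKey: merge the values of each key with the given function (here, sum the scores per class)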
  def reduceByKey()={
    val list=Array(
      Tuple2("class_1",90),
      Tuple2("class_2",78),
      Tuple2("class_1",99),
      Tuple2("class_2",76),
      Tuple2("class_2",90),
      Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)
 
    val reducevalue=rdd.reduceByKey(_+_)
 
    reducevalue.foreach(x=>System.out.println(x._1+"--"+x._2))
  }
 
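  // sortByKey (implemented here with sortBy on the value): sort the records by score in descending order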
  def sortByKey()={
    val list=Array(
      Tuple2("liuda",90),
      Tuple2("lier",78),
      Tuple2("zhangsan",99),
      Tuple2("gousi",76),
      Tuple2("lily",90),
      Tuple2("lucy",86))
    val rdd=getsc().parallelize(list)
 
    val sortvalue=rdd.sortBy(x=>x._2,false)
 
    sortvalue.foreach(x=>System.out.println(x._1+":"+x._2))
  }
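  // join: inner join two pair RDDs by key, producing (key, (name, score)) tuples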
  def join()={
    val stulist=Array(
       Tuple2(1,"liuda"),
       Tuple2(2,"lier"),
       Tuple2(3,"zhangsan"),
       Tuple2(4,"gousi"),
       Tuple2(5,"lily"),
       Tuple2(6,"lucy"));
    val scorelist=Array(
       Tuple2(1,88),
       Tuple2(2,87),
       Tuple2(3,90),
       Tuple2(4,100),
       Tuple2(5,58),
       Tuple2(6,65));
    val sc=getsc();
    val sturdd=sc.parallelize(stulist)
 
    val scorerdd=sc.parallelize(scorelist)
 
    val joinvalue=sturdd.join(scorerdd)
 
    joinvalue.foreach(x=>System.out.println(x._1+"->"+x._2))
  }
 
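  // cogroup: for each key, gather the values from both RDDs into a pair of Iterables (all names, all scores)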
  def cogroup()={
    val stulist=Array(
      Tuple2(1,"liuda"),
      Tuple2(2,"lier"),
      Tuple2(3,"zhangsan"),
      Tuple2(4,"gousi"),
      Tuple2(5,"lily"),
      Tuple2(6,"lucy"));
    val scorelist=Array(
      Tuple2(1,88),
      Tuple2(2,87),
      Tuple2(2,84),
      Tuple2(2,86),
      Tuple2(3,90),
      Tuple2(3,65),
      Tuple2(4,100),
      Tuple2(5,58),
      Tuple2(6,65));
    val sc=getsc();
    val sturdd=sc.parallelize(stulist)
 
    val scorerdd=sc.parallelize(scorelist)
 
    val joinvalue=sturdd.cogroup(scorerdd)
 
    joinvalue.foreach(x=>System.out.println(x._1+"->"+x._2._1.toList+":"+x._2._2.toList))
  }
}
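
The Scala version mirrors the Java one, with one small difference: the sortByKey method above actually orders the records by score using sortBy(x=>x._2,false) on (name, score) pairs, whereas the Java example sorts by key. To sort by key in the same descending way, the pairs can be swapped first. A minimal sketch of such a method that could be added to TransformationScala, assuming the same getsc helper (the name sortByKeyDesc is just illustrative):

  def sortByKeyDesc(): Unit = {
    val list = Array(Tuple2("liuda", 90), Tuple2("lier", 78), Tuple2("zhangsan", 99))
    val rdd = getsc().parallelize(list)
    // swap (name, score) to (score, name) so that sortByKey orders by the score, descending
    val sortvalue = rdd.map(_.swap).sortByKey(false)
    sortvalue.foreach(x => System.out.println(x._1 + ":" + x._2))
  }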

 

 

Source: https://www.cnblogs.com/huanfion/p/10382738.html
