标签:val spark2 dataFrame dataSet session apache import spark id
文章目录
dataFrame
package sql2 import org.apache.avro.generic.GenericData.StringType import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession, types} object Spark2DateFrame { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SQLTest01") .master("local[*]") .getOrCreate() // 创建RDD val lines = session.sparkContext.textFile("") // 将数据进行整理 val rowRDD = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) Row(id, name) }) // 结果类型,其实就是表头,用于描述DataFrame,true表示是否为空 val sch = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true) )) //创建DataFrame val df = session.createDataFrame(rowRDD,sch) import session.implicits._ val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc) // df.show() //写入成csv文件.json,parquet,jdbc等 df2.write.parquet("") session.stop() } }
dataFrame wordCount
package sql2 import org.apache.spark.sql.SparkSession object Spark2WoldCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("Spark2WoldCount") .master("local[*]") .getOrCreate() val lines = spark.read.textFile("") //整理数据,压平.导入隐式转换 import spark.implicits._ val words = lines.flatMap(_.split(" ")) //注册视图 words.createTempView("v_wc") //执行sql val result = spark.sql("select value,COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC") result.show() spark.stop() } }
基于dataSet的wordCount
package sql2 import org.apache.avro.generic.GenericData.StringType import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession, types} object Spark2DateFrame { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SQLTest01") .master("local[*]") .getOrCreate() // 创建RDD val lines = session.sparkContext.textFile("") // 将数据进行整理 val rowRDD = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) Row(id, name) }) // 结果类型,其实就是表头,用于描述DataFrame,true表示是否为空 val sch = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true) )) //创建DataFrame val df = session.createDataFrame(rowRDD,sch) import session.implicits._ val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc) // df.show() //写入成csv文件.json,parquet,jdbc等 df2.write.parquet("") session.stop() } }
标签:val,spark2,dataFrame,dataSet,session,apache,import,spark,id 来源: https://blog.51cto.com/u_13985831/2836508
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。