标签:deequ amazon result import org spark
目前,公司里数据质量检测是通过配置规则报警来实现的,对于有些表需要用shell脚本来封装hivesql来进行检测,在时效性和准确上不能很好的满足,故尝试使用Deequ来做质量检测工具。
一、官网示例
package org.shydow.deequ import com.amazon.deequ.checks.CheckStatus import com.amazon.deequ.constraints.ConstraintStatus import com.amazon.deequ.{VerificationResult, VerificationSuite} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * @author shydow * @date 2022-03-25 */ object DQService { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName("DQC") .master("local[*]") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") import spark.implicits._ val source: RDD[Item] = sc.parallelize(Seq( Item(1, "Thingy A", "awesome thing.", "high", 0), Item(2, "Thingy B", "available at http://thingb.com", null, 0), Item(3, null, null, "low", 5), Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10), Item(5, "Thingy E", null, "high", 12))) val sourceDF: DataFrame = spark.createDataFrame(source) sourceDF.printSchema() // 质量检测 val result: VerificationResult = DeequCheckRules.createRule(sourceDF) if (result.status == CheckStatus.Success) { println("The data passed the test, everything is fine!") } else { println("We found errors in the data:\n") val resultsForAllConstraints = result.checkResults .flatMap { case (_, checkResult) => checkResult.constraintResults } resultsForAllConstraints .filter { _.status != ConstraintStatus.Success } .foreach { result => println(s"${result.constraint}: ${result.message.get}") } } spark.close() } }
package org.shydow.deequ import com.amazon.deequ.{VerificationResult, VerificationSuite} import com.amazon.deequ.checks.{Check, CheckLevel} import org.apache.spark.sql.DataFrame /** * @author shydow * @date 2022-03-25 */ object DeequCheckRules { // 自定义规则1 def createRule(df: DataFrame): VerificationResult = { VerificationSuite().onData(df) .addCheck(Check(CheckLevel.Error, "this a unit test") .hasSize(_ == 5) // 判断数据量是否是5条 .isComplete("id") // 判断该列是否全部不为空 .isUnique("id") // 判断该字段是否是唯一 .isComplete("productName") // 判断该字段全部不为空 .isContainedIn("priority", Array("high", "low")) // 该字段仅仅包含这两个字段 .isNonNegative("numViews") //该字段不包含负数 .containsURL("description", _ >= 0.5) // 包含url的记录是否超过0.5 .hasApproxQuantile("numViews", 0.5, _ <= 10) ) .run() } }
二、生产中配置的一些规则
def odsTableRule(df: DataFrame) = { VerificationSuite() .onData(df) .addCheck( Check(CheckLevel.Error, "base checks") .isComplete("primaryKey") // primaryKey即主要字段不能为空 .isUnique("uniqueKey") // unique即唯一主键 .isContainedIn("priority", Array("high", "low")) // 判断该字段是否只存在枚举类型 .isNonNegative("numViews") // 断言该字段非负数 .satisfies( "abs(column1 - column2) <= 0.20 * column2", "value(column1) lies between value(column2)-20% and value(column2)+20%" ) // 自定义条件,判断col1-col2绝对值在0.2 * col2间 ) .addCheck( Check(CheckLevel.Warning, "distribution checks") .containsURL("description", _ >= 0.5) // 断言有一半的值包含url .hasApproxQuantile("numViews", 0.5, _ <= 10)) // 断言有一半的值不超过10 .run() }
标签:deequ,amazon,result,import,org,spark 来源: https://www.cnblogs.com/baran/p/16055785.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。