1. Purpose
Define and use a Spark SQL UDF.
2. Data
val product_order = Array("20170909,tom", "20170909,jack", "20170909,tom", "20170910,tony")
3. Script
/**
 * Created by puwenchao on 2017-09-26.
 */
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object udf {
  def main(args: Array[String]) {
    // Suppress noisy logging
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create the SparkContext and SQLContext
    val sparkConf = new SparkConf().setAppName("udf").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // 1. Create the RDD and convert it to a Row RDD
    val product_order = Array("20170909,tom", "20170909,jack", "20170909,tom", "20170910,tony")
    val product_orderRDD = sc.parallelize(product_order, 2)
    val product_orderRowRDD = product_orderRDD.map(_.split(",")).map(log => Row(log(0), log(1)))

    // 2. Build the schema
    val structType = StructType(Array(
      StructField("time", StringType, true),
      StructField("user", StringType, true)
    ))

    // 3. Create the DataFrame from the Row RDD and schema
    val product_orderDF = sqlContext.createDataFrame(product_orderRowRDD, structType)

    // 4. Register a temporary table
    product_orderDF.registerTempTable("product_order")

    // Define the UDF
    sqlContext.udf.register("strlen", (str: String) => str.length)

    // Apply the UDF
    sqlContext.sql("select time,strlen(user) from product_order")
      .rdd                     // DataFrame to Row RDD
      .map(x => (x(0), x(1))) // Row RDD to plain RDD
      .foreach(println)
  }
}
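As an aside, the same lambda can also be applied through the DataFrame column API instead of being registered by name for SQL. A minimal sketch, reusing product_orderDF from the script above; the name strlenCol is mine, not part of the original, and the printed rows match the output below.

import org.apache.spark.sql.functions.udf

// Wrap the lambda as a column function instead of registering it for SQL.
val strlenCol = udf((s: String) => s.length)

// Apply the UDF as a column expression in a select.
product_orderDF.select(product_orderDF("time"), strlenCol(product_orderDF("user")))
  .rdd
  .map(x => (x(0), x(1)))
  .foreach(println)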
4. Output
(20170909,3)
(20170909,4)
(20170909,3)
(20170910,4)
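For reference, on Spark 2.x the same example would use SparkSession instead of SQLContext, and createOrReplaceTempView instead of the deprecated registerTempTable. A minimal sketch, assuming a Spark 2.x dependency (not verified against the original environment):

import org.apache.spark.sql.SparkSession

object udf2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf").master("local").getOrCreate()
    import spark.implicits._

    // toDF builds the DataFrame directly, replacing the manual Row RDD + schema.
    val product_orderDF = Seq(("20170909", "tom"), ("20170909", "jack"),
      ("20170909", "tom"), ("20170910", "tony")).toDF("time", "user")
    product_orderDF.createOrReplaceTempView("product_order") // replaces registerTempTable

    // UDF registration moves from sqlContext.udf to spark.udf.
    spark.udf.register("strlen", (str: String) => str.length)
    spark.sql("select time, strlen(user) from product_order")
      .collect()
      .foreach(r => println((r(0), r(1))))

    spark.stop()
  }
}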