Spark1.6.1和2.4读取csv文件,转为为dataframe和使用sql

一、spark1.6读取csv

spark2.0才开始源码支持CSV,所以1.6版本需要借助第三方包来实现读取CSV文件,有好几种方法,

  • 1.如果有maven的,到https://spark-packages.org/package/databricks/spark-csv下载对应scala版本的第三方jar包然后再maven的pom里面添加denpency,然后根据官网的用法用–packages传入。这样它就会自动去maven里面寻找了。

  • 2.如果是Python开发的,用Python自带的库,比如pandas、csv等,可以参考这个博客。

  • 3.如果没有maven可以通过textfile读入,然后通过opencsv来转化。到这里下载https://sourceforge.net/projects/opencsv/files/latest/download第三方jar包,放到spark安装目录的lib目录和Hadoop的share目录里面/usr/local/src/hadoop-2.6.1/share/hadoop/common/lib


import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
 val pmix=sc.textFile("file:///mnt/hgfs/vm/20170101.csv")
val pmixrdd=pmix.map{line =>val reader=new CSVReader(new StringReader(line)); reader.readNext()}
pmixrdd.count  

我放进去一个一千万行的数据,用了大概两分钟

二、转化为DataFrame

RDD有两种方式转化df:根据反射推断方式和编程方式。

根据反射推断的方式,scala2.10只能支持22列,但是我的表有30多列,所以我选择编程的方式,虽然编程的方式比较麻烦点。


import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.Row;
val schemaString = "region,store_type,sitename,storeid,check_no,employee,dob,dob_full,daypart,hour,minute,qcid,qc_name,qc,category,category_name,item,bohname,longname,tender_name,check_type,tot_amt,tot_amt_ala,price_tot,tot_amt_disc,disc_name,quantity,food_cost,paper_cost,burger_count,dimension_product_mix,dimension_channel_mix"
val schema =
  StructType(
    schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD= pmixrdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31)))
val pmixdf= sqlContext.createDataFrame(rowRDD, schema)  

三、使用SQL

接下来注册一个临时表


pmixdf.registerTempTable("pmix")
val results = sqlContext.sql("SELECT * FROM pmix limit 10").show()
pmixdf.write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/pmix.parquet")  

四、spark2.4读取csv

```
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName(“pmix”).setMaster(“local[2]”) //val sc = new SparkContext(conf) //如果是集群启动,就不用这句了,自动加载sc sc.setLogLevel(“WARN”) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val spark = new SQLContext(sc)
import spark.implicits._
val df = spark.read.format(“com.databricks.spark.csv”)
.option(“header”, “false”)
.option(“inferSchema”, “false”) //也可以.option(“inferSchema”, true.toString) //自动推断属性列的数据类型 .option(“delimiter”,”,”) //分隔符,默认为 ,
.load(“file:///mnt/hgfs/vm/20170101.csv”)
//.save(outpath) df.show()

 ```