Commonalities
1. All three are resilient distributed datasets designed for processing very large data.
2. All three are lazily evaluated: creation and transformations such as map do not execute immediately; computation only starts when an action such as foreach is encountered (see the sketch after this list).
3. All three let Spark cache and spill computed data according to the available memory, so even very large datasets need not cause out-of-memory errors.
4. All three have the concept of partitions.
5. All three can be processed partition by partition (with a plain map, operations on variables outside the closure do not take effect).
6. They share many common functions, such as filter and sortBy.
7. Many DataFrame and Dataset operations need import spark.implicits._ for support.
8. Both DataFrame and Dataset can use pattern matching to obtain the value and type of each field.
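A minimal sketch of the laziness and shared functions above (points 2, 6 and 7), assuming a SparkSession named spark already exists; the sample data is made up for illustration:

import spark.implicits._

val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))
val ds  = rdd.toDS                      // Dataset[(String, Int)], columns _1, _2
val df  = rdd.toDF("name", "count")     // DataFrame with named columns

// Transformations are lazy; nothing has run yet.
val filteredRdd = rdd.filter(_._2 > 1)
val filteredDs  = ds.filter(_._2 > 1)
val filteredDf  = df.filter($"count" > 1)

// Actions trigger the actual computation.
filteredRdd.foreach(println)
filteredDs.show()
filteredDf.show()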
Differences
RDD is generally used together with Spark MLlib.
RDD does not support Spark SQL operations.
Unlike RDD and Dataset, every row of a DataFrame has the fixed type Row; the individual field values can only be obtained by parsing the Row (see the sketch after this list).
DataFrame and Dataset are generally used together with Spark ML.
DataFrame and Dataset both support Spark SQL operations such as select and groupBy, and can be registered as temporary tables/views for SQL queries.
DataFrame and Dataset support some very convenient save formats, such as CSV (optionally with a header row).
Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.
(A DataFrame can be viewed as Dataset[Row], where every row has type Row.)
(Each row of a Dataset can be an arbitrary case class.)
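A minimal sketch of the DataFrame/Dataset points above, assuming a SparkSession named spark; the Person case class, the sample data and the output path are made up for illustration:

import org.apache.spark.sql.Row
import spark.implicits._

case class Person(name: String, age: Int)

val ds = Seq(Person("a", 22), Person("b", 20)).toDS   // Dataset[Person]
val df = ds.toDF                                      // DataFrame = Dataset[Row]

// DataFrame: every row is a Row, fields must be parsed out by position or name.
df.map { case Row(name: String, age: Int) => s"$name is $age" }.show()

// Dataset: rows keep their case-class type, fields are accessed directly.
ds.map(p => s"${p.name} is ${p.age}").show()

// Both support Spark SQL via a temporary view.
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

// Convenient save formats, e.g. CSV with a header row (path is hypothetical).
df.write.option("header", "true").csv("/tmp/people_csv")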
Conversions
// DataFrame / Dataset => RDD
val rdd1 = df.rdd
val rdd2 = ds.rdd

// RDD => DataFrame
import spark.implicits._
val df = rdd.map(line => (line._1, line._2)).toDF("col1", "col2")
// or, keeping the default column names _1, _2
val df = rdd.toDF

// RDD => Dataset
import spark.implicits._
case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
val ds = rdd.map(line => Coltest(line._1, line._2)).toDS

// DataFrame => Dataset
import spark.implicits._
case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
val ds = df.as[Coltest]
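For completeness, the remaining direction, Dataset => DataFrame, only needs toDF; a minimal sketch, assuming ds is a Dataset[Coltest] as above:

import spark.implicits._
val df1 = ds.toDF                 // column names come from the case class fields
val df2 = ds.toDF("col1", "col2") // or rename the columns explicitly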
List <=> RDD
val list = List(("a", 22), ("b", 20), ("c", 23))
val rdd = spark.sparkContext.parallelize(list)
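For the opposite direction (RDD => List), collect brings the data back to the driver; a minimal sketch (fine for small data, but it materialises the whole RDD on the driver):

val listBack = rdd.collect().toList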
File => RDD
val rdd = spark.sparkContext.textFile("/home/zhaomeng/source/data.txt")
RDD=>File
rdd.saveAsTextFile("/home/zhaomeng/result/")
JDBC=>RDD
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val sc = spark.sparkContext

def createConnection() = {
  Class.forName("com.mysql.jdbc.Driver").newInstance()
  DriverManager.getConnection("jdbc:mysql://localhost:3306/zhaomeng", "zhaomeng", "zhaomeng")
}

def extractValues(r: ResultSet) = {
  (r.getString(1), r.getString(2))
}

val data = new JdbcRDD(sc,
  createConnection,
  "SELECT aa,bb FROM cc_table WHERE ? <= ID AND ID <= ?",
  lowerBound = 3,
  upperBound = 5,
  numPartitions = 1,
  mapRow = extractValues)
RDD=>JDBC
-- MySQL target table
CREATE TABLE `cc_table` (
  `name` varchar(255) NOT NULL,
  `count` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SELECT * FROM cc_table cc;

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkContext

def saveFun(iterator: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var ps: PreparedStatement = null
  val sql = "insert into cc_table(name, count) values (?, ?)"
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/zhaomeng", "zhaomeng", "zhaomeng")
    iterator.foreach(data => {
      ps = conn.prepareStatement(sql)
      ps.setString(1, data._1)
      ps.setInt(2, data._2)
      ps.executeUpdate()
    })
  } catch {
    case e: Exception => println("MySQL Exception: " + e.getMessage)
  } finally {
    if (ps != null) {
      ps.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}

def rddToMySQL(args: Array[String]): Unit = {
  val sc = new SparkContext
  val data = sc.parallelize(List(("a", 10), ("b", 20), ("c", 30)))
  data.foreachPartition(saveFun)
}

// JDBC driver dependency: mysql:mysql-connector-java:5.1.38
JDBC=>DF
val jdbcDF = spark.read.format("jdbc").options(
  Map(
    "driver" -> "com.mysql.jdbc.Driver",
    "url" -> "jdbc:mysql://localhost:3306",
    "dbtable" -> "db.zhaomeng",
    "user" -> "zhaomeng",
    "password" -> "zhaomeng",
    "fetchsize" -> "10")).load()
DF=>JDBC
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def dfToMySQL(args: Array[String]): Unit = {
  val url = "jdbc:mysql://localhost:3306/zhaomeng?user=zhaomeng&password=zhaomeng"
  val sc = new SparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  val schema = StructType(
    StructField("name", StringType) ::
    StructField("count", IntegerType) :: Nil)

  val data = sc.parallelize(List(("aa", 30), ("bb", 29), ("cc", 40)))
    .map(item => Row.apply(item._1, item._2))

  import sqlContext.implicits._
  val df = sqlContext.createDataFrame(data, schema)

  df.createJDBCTable(url, "cc_table", false)   // legacy Spark 1.x DataFrame API
  // df.insertIntoJDBC(url, "cc_table", false)

  sc.stop
}

or

df.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.mysql.jdbc.Driver",
    "url" -> "jdbc:mysql://localhost:3306",
    "dbtable" -> "db.zhaomeng",
    "user" -> "zhaomeng",
    "password" -> "zhaomeng",
    "batchsize" -> "1000",
    "truncate" -> "true")).save()