博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark(Data)
阅读量:6496 次
发布时间:2019-06-24

本文共 4342 字,大约阅读时间需要 14 分钟。

hot3.png

共性  

1.分布式弹性数据集,处理超大型数据  2.三者都有惰性机制,在进行创建、转换,如map时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算  3.三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出  4.三者都有partition的概念  5.三者都可以对每一个分区进行操作(不使用partition,则map对外面的操作无效)  6.三者有许多共同的函数,如filter,sortBy  7.DataFrame和Dataset许多操作都需要spark.implicits._进行支持  8.DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

异性

  RDD一般和spark mlib同时使用  RDD不支持spark sql操作  DataFrame与RDD/Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值  DataFrame与Dataset一般与spark ml同时使用  DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作  DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv(可以带上表头)  Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同  (DataFrame也可以叫Dataset[Row],每一行的类型是Row)  (Dataset每一行的类型可能是各种case class)

转化

  val rdd1=df.rdd  val rdd2=ds.rdd  import spark.implicits._  val df = rdd.map(line => (line._1,line._2)).toDF("col1","col2")  import spark.implicits._  val df = rdd.toDF  import spark.implicits._  case class Coltest(col1:String,col2:Int) extends Serializable //定义字段名和类型  val ds = rdd.map(line => Coltest(line._1,line._2)).toDS  import spark.implicits._  case class Coltest(col1:String,col2:Int) extends Serializable //定义字段名和类型  val ds = df.as[Coltest]

 

List <=>RDD

val list = List(("a",22),("b",20),("c",23))val rdd = spark.SparkContext.parallelize(list)

File => RDD

val rdd = spark.sparkContext.textFile("/home/zhaomeng/source/data.txt")

RDD=>File

rdd.saveAsTextFile("/home/zhaomeng/result/")

JDBC=>RDD

val sc = spark.sparkContext  def createConnection() = {        Class.forName("com.mysql.jdbc.Driver").newInstance()        DriverManager.getConnection("jdbc:mysql://localhost:3306/zhaomeng", "zhaomeng", "zhaomeng")  }    def extractValues(r: ResultSet) = {        (r.getString(1), r.getString(2))  }    val data = new JdbcRDD(sc, createConnection, "SELECT aa,bb FROM cc_table where ? <= ID AND ID <= ?", lowerBound = 3, upperBound =5, numPartitions = 1, mapRow = extractValues)

RDD=>JDBC

mysql
mysql-connector-java
5.1.38
CREATE TABLE `cc_table` ( `name` varchar(255) NOT NULL, `count` int(10) unsigned DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf-8SELECT * FROM cc_table cc;def saveFun(iterator: Iterator[(String, Int)]): Unit = { var conn: Connection = null var ps: PreparedStatement = null val sql = "insert into cc_table(name, count) values (?, ?)" try { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/zhaomeng", "zhaomeng", "zhaomeng") iterator.foreach(data => { ps = conn.prepareStatement(sql) ps.setString(1, data._1) ps.setInt(2, data._2) ps.executeUpdate() } ) } catch { case e: Exception => println("MySQL Exception") } finally { if (ps != null) { ps.close() } if (conn != null) { conn.close() } }}def rddToMySQL(args: Array[String]): Unit = { val sc = new SparkContext val data = sc.parallelize(List(("a", 10), ("b", 20), ("c", 30))) data.foreachPartition(myFun)}

JDBC=>DF

val jdbcDF = spark.read.format("jdbc").options(      Map(        "driver" -> "com.mysql.jdbc.Driver",        "url" -> "jdbc:mysql://localhost:3306",        "dbtable" -> "db.zhaomeng",        "user" -> "zhaomeng",        "password" -> "zhaomeng",        "fetchsize" -> "10")).load()

DF=>JDBC

def dfToMysQL(args: Array[String]): Unit = {    val url = "jdbc:mysql://localhost:3306/zhaomeng?user=zhaomeng&password=zhaomeng"     val sc = new SparkContext    val sqlContext = new org.apache.spark.sql.SQLContext(sc)    val schema = StructType(        StructField("name", StringType)         ::        StructField("count", IntegerType)        :: Nil)     val data = sc.parallelize(List(("aa", 30), ("bb", 29), ("cc", 40))                 .map(item => Row.apply(item._1, item._2))    import sqlContext.implicits._     val df = sqlContext.createDataFrame(data, schema)    df.createJDBCTable(url, "cc_table", false)  ##df.insertIntoJDBC(url, "cc_table", false)     sc.stop}or df.write.mode("overwrite").format("jdbc").options(  Map(    "driver" -> "com.mysql.jdbc.Driver",    "url" -> "jdbc:mysql://localhost:3306",    "dbtable" -> "db.zhaomeng",    "user" -> "zhaomeng",    "password" -> "zhaomeng",    "batchsize" -> "1000",    "truncate" -> "true")).save()

 

转载于:https://my.oschina.net/igooglezm/blog/1545252

你可能感兴趣的文章
Spring Boot 使用parent方式引用时 获取值属性方式默认@
查看>>
Elasticsearch之中文分词器插件es-ik(博主推荐)
查看>>
解决maven下载jar慢的问题(如何更换Maven下载源)
查看>>
linux安装gitLab
查看>>
concurrent包的实现示意图
查看>>
golang os.Args
查看>>
Linux常用命令
查看>>
【重磅】云栖社区2017年度内容特辑
查看>>
Java WEB开发时struts标签 显示set内容
查看>>
spring-data-elasticsearch 概述及入门(二)
查看>>
Solr启动和结束命令
查看>>
1.12 xshell密钥认证
查看>>
3.2 用户组管理
查看>>
awk
查看>>
AliOS Things SMP系统及其在esp32上实现示例
查看>>
VMware虚拟机出现“需要整合虚拟机磁盘”的解决方法
查看>>
ibatis 动态查询
查看>>
汇编语言之实验一
查看>>
09、Modules - Directory根据目录加载模块文件
查看>>
观影识人生
查看>>