Apache Spark 简介和rdds相关操作(1.6版本)
本文图较多。。主要懒得打字
先简单复习一下data science 的工作流程
Spark可以参与其中哪些环节呢?
举例:通过访问日志分析网站访问信息 假设我们要通过访问的日期来预测访问的形式,spark的工作流程:
似乎和hadoop没什么差别? 我们先回顾一下hadoop
- Hadoop
- Hadoop 使用单一的mapreduce计算模型
- 只处理在硬盘中的data,mapreduce也只在磁盘中进行,速度较慢
而在内存带宽疯狂增长的情况下, spark可以进行in-memory的计算
Spark
Spark结构
1.Spark Driver Process
负责跑主要程序,作用有三个
- 为spark应用保存信息
- responding to 用户的输入或者程序
- 分析、分布、规划工作任务给 执行的executors
2.Spark Executor Process
负责执行分配下来的任务,作用有:
- 执行分配下来的code
- 报告计算过程的状态,返回给driver process
3.Cluster Manager
监控物理机器,分配资源给spark applications
- Standalone
- YARN
- Mesos
Spark 1.6+ vs 2.0+
- 1.6 主要依赖于rdds(Resilient Distributed Datasets )
- 2.0则定义了更多以DataFrames形式的数据结构,类似于sql里的table和r里面的data.frames
(未来趋势)
经典的spark介绍,版本<2.0
The SparkContext
Spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉Spark如何访问一个集群。
Spark RDDs
弹性分布式数据集。是元素的集合,划分到集群的各个节点上,可以被并行操作。RDDs的创建可以从HDFS(或者任意其他支持Hadoop文件系统) 上的一个文件开始,或者通过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可以让Spark保留一个RDD在内存中,使其能在并行操作中被有效的重复使用。最后,RDD能自动从节点故障中恢复。 RDDs线性图
多样化的创建方式
logsrdd = sc.textFile(“hdfs://user/mark/logdata”)
fatals = filter(lambda s: s.startswith(‘FATAL’), logsrdd)
fatals.cache()
fatals.count()
# Notes: logsrdd NOT loaded into RAM because of lazy evaluation
# fatals.cache() = rdd.persist(storageLevel.MEMORY) tries to persist fatals rdd in memory
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")words.checkpoint()
常用transformations
e.g.
常用actions
e.g.
数wordcount的方法
constitution = sc.textFile("/user/root/constitution.txt")
wordCounts = constitution.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a+b)
swapped_ wordCounts = wordCounts.map(lambda record: (record[1], record[0])
#换顺序,把count作为key
sorted_counts = swapped_ wordCounts.sortByKey(ascending = False)
sorted_counts.take(20)
小结
spark面向内存, hadoop的MapReduce是面向磁盘的
同时spark非常有优势的一点是他的lazy evaluation.
也就是说,有任务先分配,不执行。 好处就是,能尽量长时间建立graph of operation, 可以优化计算的步骤, 扔掉重复的操作。相比于一个一个按次序执行,在大数据分析下,能较大地提升性能。
未完待续,下文写2.0dataframe的内容