博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[大数据之Spark]——快速入门
阅读量:6924 次
发布时间:2019-06-27

本文共 4723 字,大约阅读时间需要 15 分钟。

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看了解更多的内容。

为了良好的阅读下面的文档,最好是结合实际的练习。首先需要,然后安装hdfs,可以下载任意版本的hdfs。

Spark Shell 交互

基本操作

Spark Shell提供给用户一个简单的学习API的方式 以及 快速分析数据的工具。在shell中,既可以使用scala(运行在java虚拟机,因此可以使用java库)也可以使用python。可以在spark的bin目录下启动spark shell:

./bin/spark-shell.sh

spark操作对象是一种分布式的数据集合,叫做Resilient Distributed Dataset(RDD)。RDD可以通过hdfs文件创建,也可以通过RDD转换得来。

下面就实际操作下,看看效果。我的本地有个文件——test.txt,内容为:

hello worldhaha nihao

可以通过这个文件创建一个新的RDD

val textFile = sc.textFile("test.txt")textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at 
:21

在Spark中,基于RDD可以作两种操作——Actions算子操作以及Transformations转换操作。

我们可以使用一些算子操作体验下:

scala> textFile.count() //RDD有用的数量res1: Long = 2scala> textFile.first() //RDD第一行res3: String = hello world

再执行一些转换操作,比如使用filter转换,返回一个新的RDD集合:

scala> val lines = textFile.filter(line=>line.contains("hello"))lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at 
:23scala> lines.count()res4: Long = 1scala> val lines = textFile.filter(line=>line.contains("haha"))lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at
:23scala> lines.count()res5: Long = 1scala> lines.first()res6: String = haha nihao

更多RDD操作

RDD算子和转换可以组成很多复杂的计算,比如我们想找出最多一行中单词最多的单词数量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)res4: Long = 15

这个操作会把一行通过split切分计数,转变为一个整型的值,然后创建成新的RDD。reduce操作用来寻找单词最多的那一行。

用户可以在任何时候调用方法和库,可以使用Math.max()函数:

scala> import java.lang.Mathimport java.lang.Mathscala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))res5: Int = 15

一个很常见的数据操作就是map reduce,这个操作在hadoop中很常见。Spark可以轻松的实现Mapreduce任务:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at 
:28

这里使用了flatMap,map以及reduceByKey等转换操作来计算每个单词在文件中的数量。为了在shell中显示,可以使用collect()触发计算:

scala> wordCounts.collect()res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的hot数据集,或者运行一个类似PageRank的算法。

举个简单的例子,对linesWithSpark RDD数据集进行缓存,然后再调用count()会触发算子操作进行真正的计算,之后再次调用count()就不会再重复的计算,直接使用上一次计算的结果的RDD了:

scala> linesWithSpark.cache()res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at 
:27scala> linesWithSpark.count()res8: Long = 19scala> linesWithSpark.count()res9: Long = 19

看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输RDD计算的结果。你也可以通过bin/spark-shell向集群提交任务,可以参考

独立应用

要使用spark api写一个自己的应用也很简单,可以基于scala、java、python去写一些简单的应用。

/* SimpleApp.scala */import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.SparkConfobject SimpleApp {  def main(args: Array[String]) {    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system    val conf = new SparkConf().setAppName("Simple Application")    val sc = new SparkContext(conf)    val logData = sc.textFile(logFile, 2).cache()    val numAs = logData.filter(line => line.contains("a")).count()    val numBs = logData.filter(line => line.contains("b")).count()    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))  }}

注意应用需要定义main()方法。这个程序仅仅是统计文件中包含字符ab的分别都有多少行。你可以设置YOUR_SPARK_HOME替换自己的文件目录。不像之前在shell中的例子那样,我们需要自己初始化sparkContext。

通过SparkConf构造方法创建SparkContext。

应用依赖于spark api,因此需要在程序中配置sbt的配置文件——simple.sbt,它声明了spark的依赖关系。

name := "Simple Project"version := "1.0"scalaVersion := "2.11.7"libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"

为了让sbt正确的工作,还需要创建SimpleApp.scala以及simple.sbt。然后就可以执行打包命令,通过spark-submit运行了:

# Your directory layout should look like this 你的工程目录应该向下面这样$ find .../simple.sbt./src./src/main./src/main/scala./src/main/scala/SimpleApp.scala# Package a jar containing your application 运行sbt命令进行打包$ sbt package...[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar# Use spark-submit to run your application 通过spark-submit提交任务jar包$ YOUR_SPARK_HOME/bin/spark-submit \  --class "SimpleApp" \  --master local[4] \  target/scala-2.11/simple-project_2.11-1.0.jar...Lines with a: 46, Lines with b: 23

其他地址

通过上面的例子,就可以运行起来自己的Spark应用了。

那么可以参考下面的链接获得更多的内容:

  • 为了更深入的学习,可以阅读
  • 如果想要运行Spark集群,可以参考
  • 最后,Spark在examples目录中内置了多种语言版本的例子,如scala,java,python,r等等。你可以通过下面的命令运行:
# For Scala and Java, use run-example:./bin/run-example SparkPi# For Python examples, use spark-submit directly:./bin/spark-submit examples/src/main/python/pi.py# For R examples, use spark-submit directly:./bin/spark-submit examples/src/main/r/dataframe.R
本文转自博客园xingoo的博客,原文链接:,如需转载请自行联系原博主。
你可能感兴趣的文章
树形结构在开发中的应用
查看>>
RedhatKVM 与VMware性能比较,哪个更能胜出!
查看>>
数据库在网站中的作用
查看>>
MySQL sql-mode
查看>>
图形组态工具预览
查看>>
Windows Server 笔记(二):Windows Server 2008配置(1)
查看>>
判断一个数是不是素数
查看>>
西安协同工作流如何接入其他系统的组织机构
查看>>
KVO
查看>>
LAMP原理架构解析(二):Php操作Mysql数据库
查看>>
二十年后的回眸(8)——晋级的炒更之旅
查看>>
京东商城IPO 大平台局面形成的几个因素
查看>>
九、Linux系统安装和常见故障排除
查看>>
linux+nginx+mysql+php高性能服务器搭建
查看>>
awk内置变量2-2
查看>>
我的友情链接
查看>>
MFC中的DC、CDC、HDC、句柄、设备上下文的不同意思,适合初学者参考
查看>>
mysql复制
查看>>
Spring中配置事务的几种方式
查看>>
php 新漏洞
查看>>