Spark 应用程序使用 spark-submit 命令,用于在集群上部署 Spark 应用程序。它通过统一的接口使用所有相应的集群管理器。因此,您无需为每个集群管理器配置应用程序。
例子
让我们使用之前使用过的 Shell 命令来举例,进行字数统计。这里,我们将其视为一个 Spark 应用程序。
示例输入
以下文本是输入数据,文件名为 in.txt
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
看看下面的程序
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
将上述程序保存到名为 SparkWordCount.scala 的文件中,并将其放在名为 spark-application 的用户定义目录中
注意 - 将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 将行(来自文本文件)标记为单词,使用 map() 方法计算单词频率,使用 reduceByKey() 方法计算每个单词的重复次数。
使用以下步骤提交此应用程序。通过终端执行 spark-application 目录中的所有步骤。
步骤 1:下载 Spark Ja
编译需要 Spark core jar,因此,从以下链接 Spark core jar 下载 spark-core_2.10-1.3.0.jar,并将 jar 文件从下载目录移动到 spark-application 目录。
步骤 2:编译程序
使用以下命令编译上述程序。此命令应从 spark-application 目录执行。其中, /usr/ local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar 是从 Spark 库中获取的 Hadoop 支持 jar 文件。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
步骤 3:创建 JAR
使用以下命令创建 spark 应用程序的 jar 文件。这里, wordcount 是 jar 文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
步骤 4:提交 Spark 应用程序
使用以下命令提交 spark 应用程序
spark-submit --class SparkWordCount --master local wordcount.jar
如果执行成功,您将看到下面的输出。以下输出中的 OK 用于用户识别,这是程序的最后一行。如果您仔细阅读以下输出,您会发现一些不同的东西,例如:
* 已成功在端口 42954 上启动服务“sparkDriver”
* MemoryStore 初始容量为 267.3 MB
* 在 http://192.168.1.217:4040 启动 SparkUI
* 添加的 JAR 文件:/home/hadoop/piapplication/count.jar
* ResultStage 1(saveAsTextFile at SparkPi.scala:11)耗时 0.566 秒
* 在 http://192.168.1.217:4040 停止 Spark Web UI
* MemoryStore 已清除
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
步骤 5:检查输出
程序执行成功后,会在 spark-application 目录下发现名为 outfile 的目录。
以下命令用于打开并检查 outfile 目录中的文件列表。
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
检查 part-00000 文件中输出的命令是 -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
检查 part-00001 文件中输出的命令是 -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
阅读以下部分以了解有关 spark-submit 命令的更多信息。
Spark-submit Syntax Spark-submit 语法
spark-submit [options] <app jar | python file> [app arguments]
Options 选项
S.No 序号 | Option 选项 | Description 描述 |
---|---|---|
1 | --master - 掌握 | spark://host:port, mesos://host:port, yarn, or local. spark://host:port、mesos://host:port、yarn 或 local。 |
2 | --deploy-mode --部署模式 | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). 是否在本地(“客户端”)或集群内的其中一台工作机器(“集群”)上启动驱动程序(默认值:客户端)。 |
3 | --class - 班级 | Your application's main class (for Java / Scala apps). 您的应用程序的主类(适用于 Java / Scala 应用程序)。 |
4 | --name - 姓名 | A name of your application. 您的应用程序的名称。 |
5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths. 要包含在驱动程序和执行器类路径中的本地 jar 的逗号分隔列表。 |
6 | --packages --软件包 | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. 要包含在驱动程序和执行器类路径中的 jar 的 maven 坐标的逗号分隔列表。 |
7 | --repositories --存储库 | Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. 以逗号分隔的附加远程存储库列表,用于搜索使用 --packages 给出的 maven 坐标。 |
8 | --py-files --py 文件 | Comma-separated list of .zip, .egg, or .py files to place on the PYTHON PATH for Python apps. 以逗号分隔的 .zip、.egg 或 .py 文件列表,放置在 Python 应用程序的 PYTHON PATH 上。 |
9 | --files --文件 | Comma-separated list of files to be placed in the working directory of each executor. 要放置在每个执行器的工作目录中的文件的逗号分隔列表。 |
10 | --conf (prop=val) --conf(prop=val) | Arbitrary Spark configuration property. 任意 Spark 配置属性。 |
11 | --properties-file --properties 文件 | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults. 加载额外属性的文件路径。如果未指定,则查找 conf/spark-defaults。 |
12 | --driver-memory --驱动程序内存 | Memory for driver (e.g. 1000M, 2G) (Default: 512M). 驱动程序内存(例如 1000M、2G)(默认值:512M)。 |
13 | --driver-java-options | Extra Java options to pass to the driver. 传递给驱动程序的额外 Java 选项。 |
14 | --driver-library-path --驱动程序库路径 | Extra library path entries to pass to the driver. 传递给驱动程序的额外库路径条目。 |
15 | --driver-class-path --驱动程序类路径 | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
16 | --executor-memory --执行器内存 | Memory per executor (e.g. 1000M, 2G) (Default: 1G). 每个执行器的内存(例如 1000M、2G)(默认值:1G)。 |
17 | --proxy-user --代理用户 | User to impersonate when submitting the application. 提交申请时模拟的用户。 |
18 | --help, -h --帮助,-h | Show this help message and exit. 显示此帮助消息并退出。 |
19 | --verbose, -v --详细,-v | Print additional debug output. 打印额外的调试输出。 |
20 | --version - 版本 | Print the version of current Spark. 打印当前 Spark 的版本。 |
21 | --driver-cores NUM --driver-cores 数量 | Cores for driver (Default: 1). 驱动程序的核心(默认值:1)。 |
22 | --supervise - 监督 | If given, restarts the driver on failure. 如果给定,则在失败时重新启动驱动程序。 |
23 | --kill - 杀 | If given, kills the driver specified. 如果给定,则终止指定的驱动程序。 |
24 | --status - 地位 | If given, requests the status of the driver specified. 如果给定,则请求指定驱动程序的状态。 |
25 | --total-executor-cores | Total cores for all executors. 所有执行器的核心总数。 |
26 | --executor-cores --executor 核心 | Number of cores per executor. (Default : 1 in YARN mode, or all available cores on the worker in standalone mode). 每个执行器的核心数。(默认值:YARN 模式下为 1,独立模式下为工作器上所有可用的核心)。 |
Comments