How to Deploy Spark Application on Yarn and Integrate with Hive

December 1, 2016

In this article, I will show you how Spark works with YARN and Hive. Before I begin, let me briefly explain what Apache Spark, YARN and Apache Hive are.

Apache Spark is an in-memory distributed processing framework. Spark is used for real-time as well as batch processing, and it can run programs up to 100x faster than Hadoop MapReduce when data fits in memory. Spark runs on Hadoop, Mesos, standalone, or in the cloud, and it can access diverse data sources including HDFS, Cassandra, HBase, and S3. With Apache Spark you can also do stream processing, machine learning, and graph processing. It integrates seamlessly with Hadoop and leverages HDFS (Hadoop Distributed File System) for storage.

Apache Hadoop YARN (Yet Another Resource Negotiator) is a cluster management technology. YARN is one of the key features in the second-generation Hadoop 2 version of the Apache Software Foundation's open source distributed processing framework. YARN is a software rewrite that decouples MapReduce's resource management and scheduling capabilities from the data processing component, enabling Hadoop to support more varied processing approaches and a broader array of applications. For example, Hadoop clusters can now run interactive querying and streaming data applications simultaneously with MapReduce batch jobs.

Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query, and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop. It provides an SQL-like language called HiveQL with schema-on-read and transparently converts queries to MapReduce jobs. Since most data warehousing applications work with SQL-based query languages, Hive makes it easy to port SQL-based applications to Hadoop. Initially developed by Facebook, Apache Hive is now used by many leading organizations.

Now that you have a brief idea of Apache Spark, YARN and Apache Hive, we are ready to run Spark examples with YARN and Apache Hive.

To follow the examples below, you need Hadoop 2.x, Spark 2.x, Hive 2 and Java 8 installed on an Ubuntu machine. I have everything installed on an Ubuntu 16.04 system.

How to Deploy a Spark Application on YARN

We can deploy a Spark application on a YARN cluster in two modes: client mode and cluster mode. In cluster mode, the Spark driver runs inside the YARN ApplicationMaster on the cluster; in client mode, the driver runs in the client process that submitted the job, and the ApplicationMaster only requests resources from YARN.

Let us start by deploying a Spark application on YARN in cluster mode. We will deploy the SparkPi example application, which computes an approximation of Pi.
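SparkPi estimates Pi with a simple Monte Carlo method: it scatters random points over the unit square and counts how many land inside the unit circle. The same logic can be sketched in plain Python (this mirrors what the example computes, not Spark's distributed implementation; the function name, seed, and sample count here are my own):

```python
import random

def estimate_pi(num_samples, seed=42):
    """Monte Carlo Pi estimate: the fraction of random points in the unit
    square that fall inside the unit circle approaches Pi/4."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / num_samples

print(estimate_pi(100_000))  # roughly 3.14
```

SparkPi does the same thing, but splits the samples into partitions, counts hits on the executors in parallel, and sums the counts with a reduce.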

Now go to the Spark directory and run the spark-submit command below.

Command : ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 512m --executor-cores 1 examples/jars/spark-examples_2.11-2.0.2.jar

hadoop@hadoop-VirtualBox:~$ cd spark-2.0.2-bin-hadoop2.7/

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$ ./bin/spark-submit --class
org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memo
ry 1g --executor-memory 512m --executor-cores 1 examples/jars/spark-examples_2.11-2
.0.2.jar

16/11/27 02:23:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable

16/11/27 02:23:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:
8032

16/11/27 02:23:21 INFO yarn.Client: Requesting a new application from cluster with 
1 NodeManagers

16/11/27 02:23:21 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (8192 MB per container)

16/11/27 02:23:21 INFO yarn.Client: Will allocate AM container, with 1408 MB memory 
including 384 MB overhead

16/11/27 02:23:21 INFO yarn.Client: Setting up container launch context for our AM

16/11/27 02:23:21 INFO yarn.Client: Setting up the launch environment for our AM 
container

16/11/27 02:23:21 INFO yarn.Client: Preparing resources for our AM container

16/11/27 02:23:22 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive 
is set, falling back to uploading libraries under SPARK_HOME.

16/11/27 02:23:39 INFO yarn.Client: Uploading resource file:/tmp/spark-42ba1bd7-
a336-4690-9bce-e77a73510efb/__spark_libs__4167951666400537062.zip -> 
hdfs://localhost:9000/user/hadoop/.sparkStaging/application_1480194774626_0001/
__spark_libs__4167951666400537062.zip

16/11/27 02:23:56 INFO yarn.Client: Uploading resource file:/home/hadoop/spark-
2.0.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.0.2.jar -> hdfs://localhost
:9000/user/hadoop/.sparkStaging/application_1480194774626_0001/spark-examples_2.11-
2.0.2.jar

16/11/27 02:23:57 INFO yarn.Client: Uploading resource file:/tmp/spark-42ba1bd7-
a336-4690-9bce-e77a73510efb/__spark_conf__3714837574406596867.zip -> 
hdfs://localhost:9000/user/hadoop/.sparkStaging/application_1480194774626_0001/
__spark_conf__.zip

16/11/27 02:23:58 INFO spark.SecurityManager: Changing view acls to: hadoop

16/11/27 02:23:58 INFO spark.SecurityManager: Changing modify acls to: hadoop

16/11/27 02:23:58 INFO spark.SecurityManager: Changing view acls groups to:

16/11/27 02:23:58 INFO spark.SecurityManager: Changing modify acls groups to:

16/11/27 02:23:58 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups 
with view permissions: Set(); users  with modify permissions: Set(hadoop); groups
with modify permissions: Set()

16/11/27 02:23:58 INFO yarn.Client: Submitting application application_1480194
774626_0001 to ResourceManager

16/11/27 02:23:59 INFO impl.YarnClientImpl: Submitted application application_1
480194774626_0001

16/11/27 02:24:01 INFO yarn.Client: Application report for application_148019477
4626_0001 (state: ACCEPTED)

16/11/27 02:24:01 INFO yarn.Client:

                client token: N/A

                diagnostics: N/A

                ApplicationMaster host: N/A

                ApplicationMaster RPC port: -1

                queue: default

                start time: 1480193639130

                final status: UNDEFINED

                tracking URL: http://hadoop-VirtualBox:8088/proxy/application_14
80194774626_0001/

                user: hadoop

16/11/27 02:24:02 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: ACCEPTED)

16/11/27 02:24:03 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: ACCEPTED)

16/11/27 02:26:05 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: RUNNING)

16/11/27 02:26:05 INFO yarn.Client:

                client token: N/A

                diagnostics: N/A

                ApplicationMaster host: 10.0.2.15

                ApplicationMaster RPC port: 0

                queue: default

                start time: 1480193639130

                final status: UNDEFINED

                tracking URL: http://hadoop-VirtualBox:8088/proxy/application_1480
194774626_0001/

                user: hadoop

16/11/27 02:26:06 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: RUNNING)

16/11/27 02:26:07 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: RUNNING)

16/11/27 02:27:43 INFO yarn.Client: Application report for application_14801947746
26_0001 (state: FINISHED)

16/11/27 02:27:43 INFO yarn.Client:

                client token: N/A

                diagnostics: N/A

                ApplicationMaster host: 10.0.2.15

                ApplicationMaster RPC port: 0

                queue: default

                start time: 1480193639130

                final status: SUCCEEDED

                tracking URL: http://hadoop-VirtualBox:8088/proxy/application_14
80194774626_0001/

                user: hadoop

16/11/27 02:27:44 INFO yarn.Client: Deleting staging directory hdfs://localhost:
9000/user/hadoop/.sparkStaging/application_1480194774626_0001

16/11/27 02:27:44 INFO util.ShutdownHookManager: Shutdown hook called

16/11/27 02:27:45 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-42ba
1bd7-a336-4690-9bce-e77a73510efb

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$

The application went through the ACCEPTED -> RUNNING -> FINISHED states. Once the application has finished, open the tracking URL shown in the final report:

http://hadoop-VirtualBox:8088/proxy/application_1480194774626_0001/

[Screenshot: YARN ResourceManager UI showing the finished application]

Click on the Logs.

[Screenshot: container logs page for the application]

Click on stdout to get the output value of Pi.

[Screenshot: stdout log showing the value of Pi]

Now let us deploy the SparkPi application on YARN in client mode. Run the command below.

Command :  ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client --driver-memory 1g --executor-memory 512m --executor-cores 1 examples/jars/spark-examples_2.11-2.0.2.jar

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$ ./bin/spark-submit --class 
org.apache.spark.examples.SparkPi --master yarn --deploy-mode client --driver-
memory 1g --executor-memory 512m --executor-cores 1 examples/jars/spark-exampl
es_2.11-2.0.2.jar


16/11/27 03:02:27 INFO spark.SparkContext: Running Spark version 2.0.2

16/11/27 03:02:34 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

16/11/27 03:02:37 WARN util.Utils: Your hostname, hadoop-VirtualBox resolves 
to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)

16/11/27 03:02:37 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address

16/11/27 03:02:38 INFO spark.SecurityManager: Changing view acls to: hadoop

16/11/27 03:02:38 INFO spark.SecurityManager: Changing modify acls to: hadoop

16/11/27 03:02:39 INFO spark.SecurityManager: Changing view acls groups to:

16/11/27 03:02:39 INFO spark.SecurityManager: Changing modify acls groups to:

16/11/27 03:02:39 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups 
with view permissions: Set(); users  with modify permissions: Set(hadoop); 
groups with modify permissions: Set()

16/11/27 03:02:43 INFO util.Utils: Successfully started service 'sparkDriver' 
on port 45281.

16/11/27 03:02:44 INFO spark.SparkEnv: Registering MapOutputTracker

16/11/27 03:02:44 INFO spark.SparkEnv: Registering BlockManagerMaster

16/11/27 03:02:44 INFO storage.DiskBlockManager: Created local directory at 
/tmp/blockmgr-e056a63a-93d4-4ead-b93f-3c7ab3f0e9cc

16/11/27 03:02:45 INFO memory.MemoryStore: MemoryStore started with capacity 
413.9 MB

16/11/27 03:02:48 INFO spark.SparkEnv: Registering OutputCommitCoordinator

16/11/27 03:02:49 INFO util.log: Logging initialized @29411ms

16/11/27 03:02:51 INFO server.Server: jetty-9.2.z-SNAPSHOT

16/11/27 03:02:51 INFO handler.ContextHandler: Started o.s.j.s.Servlet
ContextHandler@303e3593{/jobs,null,AVAILABLE}

16/11/27 03:02:51 INFO handler.ContextHandler: Started o.s.j.s.Servlet
ContextHandler@4ef27d66{/jobs/json,null,AVAILABLE}

16/11/27 03:02:52 INFO server.ServerConnector: Started ServerConnector
@31a8e0fa{HTTP/1.1}{0.0.0.0:4040}

16/11/27 03:02:52 INFO server.Server: Started @32174ms

16/11/27 03:02:52 INFO util.Utils: Successfully started service 'SparkUI'
 on port 4040.

16/11/27 03:02:52 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at
 http://10.0.2.15:4040

16/11/27 03:02:53 INFO spark.SparkContext: Added JAR file:/home/hadoop/spark-
2.0.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.0.2.jar at spark://10.0
.2.15:45281/jars/spark-examples_2.11-2.0.2.jar with timestamp 1480195973272

16/11/27 03:03:04 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0
:8032

16/11/27 03:03:06 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers

16/11/27 03:03:06 INFO yarn.Client: Verifying our application has not requested
 more than the maximum memory capability of the cluster (8192 MB per container)

16/11/27 03:03:06 INFO yarn.Client: Will allocate AM container, with 896 MB memory 
including 384 MB overhead

16/11/27 03:03:06 INFO yarn.Client: Setting up container launch context for our AM

16/11/27 03:03:06 INFO yarn.Client: Setting up the launch environment for our AM 
container

16/11/27 03:03:07 INFO yarn.Client: Preparing resources for our AM container

16/11/27 03:03:07 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive 
is set, falling back to uploading libraries under SPARK_HOME.

16/11/27 03:03:28 INFO yarn.Client: Uploading resource file:/tmp/spark-13698b28-
7a4f-46b9-b322-12e29f4cd6b4/__spark_libs__5770156213301628377.zip -> hdfs://
localhost:9000/user/hadoop/.sparkStaging/application_1480194774626_0002/__
spark_libs__5770156213301628377.zip

16/11/27 03:03:44 INFO yarn.Client: Uploading resource file:/tmp/spark-13698b28
-7a4f-46b9-b322-12e29f4cd6b4/__spark_conf__1950216342757027773.zip -> hdfs://
localhost:9000/user/hadoop/.sparkStaging/application_1480194774626_0002/__
spark_conf__.zip

16/11/27 03:03:44 INFO spark.SecurityManager: Changing view acls to: hadoop

16/11/27 03:03:44 INFO spark.SecurityManager: Changing modify acls to: hadoop

16/11/27 03:03:44 INFO spark.SecurityManager: Changing view acls groups to:

16/11/27 03:03:44 INFO spark.SecurityManager: Changing modify acls groups to:

16/11/27 03:03:44 INFO spark.SecurityManager: SecurityManager: authentication 
disabled;ui acls disabled; users  with view permissions: Set(hadoop); groups 
with view permissions: Set(); users  with modify permissions: Set(hadoop); 
groups with modify permissions: Set()

16/11/27 03:03:45 INFO yarn.Client: Submitting application application_14801947
74626_0002 to ResourceManager

16/11/27 03:03:45 INFO impl.YarnClientImpl: Submitted application application_
1480194774626_0002

16/11/27 03:03:45 INFO cluster.SchedulerExtensionServices: Starting Yarn extension
 services with app application_1480194774626_0002 and attemptId None

16/11/27 03:03:47 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:03:47 INFO yarn.Client:

                client token: N/A

                diagnostics: N/A

                ApplicationMaster host: N/A

                ApplicationMaster RPC port: -1

                queue: default

                start time: 1480196025313

                final status: UNDEFINED

                tracking URL: http://hadoop-VirtualBox:8088/proxy/application_
1480194774626_0002/

                user: hadoop

16/11/27 03:03:48 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:03:49 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:05:05 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
ApplicationMaster registered as NettyRpcEndpointRef(null)

16/11/27 03:05:05 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:05:05 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.
apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> 
hadoop-VirtualBox, PROXY_URI_BASES -> http://hadoop-VirtualBox:8088/proxy/
application_1480194774626_0002), /proxy/application_1480194774626_0002

16/11/27 03:05:05 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.
webproxy.amfilter.AmIpFilter

16/11/27 03:05:06 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:05:07 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:05:08 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: ACCEPTED)

16/11/27 03:05:09 INFO yarn.Client: Application report for application_14801947746
26_0002 (state: RUNNING)

16/11/27 03:05:09 INFO yarn.Client:

                client token: N/A

                diagnostics: N/A

                ApplicationMaster host: 10.0.2.15

                ApplicationMaster RPC port: 0

                queue: default

                start time: 1480196025313

                final status: UNDEFINED

                tracking URL: http://hadoop-VirtualBox:8088/proxy/application_
1480194774626_0002/

                user: hadoop

16/11/27 03:05:09 INFO cluster.YarnClientSchedulerBackend: Application application_
1480194774626_0002 has started running.

16/11/27 03:05:09 INFO util.Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 38735.

16/11/27 03:05:09 INFO netty.NettyBlockTransferService: Server created on 
10.0.2.15:38735

16/11/27 03:05:09 INFO storage.BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, 10.0.2.15, 38735)

16/11/27 03:05:09 INFO storage.BlockManagerMasterEndpoint: Registering block 
manager 
10.0.2.15:38735 with 413.9 MB RAM, BlockManagerId(driver, 10.0.2.15, 38735)

16/11/27 03:05:10 INFO storage.BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, 10.0.2.15, 38735)

16/11/27 03:05:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContext
Handler@75de6341{/metrics/json,null,AVAILABLE}

16/11/27 03:05:15 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is 
ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime:
 30000(ms)

16/11/27 03:05:15 WARN spark.SparkContext: Use an existing SparkContext, some 
configuration may not take effect.

16/11/27 03:05:16 INFO handler.ContextHandler: Started o.s.j.s.ServletContext
Handler@298f0a0b{/SQL,null,AVAILABLE}

16/11/27 03:05:16 INFO handler.ContextHandler: Started o.s.j.s.ServletContex
tHandler@31dfc6f5{/SQL/json,null,AVAILABLE}


16/11/27 03:05:43 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala
:38

16/11/27 03:05:45 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala
:38) with 2 output partitions

16/11/27 03:05:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce
 at SparkPi.scala:38)

16/11/27 03:05:45 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/11/27 03:05:45 INFO scheduler.DAGScheduler: Missing parents: List()

16/11/27 03:05:47 INFO scheduler.DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents

16/11/27 03:05:58 INFO memory.MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 1832.0 B, free 413.9 MB)

16/11/27 03:06:00 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as 
bytes in memory (estimated size 1169.0 B, free 413.9 MB)

16/11/27 03:06:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on 10.0.2.15:38735 (size: 1169.0 B, free: 413.9 MB)

16/11/27 03:06:00 INFO spark.SparkContext: Created broadcast 0 from broadcast 
at DAGScheduler.scala:1012

16/11/27 03:06:01 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from
 ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34)

16/11/27 03:06:01 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks

16/11/27 03:06:16 WARN cluster.YarnScheduler: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have
 sufficient resources

16/11/27 03:06:31 WARN cluster.YarnScheduler: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have
 sufficient resources

16/11/27 03:07:21 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered
 executor NettyRpcEndpointRef(null) (10.0.2.15:37586) with ID 4

16/11/27 03:07:23 INFO storage.BlockManagerMasterEndpoint: Registering block 
manager hadoop-VirtualBox:39299 with 117.0 MB RAM, BlockManagerId
(4, hadoop-VirtualBox, 39299)

16/11/27 03:07:23 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
 (TID 0, hadoop-VirtualBox, partition 0, PROCESS_LOCAL, 5440 bytes)

16/11/27 03:07:24 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: 
Launching task 0 on executor id: 4 hostname: hadoop-VirtualBox.

16/11/27 03:07:24 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: 
Registered executor NettyRpcEndpointRef(null) (10.0.2.15:37584) with ID 1

16/11/27 03:07:24 INFO scheduler.TaskSetManager: Starting task 1.0 in 
stage 0.0 (TID 1, hadoop-VirtualBox, partition 1, PROCESS_LOCAL, 5440 bytes)

16/11/27 03:07:24 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: 
Launching task 1 on executor id: 1 hostname: hadoop-VirtualBox.

16/11/27 03:07:24 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: R
egistered executor NettyRpcEndpointRef(null) (10.0.2.15:37582) with ID 3

16/11/27 03:07:24 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: 
Registered executor NettyRpcEndpointRef(null) (10.0.2.15:37588) with ID 2

16/11/27 03:07:26 INFO storage.BlockManagerMasterEndpoint: Registering 
block manager hadoop-VirtualBox:34754 with 117.0 MB RAM, BlockManagerId(3, 
hadoop-VirtualBox, 34754)

16/11/27 03:07:26 INFO storage.BlockManagerMasterEndpoint: Registering 
block manager hadoop-VirtualBox:44869 with 117.0 MB RAM, BlockManagerId(1, 
hadoop-VirtualBox, 44869)

16/11/27 03:07:50 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on hadoop-VirtualBox:39299 (size: 1169.0 B, free: 117.0 MB)

16/11/27 03:07:57 INFO scheduler.TaskSetManager: Finished task 0.0 in 
stage 0.0 (TID 0) in 35264 ms on hadoop-VirtualBox (1/2)

16/11/27 03:07:58 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at 
SparkPi.scala:38) finished in 116.177 s

16/11/27 03:07:58 INFO scheduler.TaskSetManager: Finished task 1.0 in 
stage 0.0 (TID 1) in 33636 ms on hadoop-VirtualBox (2/2)

16/11/27 03:07:58 INFO cluster.YarnScheduler: Removed TaskSet 0.0, 
whose tasks have all completed, from pool

16/11/27 03:07:58 INFO scheduler.DAGScheduler: Job 0 finished: reduce 
at SparkPi.scala:38, took 135.053954 s

Pi is roughly 3.1430557152785763

16/11/27 03:07:58 INFO server.ServerConnector: Stopped 
ServerConnector@31a8e0fa{HTTP/1.1}{0.0.0.0:4040}

16/11/27 03:07:58 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@6c6357f9{/stages/stage/kill,null,UNAVAILABLE}

16/11/27 03:07:59 INFO ui.SparkUI: Stopped Spark web UI at http://10.0.2.15:4040

16/11/27 03:08:00 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor 
thread

16/11/27 03:08:01 ERROR scheduler.LiveListenerBus: SparkListenerBus has already 
stopped! Dropping event SparkListenerExecutorMetricsUpdate(4,WrappedArray())

16/11/27 03:08:02 ERROR scheduler.LiveListenerBus: SparkListenerBus has already 
stopped! Dropping event SparkListenerExecutorMetricsUpdate(2,WrappedArray())

16/11/27 03:08:03 INFO cluster.YarnClientSchedulerBackend: Shutting down all 
executors

16/11/27 03:08:03 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking
 each executor to shut down

16/11/27 03:08:04 INFO cluster.SchedulerExtensionServices: Stopping Scheduler
ExtensionServices

(serviceOption=None,

 services=List(),

 started=false)

16/11/27 03:08:05 INFO cluster.YarnClientSchedulerBackend: Stopped

16/11/27 03:08:05 INFO spark.MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!

16/11/27 03:08:05 INFO memory.MemoryStore: MemoryStore cleared

16/11/27 03:08:05 INFO storage.BlockManager: BlockManager stopped

16/11/27 03:08:05 INFO storage.BlockManagerMaster: BlockManagerMaster stopped

16/11/27 03:08:05 INFO scheduler.OutputCommitCoordinator$OutputCommit
CoordinatorEndpoint: OutputCommitCoordinator stopped!

16/11/27 03:08:05 INFO spark.SparkContext: Successfully stopped SparkContext

16/11/27 03:08:05 INFO util.ShutdownHookManager: Shutdown hook called

16/11/27 03:08:05 INFO util.ShutdownHookManager: Deleting directory /
tmp/spark-13698b28-7a4f-46b9-b322-12e29f4cd6b4

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$

Client mode goes through the same stages as cluster mode, but since the driver runs locally, the output (the value of Pi) is printed on the terminal itself.

We successfully ran a Spark application on YARN!

Now let's move ahead and see how Spark and Hive work together.

How to Integrate Spark with Hive

Spark SQL is the part of the Spark ecosystem that supports Apache Hive as well. Hive dependencies are not bundled with Spark by default, but if they can be found on the classpath, Spark will load them automatically. Hive integration is configured by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) files in the conf directory of Spark.
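Assuming Hive was installed under /home/hadoop/apache-hive-2.1.0-bin (the path that appears later in this article) and that HADOOP_HOME points at your Hadoop installation, the copy step might look like this sketch (the exact paths depend on your setup):

```shell
# Copy Hive and Hadoop configuration into Spark's conf directory so that
# spark-shell picks up the Hive metastore and HDFS settings.
cp /home/hadoop/apache-hive-2.1.0-bin/conf/hive-site.xml ~/spark-2.0.2-bin-hadoop2.7/conf/
cp $HADOOP_HOME/etc/hadoop/core-site.xml  ~/spark-2.0.2-bin-hadoop2.7/conf/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml  ~/spark-2.0.2-bin-hadoop2.7/conf/
```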

I have a sample input file with multiple records containing employee data. I will use this data for the Spark-Hive integration. Once you are done with the configuration, go to the Spark directory and start spark-shell.

[Screenshot: sample employee input file]

In this example, I will create a HiveContext and execute Hive queries through it. The tables and data created this way will be reflected in Apache Hive as well.

hadoop@hadoop-VirtualBox:~$ cd spark-2.0.2-bin-hadoop2.7/

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$ ./bin/spark-shell

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel).

16/11/27 01:58:06 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

16/11/27 01:58:08 WARN util.Utils: Your hostname, hadoop-VirtualBox resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)

16/11/27 01:58:08 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address

16/11/27 01:58:17 WARN spark.SparkContext: Use an existing SparkContext, 
some configuration may not take effect.

Spark context Web UI available at http://10.0.2.15:4040

Spark context available as 'sc' (master = local[*], app id = local-1480192095899).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2

      /_/


Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)

Type in expressions to have them evaluated.

Type :help for more information.

scala>

Create a HiveContext to run Hive SQL-like queries inside Spark. (In Spark 2.x, HiveContext is deprecated in favor of SparkSession.builder().enableHiveSupport(), but it still works.)

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.
hive.HiveContext@538b3c88

Create a table in Spark SQL.

scala> sqlContext.sql("CREATE TABLE employee1 (name STRING, id INT, salary
 INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")

res1: org.apache.spark.sql.DataFrame = []

Load data into the table.

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/input' INTO 
TABLE employee1")

res4: org.apache.spark.sql.DataFrame = []

Run a Spark SQL query to find the employees with a salary less than 70000 USD.

scala> sqlContext.sql("SELECT * FROM employee1 WHERE salary < 70000").collect()
.foreach(println)

[John,1,40000]

[Debby,3,55000]

[Lauren,4,50000]

[Luis,6,65000]

[Dier,8,67000]

[Ricky,10,48000]

scala>
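As a sanity check, the result set above can be reproduced outside Spark by filtering the ten records from the sample input file (the same data that appears in the Hive output later in this article) in plain Python:

```python
# Employee records as (name, id, salary), copied from the sample input file.
employees = [
    ("John", 1, 40000), ("Ellen", 2, 75000), ("Debby", 3, 55000),
    ("Lauren", 4, 50000), ("Dennis", 5, 90000), ("Luis", 6, 65000),
    ("Derrek", 7, 98000), ("Dier", 8, 67000), ("Ally", 9, 97000),
    ("Ricky", 10, 48000),
]

# Same predicate as the Spark SQL query: salary < 70000.
below_70k = [(name, emp_id, salary)
             for name, emp_id, salary in employees if salary < 70000]
for row in below_70k:
    print(row)  # six rows: John, Debby, Lauren, Luis, Dier, Ricky
```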

Run a Spark SQL query to find out the average salary.

scala> sqlContext.sql("SELECT AVG(salary) FROM employee1").collect().
foreach(println)

[68500.0]

scala>
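The 68500.0 returned by the query is simply the arithmetic mean of all ten salaries, which is easy to verify by hand or in plain Python:

```python
# The ten salaries from the sample employee data.
salaries = [40000, 75000, 55000, 50000, 90000,
            65000, 98000, 67000, 97000, 48000]

avg_salary = sum(salaries) / len(salaries)  # 685000 / 10
print(avg_salary)  # 68500.0
```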

Press Ctrl+C (or type :quit) to exit the Scala shell.

Now start Hive and check whether the employee1 table exists there as well and contains the same data.

hadoop@hadoop-VirtualBox:~/spark-2.0.2-bin-hadoop2.7$ cd

hadoop@hadoop-VirtualBox:~$ hive

Logging initialized using configuration in jar:file:/home/hadoop/apache-hive-2.1.0-bin/lib/hive-common-2.1.0.jar!/hive-log4j2.properties Async: true

Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

hive> show databases;

OK

default

test

Time taken: 6.513 seconds, Fetched: 2 row(s)

hive> show tables;

OK

employee

employee1

Time taken: 0.695 seconds, Fetched: 2 row(s)

hive> select * from employee1;

OK

John    1       40000

Ellen   2       75000

Debby   3       55000

Lauren  4       50000

Dennis  5       90000

Luis    6       65000

Derrek  7       98000

Dier    8       67000

Ally    9       97000

Ricky   10      48000

Time taken: 10.607 seconds, Fetched: 10 row(s)

hive>

Conclusion

The table we created in Spark, and the data we loaded into it, are present in Hive as well, so we have successfully integrated Spark with Hive. By combining Spark with YARN and Hive in this way, you can leverage batch processing and real-time processing together. You now know how to deploy a Spark application on a YARN cluster in both client mode and cluster mode, and how to query Hive tables from Spark.
