Troubleshooting Hive query performance in HDInsight Hadoop cluster


One of the common support requests we get from customers using Apache Hive is: my Hive query is running slow and I would like the job/query to complete much faster, or in more quantifiable terms, my Hive query is taking 8 hours to complete and my SLA is 2 hours. Improving or tuning Hive query performance is a huge area. Depending on the data size, the data type, and the complexity of the query, we may have to look at the problem from different perspectives and play with many different configuration settings in YARN, MapReduce, Hive, Tez, etc. Discussing all of those options in detail would take much more than a blog post. So in this blog I will walk through the steps we followed to troubleshoot a Hive query performance issue that I worked on recently, and I hope it will give you some pointers on how to troubleshoot a slow Hive query. I will talk about the logs we captured, the questions we tried to answer from those logs, and how that led us to the right cluster size and configuration to reduce the execution time significantly.

What was the issue?

This customer was running a Hive query that was taking three and a half hours, while the customer's SLA expectation was an hour or so. To reduce the execution time they scaled up the data nodes of their 60-node HDInsight cluster from A3 (7 GB RAM, 4 cores) to A6 (28 GB RAM, 4 cores) but kept the number of nodes the same. However, running the query on the new A6 data node cluster didn't yield any performance gain.

 

What logs did we capture?

To troubleshoot the issue we collected the following logs/files after the customer ran the job on the cluster with 60 A6 data nodes.

  1. The Hive query and the EXPLAIN EXTENDED output for the query (see the example command after this list).
  2. Hive log after running the Hive query in DEBUG mode.

    To switch to DEBUG mode we had the customer change the following line in the "C:\hdp\hive-*\conf\log4j.properties" file:

    From: hive.root.logger=INFO,DRFA

    To: hive.root.logger=ALL,DRFA

  3. The yarn log, captured by running the following command from the Hadoop command line with the application ID:

    yarn logs -applicationId <ApplicationId> -appOwner <cluster_user_id> > <path>\Yarnlogs.txt

  4. The job_jobid_conf.xml file(s) from the WASB folder 'mapred/history/done/<year>/<month>/<execution date of the job>/'.
  5. The configuration files hive-site.xml, mapred-site.xml, and yarn-site.xml.
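
For reference, the EXPLAIN EXTENDED output mentioned in item 1 is produced by simply prefixing the query with EXPLAIN EXTENDED and capturing the result. The statement below is only a placeholder shape with hypothetical table names, not the customer's actual query:

EXPLAIN EXTENDED INSERT OVERWRITE TABLE target_orc SELECT * FROM source_json;

Running this at the Hive prompt (or from a script file with hive -f, redirecting the output to a text file) gives the plan output we review in the next section.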

Where did the job spend most of the time, and was there any bottleneck?

First we wanted to find out where the job spent most of the time and whether there was any bottleneck. We checked the Hive query and the EXPLAIN EXTENDED output for it. The query was very straightforward: it inserted JSON data from one Hive table into another as ORC format (the data was getting converted in the process). There were about 5000 input files of different sizes, from a few hundred KB to a few MB, with a total size of 400 GB. In the EXPLAIN EXTENDED output we didn't see anything that stood out. Then we checked the Hive log; again, nothing unusual. The next step was the yarn log, where we found there were 677 grouped splits, so the job had 677 tasks:

2015-07-21 17:14:35,709 INFO [InputInitializer [Map 1] #0] tez.HiveSplitGenerator: Number of grouped splits: 677
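
To make the workload concrete before digging into the timeline: the job was essentially a single INSERT ... SELECT that reads a JSON-backed Hive table and writes the rows back out as ORC. A minimal sketch of that pattern, with hypothetical table names (the real tables and columns were of course different), looks like this:

-- sales_json: hypothetical table defined over the raw JSON files (e.g., with a JSON SerDe)
-- sales_orc: hypothetical target table with the same columns, STORED AS ORC
INSERT OVERWRITE TABLE sales_orc
SELECT *
FROM sales_json;

Each of the 677 tasks processes one grouped split of the input and writes its share of the ORC output, so the total run time depends heavily on how many of those tasks can run in parallel.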

The job was launched at around 2015-07-21 17:13:05. We checked a few tasks to see how long they took and when they were scheduled. Below is the yarn log for one of the first tasks:

Container: container_1437433182798_0013_01_000010

LogType:syslog_attempt_1437433182798_0013_1_00_000001_0

Log Upload Time:Tue Jul 21 20:24:59 +0000 2015

LogLength:34576

Log Contents:

2015-07-21 17:15:07,159 INFO [main] task.TezChild: Refreshing UGI since Credentials have changed

2015-07-21 17:15:07,159 INFO [main] task.TezChild: Credentials : #Tokens=2, #SecretKeys=1

……

2015-07-21 17:37:30,172 INFO [TezChild] task.TezTaskRunner: Task completed, taskAttemptId=attempt_1437433182798_0013_1_00_000001_0, fatalErrorOccurred=false

This task took about 22 minutes (17:37:30 - 17:15:07) to complete. At around 20:19 the Application Master got the completion report for all 677 tasks:

2015-07-21 20:19:28,372 INFO [AsyncDispatcher event handler] impl.VertexImpl: Num completed Tasks for vertex_1437433182798_0013_1_00 [Map 1] : 677

The job finally completed at around 2015-07-21 20:41:47. Let's line up the time stamps to better understand where the job spent most of its time.

Job started at around                    17:13:05

First task launched at around            17:15:07

All tasks completed at around            20:19:28

Job completed at around                  20:41:47

  • From start to end the job took about 3 hours and 28 minutes (20:41:47 - 17:13:05) to complete.
  • Before the first task launched, the driver program took only about 2 minutes (17:15:07 - 17:13:05).
  • The Task Execution Phase took about 3 hours 4 minutes (20:19:28 - 17:15:07) to complete all tasks.
  • Finally, the time between all tasks completing and the job completing was about 22 minutes (20:41:47 - 20:19:28).

It is clear that the job spent most of the time in the Task Execution Phase, about 3 hours 4 minutes; before and after the Task Execution Phase the job spent only about 24 minutes (2 + 22) in total. Clearly the driver program was not the bottleneck. Therefore, to improve performance we needed to find out how to reduce the time spent in the Task Execution Phase.

Why did the Task Execution Phase take so long?

Next we wanted to find out why the Task Execution Phase took that long. The yarn log showed that tasks executed later in the job were waiting for containers to become available. We checked one of the last tasks and found it was launched more than two hours after the first task (which started at 17:15:07, as shown above).

LogType:syslog_attempt_1437433182798_0013_1_00_000676_0

Log Upload Time:Tue Jul 21 20:24:59 +0000 2015

LogLength:35060

Log Contents:

2015-07-21 19:37:14,225 INFO [main] task.TezChild: Refreshing UGI since Credentials have changed

2015-07-21 19:37:14,225 INFO [main] task.TezChild: Credentials : #Tokens=2, #SecretKeys=1

2015-07-21 19:37:14,225 INFO [main] task.TezChild: Localizing additional local resources for Task : {}

…..

2015-07-21 20:06:31,528 INFO [TezChild] task.TezTaskRunner: Task completed, taskAttemptId=attempt_1437433182798_0013_1_00_000676_0, fatalErrorOccurred=false

This task took about 29 minutes (20:06:31 - 19:37:14) to complete. The Application Master log showed the delay as well: the task attempt had to wait for resources to become available before it could run on a container.

2015-07-21 17:14:37,881 INFO [AsyncDispatcher event handler] impl.TaskAttemptImpl: attempt_1437433182798_0013_1_00_000676_0 TaskAttempt Transitioned from NEW to START_WAIT due to event TA_SCHEDULE

2015-07-21 19:37:10,863 INFO [DelayedContainerManager] rm.YarnTaskSchedulerService: Assigning container to task, container=Container: [ContainerId: container_1437433182798_0013_01_000626, NodeId:

workernode0.asurionprodhdc.b8.internal.cloudapp.net:45454, NodeHttpAddress: workernode0.asurionprodhdc.b8.internal.cloudapp.net:30060, Resource: <memory:9216, vCores:1>, Priority: 2, Token: Token { kind:

ContainerToken, service: 100.72.182.58:45454 }, ], task=attempt_1437433182798_0013_1_00_000676_0, containerHost=workernode0.asurionprodhdc.b8.internal.cloudapp.net, localityMatchType=NonLocal,

matchedLocation=*, honorLocalityFlags=false, reusedContainer=false, delayedContainers=0, containerResourceMemory=9216, containerResourceVCores=1

2015-07-21 19:37:14,252 INFO [AsyncDispatcher event handler] impl.TaskAttemptImpl: attempt_1437433182798_0013_1_00_000676_0 TaskAttempt Transitioned from START_WAIT to RUNNING due to event TA_STARTED_REMOTELY
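
As a side note, a quick way to trace a single task attempt like this through the captured yarn log is to filter the log file for its attempt ID. For example, assuming the log was saved to Yarnlogs.txt as described earlier (findstr is the Windows equivalent of grep):

findstr /C:"attempt_1437433182798_0013_1_00_000676_0" <path>\Yarnlogs.txt

This pulls out the NEW to START_WAIT to RUNNING transitions together with the TezChild lines, which makes the two-hour wait easy to spot.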

So there weren't enough containers to run all 677 tasks at once; instead, the tasks were executed in multiple waves. To get an idea of how many containers were available, we checked the MR memory settings in the configuration files, which determine the container size and count.

In mapred-site.xml

<name>mapreduce.map.memory.mb</name>

<value>9216</value>

<name>mapreduce.reduce.memory.mb</name>

<value>9216</value>

So based on the above settings, the container size for both map and reduce tasks is set to 9 GB.

In yarn-site.xml

<name>yarn.nodemanager.resource.memory-mb</name>

<value>18432</value>

The yarn-site.xml property yarn.nodemanager.resource.memory-mb defines the total amount of memory the node manager can use for containers. So with the above settings:

Container size for both map and reduce tasks = 9 GB

Total amount of memory the node manager can use for containers = 18 GB

Number of containers per node = 18/9 = 2 containers

Total number of containers in a 60-node cluster = 60 * 2 = 120 containers

Earlier we saw there were 677 tasks, so with a total of 120 containers we would need about 677/120 ~ 6 waves of task execution to complete the job. The two tasks we checked so far took 22 and 29 minutes respectively. We checked a few more and the time taken was about the same, so we didn't see any evidence of data skew. If we assume 30 minutes for each wave of tasks to complete, then with 6 waves we would need about 3 hours, which matches the time taken in the Task Execution Phase calculated earlier from the yarn log: 3 hours 4 minutes (20:19:28 - 17:15:07).

This also explains why the customer didn't see any improvement after scaling up from A3 to A6 data nodes. An A3 node has only 7 GB of memory, but on that cluster the MR memory settings were also set to much smaller values, so moving to A6 (where the container size was set to 9 GB) didn't actually increase the number of containers per node.

How can we improve the performance of the Task Execution Phase of a Hive job?

Now, to improve the performance of the Task Execution Phase of a Hive job we need to reduce the number of task execution waves by increasing the number of available containers. Here each container was configured with 9 GB of RAM. Containers usually don't need more memory just because the data size is big; rather, they may need more memory depending on the query plan when the query is complex, and a 2 GB container is good enough in most cases. We already knew that this query was not complex, and we didn't see anything unusual in the Hive log or the EXPLAIN EXTENDED output. We considered the following two options for increasing the number of containers.

  1. Scale up the cluster by increasing the available memory of each data node. The customer had already done that by moving the data nodes from A3 (7 GB RAM, 4 cores) to A6 (28 GB RAM, 4 cores). But the container size was set too high, which is why they didn't see any increase in the number of available containers. We can simply set the container size to a smaller value, for example 2 GB instead of 9 GB, and that should increase the number of containers significantly.
  2. Scale out the cluster by increasing the number of data nodes. For example, going from 60 A3 data nodes to 120 A3 data nodes would double the available containers.

To implement option #1 we can set mapreduce.map.memory.mb and mapreduce.reduce.memory.mb to 2 GB (and the corresponding java.opts heap settings to a correspondingly smaller value), and that should increase the number of containers by more than 4 times. However, with this change each node would have 18/2 = 9 containers. Notice that an A6 data node has 4 times more memory than an A3 data node, but both have 4 cores. So with 9 containers per node, the containers would have to share the 4 available cores, which is more than 2 containers per core and would not give us the best balance of resource utilization.
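
For a job running on plain MapReduce, option #1 would look roughly like the following set commands. The 2 GB container size and the heap sizes (about 80% of the container) are illustrative values rather than a tested recommendation, and for this particular job, which ran on Tez, the setting that actually matters is hive.tez.container.size, as discussed below:

set mapreduce.map.memory.mb=2048;
set mapreduce.reduce.memory.mb=2048;
set mapreduce.map.java.opts=-Xmx1638m;
set mapreduce.reduce.java.opts=-Xmx1638m;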

On the other hand, to implement option #2 we need to go back to the previous, smaller data node size A3 (7 GB RAM, 4 cores) and then scale out the cluster to 120 nodes instead of 60. This will double the amount of memory and at the same time double the available cores. With the smaller data nodes we can also try setting the container size to an even smaller value, for example 1 GB, since we have 4 cores available in each node. It is worth mentioning that in HDInsight you can quickly delete and recreate a Hadoop cluster, so recreating the cluster with a different data node size and count is not a problem.

Which option would be better for our scenario?

For our issue, smaller containers on A3 data nodes made more sense based on the data size and the query complexity. With A3 we would also be able to make sure most of the containers get a separate core even with the smaller container size. So we decided to scale out the cluster from 60 data nodes to 120 A3 data nodes and set the container size to 1 GB. An A3 data node has a total of 7 GB RAM and 4 cores. By default yarn.nodemanager.resource.memory-mb is set to 5.5 GB, so we should get 5 containers per node.

Hive queries usually run faster on the Tez engine than on MR, so we wanted to recommend that the customer use Tez. But then we found out the customer was already using Tez by setting hive.execution.engine=tez. When using Tez for Hive, the container size is determined by the Tez task memory setting hive.tez.container.size instead of the MR memory settings (mapreduce.map.memory.mb and mapreduce.reduce.memory.mb). From the hive-site.xml file we found that hive.tez.container.size was also set to 9 GB, the same as the MR memory settings, so the calculations we did above to find the number of available containers are still valid.

In hive-site.xml

<name>hive.tez.container.size</name>

<value>9216</value>
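
If you want to confirm which engine and container size a Hive session is actually using, you can echo the current values from the Hive prompt; running set with just a property name prints its current value:

set hive.execution.engine;
set hive.tez.container.size;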

Final recommendation and the results:

We recommended that the customer create a new HDInsight Hadoop cluster with 120 A3 data nodes and add the following set commands to the Hive query:

set hive.tez.container.size=1024;

set hive.tez.java.opts=-Xmx819m;

The customer ran the same Hive query on the same data set and this time it took only 1 hour and 16 minutes, of which the Task Execution Phase took about an hour. So with the recommended cluster size and configuration changes we were able to reduce the execution time to roughly one third of the previous execution time.

Conclusion

The specific changes we made in this case may not be applicable to the Hive query you are struggling with. For example, in this case the number of tasks was larger than the number of available containers, so to improve performance we increased the number of available containers. However, in other cases you may see the opposite: the number of tasks is much smaller than the number of available containers. In those situations you need to increase the number of tasks, and you can set mapreduce.input.fileinputformat.split.maxsize to a smaller value to increase the number of splits and thus the number of tasks. The split size is calculated by the formula:

max(minimumSize, min(maximumSize, blockSize))

By default minimumSize < blockSize < maximumSize, so the split size defaults to the block size, and in HDInsight the default value of fs.azure.block.size is 512 MB.
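
For example, if the input consisted of a few very large files, you could shrink the maximum split size to create more splits and therefore more tasks. The 128 MB value below is only an illustration and would need to be tuned for the actual data:

set mapreduce.input.fileinputformat.split.maxsize=134217728;

Note that when Hive runs on Tez, the splits are additionally grouped by the Tez split grouper (the "Number of grouped splits" log line we saw earlier), so the effective task count can differ from the raw split count.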

I hope this blog has cleared up some of the basic concepts and guiding principles you can follow to improve the performance of a Hive query in a Hadoop cluster, especially in HDInsight. By following the same approach you should be able to find the right cluster size and configuration to optimize the performance of your own Hive query.
