Warehouse architects, data engineers and spark developers with an intermediate level of experience who want to improve performance of data processing.
Data Distribution in Big Data
The performance of the Big Data systems is directly linked to the uniform distribution of the processing data across all of the workers. When you have a database table and then take the data from it to processing, the rows of the data should be distributed uniformly among all the data workers. If some data slices have more rows than others, the workers with more data should work harder, longer, and need more resources and time to complete their jobs. These data slices and the workers that manage them become a performance bottleneck for whole data processing task. Uneven distribution of data is called skew and an optimal data distribution has no skew.
Where is a Problem With Data Distribution?
What is data skew
Data skew means that data distribution is uneven or asymmetric. Symmetry means that one half of the distribution is a mirror image of the other half.
Skewed distribution may be different types:
left skewed distribution - has a long left tail. Left-skewed distributions are also called negatively-skewed distributions.
right skewed distribution - has a long right tail. Right-skewed distributions are also called positive-skew distributions.
Another asymmetric distribution examples: link
How to Find The Skew Problem in Your Data?
Skewness is a measure of symmetry, or more precisely, the lack of symmetry. A distribution, or data set, is symmetric if it looks the same to the left and right of the center point. Skewness for the normal distribution equal zero. This means that the median = mean = moda. Skewness calculates via the formula:
Data Skew in Distributed Systems
MapReduce is a parallel, distributed computation paradigm. MapReduce allows scaling computations across hundreds and thousands of nodes. This paradigm provides two main operations - map and reduce. The mapping step takes a set of data and converts it into another set of data, where individual elements are broken down into tuples - key/value pairs. The reduce step takes the output from a map and combines those data tuples into a smaller set of tuples. The reduce job is always started after the map job.
In distributed, parallel computation systems like Hadoop MapReduce or Apache Spark, uneven data distribution may cause a longer computation time than in the case of symmetric, balanced data distribution.
For example, there are two tables for our e-store:
customers(customer_id, name, …) orders(order_id, customer_id, article, total, …)
And we want to join those two tables to get a full orders list for each customer.
It should be noted that some of the customers are very active (regular customers) and some of them are very passive (purchases are not frequent). For this example, we can say that skewed data is data from orders table and unskewed data is data from customers table. Processing the data via Hadoop MapReduce or Apache Spark leads to a long time task execution.
There is a chart from Spark UI that describes possible computation time results for tasks that work with skewed data:
From the chart, is visible that all tasks take varying amounts of time to complete, which is to be expected, but the variation is quite large .
How to Solve Your Data Distribution Problem?
General Solutions of Data Skew Problem
There are several ways to solve data skew problem. First, we will look at a few common ways of solving, and then consider the solutions provided by various additional services. One of the ideas of solving data skew is splitting a calculation data for a larger number of processors. Also, we can set more partitions for overcrowded columns to reduce access time to data. Below you can see two common solutions for data skew problem at different system layers.
We can reduce data skew effect at the data uploading stage. The main idea is to clearly point to the skewed data (key) before their partitioning. This will allow the data to be distributed in a different way, which consider a data unevenness. As result, it will reduce the impact of data skew before calculations begin. Because we do not do additional actions during the calculation phase, it reduces the execution time.
can be implemented before processing phase
increasing the speed of computing
By default, all values for a particular key goes to the same mapper. If we see that some key have overcrowded values quantity (order_id), then we can divide it into more than one mapper .
can be implemented between processing phases
number of mappers equals to number of partitions
custom partition strategy could be set
Out of The Box Solutions of Data Skew Problem
Now we will consider ready-made solutions from popular services. We describe data skew solution for two Apache services - Hive and Pig. We also look at the solution for Apache Spark framework.
Apache Hive is a data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. A structure can be projected onto data which are already in the storage.
Skewed table (Hive)
Values that appear very often (heavy skew) are split out into separate files and rest of the values go to some other file . In this case, the skewed table is the orders table.
CREATE TABLE SkewData (c1 STRING, c2 STRING) SKEWED BY (c1) ON ('value');
automatic splitting into files
skipping/including whole file if possible
List bucketing (Hive)
The special type of skewed table. Main idea - maintaining one directory per skewed key. The data corresponding to non-skewed keys goes into separate directory .
CREATE TABLE SkewData (c1 STRING, c2 STRING)SKEWED BY (c1) ON ('value') STORED AS DIRECTORIES;
directory per key
for small skewed keys quantities
Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.
Skewed join (Pig)
Pig’s skewed join can be used when the underlying data is sufficiently skewed and the user needs a finer control over the allocation of reducers to counteract the skew. It should also be used when the data associated with a given key is too large to fit in memory.
big = LOAD 'big_data' AS (b1,b2,b3);massive = LOAD 'massive_data' AS (m1,m2,m3);C = JOIN big BY b1, massive BY m1 USING "skewed";
join tables whose keys are too big to fit in memory
do not support more than two tables for skewed join
Apache Spark is an open-source distributed general-purpose cluster-computing framework. Has as its architectural foundation the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines, that is maintained in a fault-tolerant way.
Skewed Join (Spark)
Spark Framework also allows you to use skewed join as well as Hive or Pig solutions. Let's look at an example using Spark SQL. To specify a skewed table, we have to use hints for our SQL queries.
SELECT /*+ SKEW('orders') */ *FROM orders AS o, customers AS cWHERE o.customer_id = c.customer_id;
Data replication (Spark)
In common case, if we need to join unskewed table to skewed table (and vice versa), we can replicate unskewed table data N times and add new key which represent a value between 0 and N. For skewed table data we add to existing key a random, uniformly distributed value between 0 and N. We are simply splitting the keys so that values associated with the same original key are now split into N buckets .
Joining with data replication:
According to the example above (e-store), unskewed data may be data from the customer’s table and skewed data is data from the orders table. We replicate data from customers table N times and add the new, corresponding key for each replica. Skewed, orders data we don’t need to replicate, but we add a random value between 0 and N to key value. After the task, we can remove this random integer and get the original keys if we need it.
For replicating a small, unskewed dataset, we can use a broadcast hints from Spark SQL or a broadcast function:
SELECT /*+ BROADCASTJOIN(customers) */ o.order_id, c.customer_idFROM orders AS oJOIN customers AS c ON o.customer_id = c.customer_id;
val smallHinted = small.hint("broadcast")val plan = smallHinted.queryExecution.logical
Experiments provided on 6 nodes Hadoop cluster 15Gb of operational memory and 3 cores each and Spark 2.3 on it.
We had two datasets, one large and one small, both datasets contains skewed data. Datasets were stored on HDFS.
Our task was to join two datasets on one or more keys which are skewed by default. And our goal was to decrease job execution time by effectively utilizing all available cluster resources.
Run spark job with classical map-reduce data distribution gave as results like this.
Total time to complete a job - 12 min
As we can see, we have huge data skew on one of the keys (executor 2). As the result, the total duration of the job depends completely from one executor time and parallel execution practically has no effect.
After union repartition of large dataset between all nodes and broadcasting small dataset to each node, we achieve a much better picture.
Total time to complete a job - 2.7 min
As we can see, now all six executors it calculation process, as the result, we achieve significant effect from parallel execution.
Next step was maximization of utilization of cluster resources. For this purpose we slightly tuned spark-submit command to use more than one core per executor.
Total time to complete a job - less a 1 min
As a result, all executors has own parallelism level which is determined by the available number of cores and could faster complete task.
Project sources on github: link
Data skew is a real problem for effective parallel computing. The reason of this problem may be a poor data structure or data nature. If we refer to an example with e-store, the problem is in the nature of data. There is no guarantee that all customers will have an even distribution of purchases. Data skew in cases like this is inevitable, so it is possible only to compensate the consequences of this problem, but not eliminate it. As the practical experiment shows, the consequences of data skew are very unpleasant. In fact, eliminating parallelism leads to more execution time and, thankfully, we have a few tools to overcome it.