Make Your Data Processing Code Fly in 5 Minutes

Unleash the full power of your computer within 10 lines of code

Zixuan Zhang
Towards Data Science

--

Although data scientist has been called the ‘sexiest’ job of the 21st century, data processing, a big part of a data practitioner's daily work, is laborious and often not much fun. In the era of big data, data processing is even more time-consuming, easily taking tens of hours to complete, which is both frustrating and a drag on project progress. As a result, data practitioners are willing to write complex code for even a tiny performance improvement. However, speeding up data processing isn't that hard! In this article, I'll introduce some easy and intuitive ways to cut down the runtime of several common data processing tasks.

Give it everything you’ve got

First of all, modern CPUs are lightning fast. Popular CPUs like Intel's i7 series easily reach 3 GHz clock speeds and have at least 4 cores, which means they should be able to carry out most data processing tasks in a reasonable amount of time. However, CPUs are often starved: the code is bound by other factors such as I/O delays (disk to RAM, RAM to cache, etc.). Therefore, reducing I/O delays is crucial.

(Figure: from David Jeppesen's ‘Computer Latency at a Human Scale’)

As you can see, reading data from an HDD (rotational disk) is rather slow. If your task is I/O-bound, you can therefore speed it up simply by moving the folder you read from and write to onto an SSD.
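If you are not sure which case you are in, a rough check is to time the I/O and the computation separately. Here is a minimal sketch; the file name and the computation are placeholders, not from the original article:

import time
import pandas as pd

t0 = time.perf_counter()
df = pd.read_csv('big_file.csv')   # I/O: load from disk
t1 = time.perf_counter()
summary = df.describe()            # CPU: placeholder computation
t2 = time.perf_counter()
print(f'read: {t1 - t0:.2f}s, compute: {t2 - t1:.2f}s')

If the read dominates, you are I/O-bound and an SSD (or a faster file format) will help more than parallelism.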

Secondly, if the operation is CPU-bound, we'll have to dig a little deeper into parallelism. Parallelism can use the full potential of the CPU, making every penny you paid worthwhile. Python offers a variety of parallel computing libraries, but most of them, unfortunately, require a large amount of additional setup code and a decent understanding of threads, processes, synchronization, and so on. Dask is a library that provides parallel computation without exposing users to the nitty-gritty details of parallel configuration. I'll show you how easy it is to speed up your code with the help of Dask.

System configuration

Dask is installed by default in the Anaconda Python distribution. If you do not have dask installed, just type this line in your terminal:

pip3 install "dask[distributed]"

Set up a local cluster (each core is a worker in the cluster)

# in python
from dask.distributed import Client
client = Client()  # set up a local cluster using all your cores
client  # in a notebook, this displays the dask dashboard URL, which can be helpful

That’s it! You now have several workers at your disposal.
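If you are running a plain script rather than a notebook, you can still grab the dashboard URL directly:

print(client.dashboard_link)  # URL of the dask dashboard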

Common tasks that can be easily accelerated

a. File format conversion

Example 1: convert JSON files to CSVs

Converting JSON to CSV is a common task in web crawling. A common way to process JSON files is this:

import os

file_list = os.listdir(base_dir)
json_list = [file for file in file_list if file.endswith('.json')]
for file in json_list:
    inp = os.path.join(base_dir, file)
    out = os.path.join(base_dir, file.replace('.json', '.csv'))
    json_process(inp, out)  # convert each json to a csv

However, this serial approach does not use the full potential of your machine. We can write better code with minor modifications:

import os
import dask

file_list = os.listdir(base_dir)
json_list = [file for file in file_list if file.endswith('.json')]
parallel_work = []
for file in json_list:
    inp = os.path.join(base_dir, file)
    out = os.path.join(base_dir, file.replace('.json', '.csv'))
    parallel_work.append(dask.delayed(json_process)(inp, out))  # lazy, nothing runs yet
dask.compute(*parallel_work)  # make dask run everything in parallel

dask.delayed() is lazy: it will not execute the wrapped function (json_process in this case) until you explicitly ask it to. The advantage of using delayed() is that dask can inspect the resulting task graph and figure out which parts can run in parallel.

For instance, dask can identify the underlying task dependencies in the following operation and then work on as many tasks at the same time as possible (4 tasks in this case):

def add(x, y):
    return x + y

def minus(x, y):
    return x - y

x = 10
y = 1
result = dask.delayed(add)(x, y) * dask.delayed(minus)(x, y) + dask.delayed(minus)(x, y) / dask.delayed(add)(x, y)
result.visualize()  # show the task dependency graph
(Figure: task dependency graph generated by dask)

Benchmark: processing JSON files into dataframes stored in CSV format.

The maximum speedup you can get equals the number of cores.
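A quick way to check that upper bound on your own machine, using only the standard library:

import os
print(os.cpu_count())  # number of logical cores = rough upper bound on the speedup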

Example 2: merge CSVs into one large CSV

Another major hassle for data practitioners is merging CSVs. Here's some code I found online:

# code 1
import pandas as pd

df = pd.read_csv(os.path.join(base_dir, csv_list[0]))
for file in csv_list[1:]:
    a = pd.read_csv(os.path.join(base_dir, file))
    df = pd.concat([df, a])

This code performs badly because pd.concat() is an expensive function: pd.concat([df1, df2]) allocates new memory and copies both dataframes into the new dataframe. Repeated inside a loop like this, it gives you quadratic runtime.

(Figure: runtime analysis)

A better solution:

# code 2
df_list = []
for file in csv_list:
    df = pd.read_csv(os.path.join(base_dir, file))
    df_list.append(df)
df = pd.concat(df_list)

This code copies each dataframe only once and thus yields linear runtime. We can't improve on the linear complexity, but we can still shave the runtime down by a constant factor using dask:

# code 3 with dask
df_list = []
for file in csv_list:
    df = dask.delayed(pd.read_csv)(os.path.join(base_dir, file))
    df_list.append(df)
df = dask.delayed(pd.concat)(df_list).compute()

Benchmark
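As a side note, dask.dataframe can also read a whole directory of CSVs in one call using a glob pattern. A minimal sketch, assuming all the CSVs share the same columns:

import os
from dask import dataframe as dd

ddf = dd.read_csv(os.path.join(base_dir, '*.csv'))  # lazy, roughly one partition per file
df = ddf.compute()  # materialize everything as a single pandas dataframe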

b. Data aggregation

Dask provides several data structures, and dask.dataframe is one of them. A dask.dataframe lets you break one huge dataframe into chunks so that multiple cores can work on it together. To create a dask.dataframe, just do:

from dask import dataframe as dd
dd_df = dd.from_pandas(df, npartitions=6)
dd_df.visualize()
(Figure: dask dataframe visualization)

A dask.dataframe object closely resembles a pandas dataframe (many commands are exactly the same). For example:

# in pandas
df.groupby(['Feature1']).sum()
df['Feature1'].min()
df['Feature1'].rolling(10).mean()
# in dask
dd_df.groupby('Feature1').sum().compute()
dd_df['Feature1'].min().compute()
dd_df['Feature1'].rolling(10).mean().compute()

Benchmark: groupby()

One important thing: dataframe partitioning should not be done arbitrarily; it is a good idea to partition your data according to the number of cores you have. Too many partitions will unavoidably increase the communication overhead.
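For example, you can match the number of partitions to the number of cores with repartition() (a small sketch):

import os
dd_df = dd_df.repartition(npartitions=os.cpu_count())  # roughly one partition per core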

c. Feature engineering

Feature engineering is a crucial prerequisite for a good model. It is always a good idea to vectorize your code when possible. For example:

%timeit df['Feature3'] = df['Feature1'] ** 2
# 47.3 ms ± 982 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit df['Feature3'] = df['Feature1'].apply(transform)  # transform is the equivalent per-element function
# 2.62 s ± 47.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Pandas is built on top of NumPy, a high-performance numerical computation library, so any vectorized operation is stunningly fast. Luckily, we get the same API in dask:

dd_df['F1'] = dd_df['F1'] ** 2 + dd_df['F2']  # lazy

However, not all work can be vectorized. For instance, if a column stores strings such as 'first:last' and you wish to extract the first names, you have to use apply().

# you can't vectorize this
def operation(x):
    return x.split(':')[0]
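Here is a tiny illustration with made-up data (the 'first:last' format is an assumption based on the split above):

import pandas as pd

sample = pd.DataFrame({'F1': ['John:Smith', 'Jane:Doe']})
sample['first_name'] = sample['F1'].apply(operation)  # -> ['John', 'Jane']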

Since the vectorized operations are generally fast enough, I’ll focus on un-vectorizable operations.

Example 1: column transformation

def operation(x):
    # some un-vectorizable operation with processing time M
    return y

df['new'] = df.F1.apply(operation)  # pandas
new = dd_df.F1.apply(operation).compute(scheduler='processes')  # dask

I set the scheduler to ‘processes’ because this code is completely dominated by operation(), which is pure Python. In this case it is reasonable to pay the cost of sending data to other processes in exchange for true parallelism (each process works on one partition). See the explanation provided by dask:

From the dask documentation: ‘The threaded scheduler … is lightweight …. It introduces very little task overhead (around 50us per task) …. However, due to Python’s Global Interpreter Lock (GIL), this scheduler only provides parallelism when your computation is dominated by non-Python code.’

‘The multiprocessing scheduler …. Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process. This means that it is able to bypass issues with the GIL and provide parallelism even on computations that are dominated by pure Python code.’
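If you would rather set the scheduler once instead of passing it to every compute() call, dask also lets you configure it globally or for a block of code (a minimal sketch, reusing the names from above):

import dask

dask.config.set(scheduler='processes')  # default scheduler for all later compute() calls

# or only for a block of code
with dask.config.set(scheduler='processes'):
    result = dd_df.F1.apply(operation).compute()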

Benchmark:

Moving data around is costly, which is why the processes scheduler helps only when the operation you wish to run is much more expensive than the inter-process communication:

(Figure: both n and theta play roles)

As you can see, large n and large theta make the overhead terms in the denominator relatively smaller and thus make the ratio (the speedup) greater, which is why we want large n and theta.
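As a back-of-the-envelope model (my own rough sketch, not the article's benchmark), you can think of the speedup like this:

# rough model: n = number of rows, theta = per-row time of operation(),
# p = number of processes, c = per-row serialization/IPC cost, f = fixed startup overhead
def estimated_speedup(n, theta, p, c, f):
    serial = n * theta
    parallel = n * theta / p + n * c + f
    return serial / parallel  # grows with both n and theta, capped at p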

(Figure: operation time and the length of the dataframes both matter)

You’ll want to go as close to the top right corner as possible. On the other hand, if your operation does not take that long or your dataframe is small, you might just use pandas.

Example 2: multiple feature engineering

If you have multiple columns that need processing, you can work on those columns at the same time:

# pandas
operation1(df.F1)
operation2(df.F2)
operation3(df.F3)

# dask
a = dask.delayed(operation1)(df.F1)
b = dask.delayed(operation2)(df.F2)
c = dask.delayed(operation3)(df.F3)
work = [a, b, c]
result = dask.compute(*work)

Benchmark: same as the previous example, both operation time and data length play important roles.

Summary

Data processing can be time-consuming, especially when done on a single thread. I have shown how easy it is to utilize the full power of your computer with dask. There are, of course, other multiprocessing/threading libraries, but I think dask is the easiest and quickest tool to help data practitioners get started.

Now go speed up your code!

Related materials

Dask website: https://docs.dask.org/en/latest/why.html

Dask tutorial in depth: https://github.com/dask/dask

Dask Overview: https://www.youtube.com/watch?v=ods97a5Pzw0
