Recently I ran into a tricky problem that seems fairly easy at first glance. It is a typical data-processing issue involving duplication checks, sorting, etc. In this blog, I'm going to demonstrate the issue and walk through the process of spotting the bottleneck and leveling up the program. As always, any optimization that is not backed by a real, measured issue is pointless, so treat this as a brief, general optimization idea for similar problems.
Stop using iteration
> Now let’s compare this to the time required to explicitly loop over the array in Python and tally up the sum. Python is unable to take advantage of the fact that the array’s contents are all of a single data type - it has to check, for every iteration, if it is dealing with an integer, a string, a floating point number, etc, just as it does when iterating over a list. This will slow down the computation massively.
The above description is from this blog. It was not until I struggled to improve this code's efficiency that I realized how important vectorized operations are. I used to think vectorization was just another way of writing a for loop, but obviously it is not.
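As a quick illustration (a minimal sketch, not from the quoted post), summing a large array with an explicit Python loop versus numpy's vectorized sum shows the gap immediately:

```python
import time

import numpy as np

arr = np.random.rand(1_000_000)

start = time.perf_counter()
total = 0.0
for x in arr:               # Python-level loop: per-element type dispatch
    total += x
loop_time = time.perf_counter() - start

start = time.perf_counter()
vec_total = arr.sum()       # one call into optimized, type-specialized C code
vec_time = time.perf_counter() - start

print("loop: {:.4f}s, vectorized: {:.6f}s".format(loop_time, vec_time))
```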
My scenario was that I had an HDF5 file containing 2 datasets (feel free to check the related concepts on the HDF5 official page; essentially you can think of a dataset as a numpy array with a schema). One is the actual data, keyed by timestamp, and the other is an index that keeps track of the position of each timestamp.
Dataset1
ts | v1 | v2 |
---|---|---|
2020-01-01 | … | … |
2020-02-01 | … | … |
2020-02-05 | … | … |
Dataset2
ts | index |
---|---|
2020-01-01 | 0 |
2020-02-01 | 1 |
2020-02-05 | 2 |
The first dataset is ordered by ts, so when I delete elements from the data, the index dataset has to be updated accordingly.
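To make the setup concrete, here is a minimal sketch of how such a file could be built with h5py and structured numpy arrays; the field and dataset names ('timestamp', 'v1', 'v2', 'data', 'index') are my assumptions based on the description above:

```python
import numpy as np
import h5py

# Assumed schemas: timestamps stored as epoch milliseconds (int64).
data_dtype = np.dtype([('timestamp', 'i8'), ('v1', 'f8'), ('v2', 'f8')])
idx_dtype = np.dtype([('timestamp', 'i8'), ('index', 'i8')])

data = np.array([
    (1577836800000, 1.0, 2.0),   # 2020-01-01
    (1580515200000, 3.0, 4.0),   # 2020-02-01
    (1580860800000, 5.0, 6.0),   # 2020-02-05
], dtype=data_dtype)

index = np.array(
    [(row['timestamp'], i) for i, row in enumerate(data)],
    dtype=idx_dtype,
)

with h5py.File('example.h5', 'w') as f:
    f.create_dataset('data', data=data)
    f.create_dataset('index', data=index)
```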
Here is my original operation over my dataset:
data_dtype = fp[_type].dtype
idx_dtype = fp['index'].dtype
qualified_entries = []
index_shift = 0
# first Python-level loop: walk every row of the data dataset
for entry in fp[_type]:
    ts = entry['timestamp']
    if end_ts and start_ts <= ts <= end_ts:
        qualified_entries.append(entry)
    elif not end_ts and start_ts <= ts:
        qualified_entries.append(entry)
    else:
        # entry is dropped, so every later index moves forward by one
        index_shift += 1
new_name = "new_{}".format('-'.join(fp.filename.split('/')))
new_data = np.array(qualified_entries, dtype=data_dtype)
print("Processing index {}...".format(new_name))
idx = []
# second Python-level loop: shift the index dataset to match
for x in fp['index']:
    temp_idx = x['index'] - index_shift
    if temp_idx >= 0:
        idx.append((x['timestamp'], temp_idx))
new_idx = np.array(idx, dtype=idx_dtype)
There are basically 2 places that can be optimized, and both point to the same fundamental rule: avoid for loops over numpy arrays. As discussed above, a numpy array is a special data structure whose contents share one dtype and live in contiguous memory, so operations can be pushed down to fast, low-level code (address arithmetic instead of per-element Python bytecode).
def ts_filter(fp, _type, start_ts=1585699200000, end_ts=None):
    """
    a function to filter qualified entries and rebuild the index dataset
    :param fp: an open h5py.File handle
    :param _type: name of the data dataset
    :param start_ts: inclusive lower bound (epoch milliseconds)
    :param end_ts: exclusive upper bound, or None for no upper bound
    :return: None; writes a new HDF5 file
    """
    algo = 'gzip'
    level = 9
    idx_ds = fp['index']
    data_ds = fp[_type]
    # build a boolean mask instead of looping over entries
    if end_ts:
        fil1 = start_ts <= data_ds['timestamp']
        fil2 = data_ds['timestamp'] < end_ts
        fil = fil1 & fil2
    else:
        fil = start_ts <= data_ds['timestamp']
    # the data is ordered by timestamp, so the dropped rows form a prefix
    index_shift = len(data_ds) - np.count_nonzero(fil)
    new_data = data_ds[index_shift:]
    new_name = "new_{}".format('_'.join(fp.filename.split('/')))
    print("Processing index {}...".format(new_name))
    shifted_idx = np.add(idx_ds['index'], -index_shift)
    # notice we are using idx_ds['index'] here, think why?
    idx_kept = np.count_nonzero(np.where(shifted_idx >= 0, idx_ds['index'], 0))
    idx_omit = len(idx_ds) - idx_kept
    new_idx = idx_ds[idx_omit:]
    new_idx['index'] = shifted_idx[idx_omit:]
    with h5py.File(new_name, 'w') as f:
        f.create_dataset(name=_type, data=new_data, compression=algo, compression_opts=level, maxshape=(None,))
        f.create_dataset(name='index', data=new_idx, compression=algo, compression_opts=level, maxshape=(None,))
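For reference, a call could look like this (the dataset name 'data' and the file path are hypothetical; 1585699200000 is 2020-04-01 UTC in milliseconds):

```python
with h5py.File('example.h5', 'r') as fp:
    ts_filter(fp, 'data', start_ts=1585699200000, end_ts=None)
```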
Here are the results on a source dataset of ~100,000 records:
- Before: 0.7670650999999999
- After: 0.0236209
Take a look at that: roughly a 30x improvement. In summary, this concept is also very common in another big-data manipulation framework, Spark, where an RDD is somewhat equivalent to the dataset here. I was foolish not to use this concept from the very beginning, which caused the program to hang for a long time. Fundamentally, this topic leads to another technique that should be kept in mind when doing data processing: always try to stick to built-in numpy functions. Below are some common scenarios:
- Want to keep distinct records only? Use numpy.unique. It is very tempting to do the distinction check with a for loop plus a dictionary or set, but a numpy array already carries enough information to spot the duplicates itself (see the sketch after this list).
- Prefer timestamp manipulation over datetime manipulation. This is not strictly about built-in numpy functions, but in my experience comparing time-series data is very common in daily data processing. For the same dataset size as above, below is a first impression of how much this technique can improve the code:
  - before
  - after
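Here is a minimal sketch of both ideas; the array contents and field names are made up for illustration:

```python
import datetime as dt

import numpy as np

records = np.array([
    (1577836800000, 1.0),
    (1577836800000, 1.0),     # duplicate row
    (1580515200000, 2.0),
], dtype=[('timestamp', 'i8'), ('v1', 'f8')])

# Deduplication: np.unique collapses duplicate rows, no loop + set needed.
distinct = np.unique(records)

# Comparing raw int64 timestamps is a single vectorized integer comparison...
ts = records['timestamp']
mask_int = ts >= 1580515200000

# ...while converting to datetime objects first adds per-element overhead.
cutoff = dt.datetime(2020, 2, 1, tzinfo=dt.timezone.utc)
dates = [dt.datetime.fromtimestamp(t / 1000, tz=dt.timezone.utc) for t in ts]
mask_obj = [d >= cutoff for d in dates]
```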
Different tools to profile your code
- cProfile: As discussed above, the timestamp improvement was actually spotted with the cProfile module. Some basic usage:
python -m cProfile -s tottime -o result.out <script.py>
- -s: sort by tottime.
- -o: save the result to the result.out file.
For IPython's interactive mode:
In [1]: %prun myfunc()
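The same thing can also be done from inside a script with cProfile and pstats; a minimal sketch (myfunc is a placeholder for whatever you want to profile):

```python
import cProfile
import pstats


def myfunc():
    return sum(i * i for i in range(1_000_000))


cProfile.run('myfunc()', 'result.out')          # save raw stats to a file
stats = pstats.Stats('result.out')
stats.sort_stats('tottime').print_stats(10)     # show the 10 most expensive calls
```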
- mprof (installed with the memory_profiler package). Usage: mprof run myscript.py, then mprof plot to visualize memory usage over time.
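If you want per-line memory numbers rather than a plot, memory_profiler also ships a decorator; a minimal sketch:

```python
from memory_profiler import profile


@profile
def build_big_list():
    # Allocate enough so the line-by-line memory report shows a visible jump.
    data = [x * 2 for x in range(1_000_000)]
    return sum(data)


if __name__ == '__main__':
    build_big_list()

# Run with:  python -m memory_profiler myscript.py
```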
Concurrency is not always a silver bullet
Most of the time, you need to consider not only how fast your program can run but also how much memory the whole program is allowed to use.
Reference
- https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.count_nonzero.html
- https://numpy.org/devdocs/reference/generated/numpy.where.html?highlight=where#numpy.where