In my day job, I help a number of organisations build data warehouses on Google Cloud Platform, and one of the key products that makes this possible at scale is Dataflow. Dataflow is a serverless product that scales massively for trivially parallelisable problems, such as those typically found in ETL (Extract, Transform and Load) applications. This allows the developer to focus on solving the business problem and delivering value, rather than on scaling and performance. For 90% of cases this is true… let’s talk about the other 10%.

The problem

I got an email one morning about a Dataflow pipeline that had been running for more than 12 hours on a single node. Given that it was meant to run daily, and that all of the other pipelines had finished in minutes, this was concerning. There was nothing odd about the pipeline at first glance: it simply read a 100MB compressed file and wrote it out to BigQuery. So why did it take so long?

Digging deeper

We were using the standard Apache Beam transforms both for reading the data from a text file and for inserting it into BigQuery, so we figured that these were not the issue: people far smarter than I would have come across the problem and raised a bug report if their performance had been this bad. That pointed at something to do with our application.

The TextIO transform natively supports decompressing files, but this pipeline seemed to be stuck on a single node. Why was this? Looking at the Dataflow graph, I noticed that all of the steps were lit up green, meaning that they had been fused together by Dataflow, so each bundle was being passed through every step from start to finish. I’d seen this take a while before when using an external data source as a step. So I added a quick PTransform to key, group, and ungroup the records between reading the file and sending it to BigQuery. This would effectively decouple reading the file from writing it into BigQuery, as Beam can’t fuse the steps either side of the group operation.
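The key, group, and ungroup step can be sketched roughly as follows in the Beam Python SDK. This is a minimal illustration rather than the code from the original pipeline: the helper names (`with_random_key`, `ungroup`, `break_fusion`) and the choice of 1000 random keys are assumptions made for the example.

```python
import random


def with_random_key(element, num_keys=1000):
    """Pair an element with a random integer key in [0, num_keys)."""
    return (random.randrange(num_keys), element)


def ungroup(keyed_group):
    """Yield the original elements back out of a (key, elements) pair."""
    _key, elements = keyed_group
    yield from elements


def break_fusion(pcoll):
    """Key, group, and ungroup a PCollection so the steps on either side
    of the GroupByKey cannot be fused into one stage."""
    # Imported here so the two helpers above can be unit-tested
    # on a machine that does not have Beam installed.
    import apache_beam as beam

    return (
        pcoll
        | "AddRandomKey" >> beam.Map(with_random_key)
        | "GroupByRandomKey" >> beam.GroupByKey()
        | "Ungroup" >> beam.FlatMap(ungroup)
    )
```

It would be applied between the read and the write, for example `rows = break_fusion(lines)`. The keys carry no meaning; they exist only to force a group operation.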

This sped up the read: it now took less than 3 minutes to read the file. But the rest of the pipeline was still stuck on a single node, processing only 90 elements (rows) per second, which gave an approximate run time of 4 hours. Left unconstrained, Dataflow will aim for thousands of elements per second, so whilst this was better, something still wasn’t quite right.

Down the Rabbit Hole

In order to understand why the performance was poor, I unzipped the file and found that it expanded from 100MB to nearly 20GB. It contained mostly empty fields and rows, but that level of expansion was still unexpected, and it gave a clue as to the poor performance.
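To get a feel for how mostly empty rows can produce that kind of expansion, here is a small standard-library sketch. The data is synthetic and the 50-field row layout is an assumption; it is not the customer's file, and it uses gzip purely as an example of a compression format.

```python
import gzip


def compression_ratio(raw: bytes) -> float:
    """Return uncompressed size divided by gzip-compressed size."""
    return len(raw) / len(gzip.compress(raw))


# One row of 50 empty comma-separated fields, repeated 100,000 times (about 5MB).
empty_row = b"," * 49 + b"\n"
sample = empty_row * 100_000

print(f"raw: {len(sample)} bytes, ratio: {compression_ratio(sample):.0f}x")
```

The file in question went from 100MB to nearly 20GB, a ratio of roughly 200 to 1, which is well within reach for data this repetitive.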

When processing data from sources, Beam uses a range tracker to work out how to break the data up into bundles, which can be shuffled between worker nodes where possible to allow it to scale to massive numbers of nodes. However, this can only be determined from the input, which in our case was compressed. This led Beam to keep everything in too few bundles for the size of the decompressed data, and to assume incorrectly that a single node was all that was required, as it had no other way to break the data up.
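One way to see why a compressed input is hard to break up: assuming the file was gzip-compressed (the format is my assumption here), the stream can only be decoded from its beginning, so a second worker cannot simply start reading from the halfway point. The sketch below shows this with the standard library alone; it is an illustration of the format's behaviour, not Beam's range-tracking code, and `readable_from` is a made-up helper.

```python
import gzip
import zlib


def readable_from(compressed: bytes, offset: int) -> bool:
    """Return True if the gzip data can be decompressed starting at offset."""
    try:
        gzip.decompress(compressed[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        # Without the stream header and preceding state, decoding fails.
        return False


compressed = gzip.compress(b"a,b,c\n" * 10_000)

print(readable_from(compressed, 0))                     # from the start
print(readable_from(compressed, len(compressed) // 2))  # from the middle
```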

Thankfully, we know better, and we can tell Beam this. There’s a utility transform called Reshuffle, buried in the Beam documentation, that groups all of the elements by a random key into a single window and redistributes the data into new bundles. Effectively, it tells Beam, “have another go at breaking this one up”.
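As a rough sketch of where Reshuffle sits, here is what such a pipeline might look like in the Beam Python SDK. Everything specific in it is an assumption for illustration: the gzip compression type, the comma-separated layout, the three field names, and the `parse_row` and `build_pipeline` helpers are not taken from the real pipeline.

```python
import csv

# Hypothetical column names, for illustration only.
FIELD_NAMES = ("id", "name", "value")


def parse_row(line):
    """Turn one comma-separated line into a dict keyed by FIELD_NAMES."""
    values = next(csv.reader([line]))
    return dict(zip(FIELD_NAMES, values))


def build_pipeline(pipeline, input_path, table_spec, table_schema):
    """Read a gzip text file, reshuffle, parse, and append to BigQuery."""
    # Imported here so parse_row can be tested without Beam installed.
    import apache_beam as beam
    from apache_beam.io.filesystem import CompressionTypes

    return (
        pipeline
        | "ReadCompressed" >> beam.io.ReadFromText(
            input_path, compression_type=CompressionTypes.GZIP)
        # Redistribute the decompressed lines into new bundles so the
        # steps below are no longer tied to the worker that read the file.
        | "Reshuffle" >> beam.Reshuffle()
        | "ParseRow" >> beam.Map(parse_row)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
```

It would be called inside a `with beam.Pipeline(options=...) as p:` block, passing a table such as `project:dataset.table` and a schema string such as `id:STRING,name:STRING,value:STRING`. The important design choice is placing Reshuffle immediately after the read, before any per-row work.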

Did it make a difference? Did it ever! Adding this additional step took the processing time for this pipeline from more than 12 hrs to less than 30 minutes! I removed my hastily thrown together PTransform, put the other niceties back in and the customer was happy.

Lessons Learned

From this experience, which is the second time I’ve encountered Beam not quite getting its scaling right, I wouldn’t hesitate to add the Reshuffle PTransform after any step that greatly increases the size of an element, or the number of elements. Whilst it does add a little to the run time, it’s relatively memory efficient and allows Beam to better optimise the rest of the pipeline.