Data Versioning for Customer Reports Using DataBricks and lakeFS

SimilarWeb offers an AI-based market intelligence platform that helps monitor web and mobile app traffic. The company provides global, multi-device market intelligence to understand, track, and grow digital market share. We analyze data from 3 million applications and 80 million websites to help thousands of clients, ranging from small businesses to global enterprises, gain insight into any website’s statistics and strategy.

We’ve been using DataBricks since 2015, initially as a way to reduce the DataOps effort of providing a Spark environment for data scientists. It also eased the migration of code from research to production. Over the years we have shifted to running production workloads on DataBricks as well.

The data services group at SimilarWeb is responsible for producing data reports customized to each client’s requirements. These reports include competitive funnel-conversion analyses, competitive lost-opportunities reports, and more.

To perform the analysis, we rely on data generated by other groups at SimilarWeb and data that we deduce from publicly available information. The data we use changes constantly as websites evolve, algorithms improve, and our training sets continue to grow. Since a report is produced using the best data we have at the time, consecutive reports may use different data sets as input. This is expected, but it also requires a good system for maintaining the versions of data used to create a report, to ensure auditing, reproducibility, and troubleshooting.

SimilarWeb’s internal reporting application is named 360. It has a front-end interface for configuring report parameters and scheduling. 360 manages a queue of jobs on Redis that are executed by workers, each of which spins up a DataBricks cluster for the analysis. The DataBricks jobs run over an S3 bucket: the input is read from S3, and the output is written back to S3, where it can be consumed through the 360 application interface. We also maintain a daily Airflow DAG that does all of our pre-processing. The DAG tasks are likewise executed on a DataBricks cluster spun up for that purpose. Input data is read from S3 buckets owned by other groups, and the output is written to our data bucket to be consumed by the 360 application reports.
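
To illustrate the flow, here is a minimal sketch of how a report job might be enqueued on Redis for a worker to pick up. The queue name, payload fields, and connection details are hypothetical and not taken from 360’s actual implementation:

import json

import redis  # redis-py

# Hypothetical connection and queue name.
queue = redis.Redis(host="redis.internal.example", port=6379)

report_job = {
    "report_id": "example-report-123",
    "customer": "example-customer",
    "parameters": {"period": "2021-03", "competitors": ["example.com"]},
}

# Workers pop jobs from this list and spin up a DataBricks cluster per job.
queue.rpush("report-jobs", json.dumps(report_job))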

For data versioning and manageability we decided to test lakeFS, an open-source tool that provides Git-like operations over S3. Its compatibility and seamless integration with DataBricks were an important consideration.

Why Data Versioning with lakeFS 

When we run a report for a customer, it is usually a recurring report produced once a day, week, or month. The report depends on input data that can be updated by backfills or replaced due to accuracy improvements. For example, the analysis code can change to improve accuracy or fix bugs, and the complementary data may change as the digital assets we analyze evolve. Consequently, when we run a report we want the ability to track the versions of the code, the input data, and the output results.

Introducing data versioning capabilities provides us with the following:

Reports Versioning

For every report issued, a lakeFS commit ID is attached. This ID marks the report version. We can access any version of the report that our retention policy allows, and analyze it if customers require clarification or have follow-up questions.
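
As an illustration, the commit ID can be captured at report time with the lakeFS Python client (lakefs_client). The endpoint, credentials, repository, and branch below are placeholders, and how the ID is stored with the report is only sketched:

import lakefs_client
from lakefs_client.client import LakeFSClient

# Placeholder credentials and endpoint for the lakeFS installation.
configuration = lakefs_client.Configuration(host="https://lakefs.example.com")
configuration.username = "AKIAEXAMPLEKEY"
configuration.password = "example-secret"
client = LakeFSClient(configuration)

# The branch HEAD at report time becomes the report's version identifier.
branch = client.branches.get_branch(repository="example-repo", branch="example-branch")
report_version = branch.commit_id  # stored alongside the report record in 360 (hypothetical)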

Reproducibility

We can time-travel between different versions of the report, and for each version we have not only the report itself, but also a snapshot of our data repository at the time the report was calculated. The commit metadata includes the code version we used. We can run the report again and receive the same result, or open a lakeFS branch from that commit and test a new code version on the same input data.
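
A minimal sketch of both operations, reusing the placeholder client from the previous snippet and a hypothetical commit ID recorded with an earlier report:

from lakefs_client import models

# Read the exact input data of a past report by using its commit ID as the ref in the path.
old_commit = "abc123examplecommitid"  # hypothetical commit ID stored with the report
df = spark.read.parquet("s3a://example-repo/" + old_commit + "/example.parquet")

# Open a branch from that commit to test a new code version on the same input data.
client.branches.create_branch(
    repository="example-repo",
    branch_creation=models.BranchCreation(name="example-rerun", source=old_commit),
)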

Auditing

For some of the industries we serve, being able to present the data used for the analysis is required as part of the business audit. With lakeFS we can easily retain this information and share it upon request.

Integrating lakeFS with DataBricks

As a first phase we decided to test a single Spark job, running on DataBricks 7.6 and using lakeFS. It was as easy as advertised.

To interact with lakeFS, we needed to configure DataBricks to use lakeFS as its S3 endpoint. The lakeFS documentation explains how to do it in Spark code, but we did it through the cluster configuration:
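
A sketch of what that cluster-level Spark configuration can look like, using the standard Hadoop S3A settings; the endpoint URL and keys are placeholders for your own lakeFS installation and credentials:

spark.hadoop.fs.s3a.endpoint https://lakefs.example.com
spark.hadoop.fs.s3a.access.key AKIAEXAMPLEKEY
spark.hadoop.fs.s3a.secret.key example-secret
spark.hadoop.fs.s3a.path.style.access true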

Afterwards, we could use our existing code to read data from lakeFS. The only necessary modification was to include the lakeFS branch name in the paths. Here is how we created a Spark DataFrame from our lakeFS data:

df = spark.read.parquet("s3a://example-repo/example-branch/example.parquet")

We were now able to operate on this DataFrame:

results = df.groupBy("example-column").count()

Then we wrote the results back to lakeFS:

results.write.parquet("s3a://example-repo/example-branch/output/")

The results now appeared as uncommitted changes in our lakeFS repository.

We could then commit these changes. After verifying the results, we merged our branch into our main branch, making them visible to everyone.
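
For example, the commit and merge can be performed with the lakeFS Python client (the lakectl CLI or the lakeFS UI work just as well). The message, metadata, and branch names are placeholders, and the client is the one configured in the earlier snippet:

from lakefs_client import models

# Commit the uncommitted changes on the working branch.
client.commits.commit(
    repository="example-repo",
    branch="example-branch",
    commit_creation=models.CommitCreation(
        message="Example report output",           # placeholder message
        metadata={"code_version": "example-tag"},  # e.g. record the code version used
    ),
)

# After verifying the results, merge the branch into main to publish them.
client.refs.merge_into_branch(
    repository="example-repo",
    source_ref="example-branch",
    destination_branch="main",
)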

When we had to stay with DataBricks 5.5

In some of our jobs, we were still using DataBricks 5.5. It’s us, not them. We were holding on to Python 2 because of legacy code that depends on libraries which, until recently, did not support Python 3. The rest of the world, and DataBricks with it, had stopped supporting Python 2; DataBricks did so in version 6.0.

Since we had not yet migrated that Python code to Python 3, we couldn’t upgrade DataBricks for that pipeline, but we still wanted lakeFS to work there.

To read data from lakeFS, the S3 endpoint needs to be set to the lakeFS S3-compatible API. Since our jobs also required reading data not yet managed in lakeFS, we wanted to use Hadoop’s per-bucket configuration: some buckets would be read through lakeFS, others directly from S3. Sadly, DataBricks only started supporting per-bucket configuration in version 7.3, so on 5.5 we were limited to a single S3 endpoint for everything.
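
For reference, this is roughly what Hadoop’s per-bucket S3A configuration looks like on runtimes that support it (bucket names, endpoint, and keys are placeholders); on DataBricks 5.5 this option was not available to us:

# lakeFS-managed repository: route through the lakeFS S3-compatible endpoint.
spark.hadoop.fs.s3a.bucket.example-repo.endpoint https://lakefs.example.com
spark.hadoop.fs.s3a.bucket.example-repo.access.key AKIAEXAMPLEKEY
spark.hadoop.fs.s3a.bucket.example-repo.secret.key example-secret

# Bucket not managed in lakeFS: read directly from S3 with the default endpoint.
spark.hadoop.fs.s3a.bucket.other-data.endpoint s3.amazonaws.com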

To solve this problem, the team at lakeFS implemented a fallback mechanism. For a given request, this feature checks whether the request refers to an existing repository in lakeFS, and forwards it to S3 if it doesn’t. This way we could simply set the single S3 endpoint to the lakeFS API, which handles the requests for lakeFS-managed repositories and forwards everything else to S3.