Build a Lakehouse with Databricks, DLT and dbt
by Paul Symons

Contents
- Summary
- Context
- Preparing for the star schema
- Building the Landing layer
- Configuring the bronze layer
- Dimensions and Facts
- Conclusion
Summary
In this two-part series we’ll delve into a recent data migration from SQL Server to Databricks that I led, where we had the opportunity to:
- build a greenfield Lakehouse project
- create a simple workflow to reconstruct Type 2 Slowly Changing Dimensions
These posts explore why and how we did it, and how building with Databricks and data build tool (dbt) made it much simpler.
This is Part One.
Context
Our task was to design and build a solution to migrate the engineering process and existing data for a Finance-related star schema from SQL Server to Databricks, in collaboration with the client team.
In discovery sessions with the team, we learned that they were already using Azure Data Factory to copy data from source systems to blob storage. Crucially, they were creating (and retaining) full table copies on a daily schedule, as opposed to incremental table loads or change data capture.
While there are drawbacks to this style of integration, it has a few upsides as well:
- simple to understand and automate
- no table specific customisations required (e.g. primary keys or update timestamp columns)
- table dumps in a single parquet file preserve original source schema (physical model)
This notable starting point shaped the solution that we designed.
Preparing for the star schema
Before we could rebuild Type 2 SCD history, we had to build a working data pipeline and transformation for the latest data.
Initially, we had planned to have a simple bronze → silver → gold medallion architecture, where the bronze layer would be the latest dumped snapshot read from the Storage Container, but a suggestion from a Databricks Solution Architect made us look at things a little differently.
Essentially, the recommendation was: why not load all the snapshots into Unity Catalog? Whilst we were initially skeptical, the notable upsides were convincing:
- Excess storage over time could be managed by periodic culls of older snapshot data
- All snapshot data is accessible through a consistent SQL interface
- Autoloader manages destination table schemas
- The snapshot represented by the bronze layer could be easily modified
We named this concept the raw landing layer, and added it to the beginning of our medallion architecture. This addition laid the groundwork for the history regeneration described in Part 2.
Building the Landing layer
If you’ve never used Delta Live Tables (soon to be Lakeflow Declarative Pipelines) but are familiar with Snowflake, it’s similar to what can be achieved with Snowpipe and Dynamic Tables, though a bit more flexible and feature packed.
It is a concise mechanism to ingest data from various sources, and build sequences of linked transformations - and you can choose to do this with either Python or SQL. In the end, we elected to defer most of our transformations to dbt.
A standout feature of DLT is that it will not only create the destination table schema for you, but will evolve the schema to match the source over time. This is a really important feature, and works particularly well with structured input formats like parquet. In addition, using the Autoloader feature makes the pipeline idempotent: successive runs will only process new files.
My personal experience working with DLT had me prefer the SQL programming model, as it feels very intuitive. However, the flexibility of the Python interface won out when we had to scale the configuration: for one of our sources, we had around 50 tables to ingest. Using a YAML-based configuration approach combined with programmatic Python allowed us to have a single code asset that would create a DLT table for each of the 50 data source configurations (a sketch of this configuration-driven pattern follows the examples below).
A Python example for a single table might typically look like this:
import dlt
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

@dlt.table(name="landing.sourceA.tableA", comment="Table A in Unity Catalog")
def raw_table():
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("s3a://bucketname/path/to/files/**/*.parquet")
    )
    return df

The same SQL example might look like this:
CREATE OR REFRESH STREAMING TABLE landing.sourceA.tableA
AS SELECT * FROM STREAM read_files(
"s3a://bucketname/path/to/files/**/*.parquet",
format => "parquet"
);
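As mentioned above, rather than hand-writing 50 table definitions we generated them from configuration. A minimal sketch of that pattern is shown below; the YAML layout, paths and helper function here are assumptions for illustration, not our exact implementation:

import dlt
import yaml
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical YAML configuration describing each table to ingest;
# in practice this would likely be loaded from a file in the bundle.
CONFIG = yaml.safe_load("""
tables:
  - name: landing.sourceA.tableA
    path: s3a://bucketname/sourceA/tableA/
  - name: landing.sourceA.tableB
    path: s3a://bucketname/sourceA/tableB/
""")

def make_table(name: str, path: str):
    # Factory function so each generated table captures its own name and path
    @dlt.table(name=name, comment=f"Ingested from {path}")
    def _table():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .load(path)
        )

for entry in CONFIG["tables"]:
    make_table(entry["name"], entry["path"])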
Data Skipping

Data skipping is an important concept in open table formats that allows query planning to skip scanning entire data files. It can be achieved by partitioning, but also by other techniques such as gathering column statistics on writes (e.g. max and min values, null counts).

However, column statistics are typically only gathered for the first 32 columns; as some of our tables had in excess of 200 columns, it was critical to ensure statistics for the snapshot date values were retained by placing those columns first.

Therefore, an important part of the pipeline for us was extracting the snapshot date from the object key being read, and writing it, together with the object key, as the first two columns of each table.
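As an illustration of that step (the regex, path handling and the _SOURCE_FILE column name are assumptions; _LOAD_DATE is the column referenced later by the dbt macro), the ingested dataframe could be reordered like this:

from pyspark.sql import functions as F

def with_snapshot_columns(df):
    # Sketch: derive the snapshot date from the file path exposed by the _metadata
    # column, then project it and the path ahead of the (wide) source schema so
    # that Delta column statistics are collected for both.
    file_path = F.col("_metadata.file_path")
    return df.select(
        F.to_date(F.regexp_extract(file_path, r"(\d{4}-\d{2}-\d{2})", 1)).alias("_LOAD_DATE"),
        file_path.alias("_SOURCE_FILE"),
        "*",
    )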
DLT Highlights
Some properties we appreciated of these pipelines were:
- Isolated developer deployments and testing were very accessible using Databricks Asset Bundles, the Databricks CLI and personal workspaces
- A single code asset was able to read all of the 50+ structured parquet sources, from 3 separate systems, without reference to physical models (e.g. column names and data types)
- When new requirements emerged (e.g. reading from different file types or additional intermediate processing), a new ingestion pattern could be created and used where needed
- As ingestions were always INSERT operations as opposed to MERGE, performance was predictable and scalable
Configuring the bronze layer
Once DLT pipelines were landing our snapshot data in Unity Catalog, it was time to start our dbt project, and get to work on our bronze layer.
We configured our landing tables as dbt sources, and then continued to configure our models.
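For illustration, a landing source definition might look something like the following sketch (the file name is arbitrary; dataSourceA and tableB match the model example further below, while the remaining names are assumptions):

# models/landing_sources.yml
version: 2

sources:
  - name: dataSourceA
    database: landing   # the Unity Catalog catalog written by the DLT pipeline
    schema: sourceA
    tables:
      - name: tableA
      - name: tableB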
For the most part, bronze models were always a pass-through of the landing table, with a filter applied to choose the snapshot LOAD_DATE. We wanted the default filter to be a qualifier that selected the latest snapshot available, but with the ability to override this behaviour using a dbt variable.
This is a great use case for dbt macros - they help you to consolidate code that is often re-used, and parameterise code for dynamic behaviour.
We created a macro called load_date_filter.sql like this:
{% macro load_date_filter(load_date_var, source) %}
{% if var(load_date_var,'') != '' -%}
WHERE _LOAD_DATE = (
    SELECT DISTINCT _LOAD_DATE
    FROM {{ source }}
    WHERE _LOAD_DATE <= {{ "'" ~ var(load_date_var) ~ "'" }}
    ORDER BY _LOAD_DATE DESC
    LIMIT 1
)
{%- else -%}
WHERE _LOAD_DATE = ( SELECT MAX(_LOAD_DATE) FROM {{ source }} )
{% endif %}
{%- endmacro %}

This was easy to call and use in our bronze models, where the filter is applied at the end of the query:
{% set model_source = source('dataSourceA', 'tableB') -%}
SELECT * FROM {{ model_source }}
{{- load_date_filter('OVERRIDE_LOAD_DATE', model_source ) }}

By doing this, our dbt project will reference the latest snapshot, unless we override it by passing a dbt variable called OVERRIDE_LOAD_DATE with a valid snapshot date.
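For example, a run pinned to a particular snapshot might be invoked like this (the date value is just a placeholder):

dbt build --vars '{"OVERRIDE_LOAD_DATE": "2024-01-31"}'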
Dimensions and Facts
After building out bronze and silver layers - which were mostly aligned to source systems - work on the gold layer began.
A common way to build Type 2 Slowly Changing Dimensions is to use the dbt snapshot feature. It is a simple way to track changes in another model, adding a new row to the snapshot table when it notices a change to any given row, based on a primary key. Let’s take a look.
Dimensions
We started by defining dimension staging models in the gold layer that represented the latest projection of data for that dimension.
These were typically materialized as views (when debugging), or ephemeral models.
Snapshots can be run alone with the dbt snapshot command, but dbt build will also run them whilst respecting the order of upstream and downstream models.
If you haven’t used snapshots before, they are essentially a way to track changes between a historical table (the “snapshot” table) and an upstream source table. The historical table will have the schema of the upstream source table, with some extra metadata columns typical in a Type 2 SCD, such as:
- valid_from - the date the record became current
- valid_to - the date the record stopped being current (or “closed”)
- is_deleted - can be set true when creating a new record to indicate the record was deleted
The code to maintain this snapshot can be as simple as follows:
snapshots:
  - name: wiggles_history
    relation: ref('wiggles')
    config:
      unique_key: id
      strategy: timestamp
      updated_at: updated_at
      hard_deletes: new_record

If you are a long-time user of dbt snapshots, it’s worth noting there are some useful opt-in advancements since version 1.9 to how snapshots work that may make your life easier (see the sketch after this list):
- for simple snapshots, you can use YAML only configuration
- hard delete support by closing current records (invalidate), or adding new rows (new_record)
- change the name of dbt metadata fields like dbt_valid_from, etc.
- change the value of the valid until date
- e.g. dbt_valid_to_current: 9999-12-31 if you prefer not to use NULL
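As a sketch of how these options fit together (the renamed columns and the far-future date are illustrative choices, not a recommendation), the earlier snapshot could be configured like this:

snapshots:
  - name: wiggles_history
    relation: ref('wiggles')
    config:
      unique_key: id
      strategy: timestamp
      updated_at: updated_at
      hard_deletes: new_record
      # opt-in behaviours available since dbt 1.9
      dbt_valid_to_current: "date'9999-12-31'"
      snapshot_meta_column_names:
        dbt_valid_from: valid_from
        dbt_valid_to: valid_to
        dbt_is_deleted: is_deleted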
Facts
Once your dimensions have been successfully built, you can build your fact tables as regular dbt models.
Building fact tables typically involves joining the transactional subject table to relevant dimensions you have just updated using dbt snapshots.
You might elect to rebuild fact tables using table materialization for simplicity, or with an incremental materialization to preserve original dimension key references, if effective date ranges are accurate.
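As a rough sketch of that join (the staging model, fact columns and join keys are hypothetical; dbt_valid_from, dbt_valid_to and dbt_scd_id are dbt's default snapshot metadata columns), a fact model might look like:

-- Join each transaction to the dimension row that was current on its date
SELECT
    t.transaction_id,
    t.amount,
    d.dbt_scd_id AS wiggle_dim_key
FROM {{ ref('stg_transactions') }} AS t
LEFT JOIN {{ ref('wiggles_history') }} AS d
    ON  t.wiggle_id = d.id
    AND t.transaction_date >= d.dbt_valid_from
    AND (t.transaction_date < d.dbt_valid_to OR d.dbt_valid_to IS NULL)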
Conclusion
Building a greenfield Lakehouse project — to migrate an existing star schema model from another system — was a very rewarding experience.
Using Delta Live Tables allowed us to defer codification of physical data models as late as possible, which kept the project flexible and reduced cognitive burden.
Gravitating business rules and knowledge towards data build tool (dbt) helped to concentrate understanding of data models and transformations. The ability to capture and publish table and column metadata, as well as constraints such as primary and foreign keys, was very helpful (especially when furnishing Power BI Semantic Models).
dbt snapshots simplified the process of maintaining dimensions, significantly decreasing code surface area.
Lakeflow Jobs allowed for powerful integration and orchestration, providing a natural way to meld DLT and dbt tasks.
My personal takeaway is the reminder that the properties a tool exhibits are key to its success and adoption, over and above its popularity or novelty.
We’ll continue this theme in Part 2, where we describe how dimensions were rebuilt with Lakeflow Jobs in a very simple way.