A fire hydrant bursting on a cold and icy night, turning a stream of water into a street-side Iceberg

Using Amazon Data Firehose to populate Iceberg Tables

by Paul Symons

Summary

AWS has quietly released a significant feature for Amazon Data Firehose, in public preview - the ability to write Firehose Delivery Streams directly to Iceberg Open Table Format (OTF) tables on S3.

Read the documentation and try it today, but heed the warnings to avoid production use until the feature leaves preview.

This feature is significant because it paves the way for efficient data lake storage (no more small files) of high-frequency and intermittent data alike.

If you would like to try this feature with the Hudi or Delta Lake Open Table Formats, please get in touch with your AWS reps and let them know.

Prior Art

AWS is late to the game on this feature. Databricks released a preview of Delta Live Tables way back in 2021, which recently went GA. As a capability, it far exceeds what this Firehose feature accomplishes - but is similar enough in functionality to be worth a mention.

Data Movement platform Fivetran also released support for Iceberg as a destination in April 2023; I can only imagine it has been a great success, given they have recently begun offering their own managed data lake service.

Amazon Data Firehose is already a great AWS service, with some very powerful features to help you write to data lakes.

However, the world has moved on since these features were released. Open Table Formats are beginning to take over as Data Platform vendors provide more support and interoperability, whilst simultaneously battling each other for LinkedIn bait supremacy, like a cringeworthy re-enactment of the Anchorman Fight Scene.

Open Table Formats - Recap

Open Table Formats improve the way data is managed on blob stores like Amazon S3. Dremio’s table format comparison is a good guide on the key features and differences between popular formats, but in summary, most offer:

  • Addition of transaction and snapshot support
  • Partitioning configuration in metadata instead of storage hierarchy
  • Schema and Partition evolution

Not all formats support all features, and performance can vary significantly for different use cases.

A disconcerting but familiar scenario has emerged: among three compelling table formats, one has achieved prominence over the others through a self-reinforcing feedback loop. Customers tell vendors they are investigating the table format with the broadest technical support and marketing; vendors then extend their support for it at the expense of the other formats, which amplifies the effect further. Clearly, this also diminishes pragmatic customer choice.

Data Firehose to Iceberg

Fundamentally, this feature works like any other Firehose Destination; you put records in your delivery stream, and shortly after, you have rows in your table!

There are, however, a few Iceberg-specific features that are very useful or worth calling out:

  1. Inserts, Updates and Deletes
  2. Update is like POST, not PUT
  3. Table Fan Out

Inserts, Updates and Deletes

It is somewhat novel that Data Firehose introduces update and delete operations, in addition to insert - as far as I know, there is no precedent for this. As a result, this changes the data payload that you specify in your put-record calls. Specifically, the record (your data) and metadata (destination database and table) are specified separately. You may ask why we have to specify the metadata - we’ll get to that shortly.

A payload for an S3 destination that previously looked like

{
    "Data": {
        "id": 1,
        "myfield": "Hello world"
    }
}

Changes to look like

{
    "ADF_Record": {
        "id": 1,
        "myfield": "Hello world"
    },
    "ADF_Metadata": {
        "OTF_Metadata": {
            "DestinationTableName": "my_iceberg_table",
            "DestinationDatabaseName": "my_database",
            "Operation": "UPDATE"
        }
    }
}

The destination supports three Operation values - INSERT, UPDATE, and DELETE. Notably, for any table you want to update or delete from, you must specify its unique keys as part of the delivery stream configuration, so that the relevant merge and delete operations can identify the correct records.
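
As a minimal sketch of assembling this envelope in Python, the helper below wraps a record with its routing metadata and serialises it to bytes. The function name `build_iceberg_record` is hypothetical, and the field names simply mirror the payload shown above; check the preview documentation for the current format before relying on it.

```python
import json

# Operations listed above for the Iceberg destination.
VALID_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def build_iceberg_record(record, database, table, operation="INSERT"):
    """Wrap a record in the ADF_Record / ADF_Metadata envelope (hypothetical helper)."""
    operation = operation.upper()
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"Unsupported operation: {operation}")
    envelope = {
        "ADF_Record": record,
        "ADF_Metadata": {
            "OTF_Metadata": {
                "DestinationDatabaseName": database,
                "DestinationTableName": table,
                "Operation": operation,
            }
        },
    }
    # Firehose records are sent as bytes, so serialise the envelope as UTF-8 JSON.
    return json.dumps(envelope).encode("utf-8")


payload = build_iceberg_record(
    {"id": 1, "myfield": "Hello world"}, "my_database", "my_iceberg_table", "update"
)
```

The resulting bytes could then be passed as `Record={"Data": payload}` to the boto3 Firehose client's `put_record` call, together with your delivery stream name.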

Update is like POST, not PUT

One thing to know is that an UPDATE operation completely replaces the row in the Iceberg table matched by the unique keys. If you omit a field from the record that currently has a value in the table, that column value will be erased:

Table Before

id (unique key)   item_description   price
1                 Red Lorry          42.51
2                 Yellow Lorry       99.25

Firehose Payload

{
    "ADF_Record": {
        "id": 1,
        "price": 45.95
    },
    "ADF_Metadata": {
        "OTF_Metadata": {
            "DestinationDatabaseName": "glue_database",
            "DestinationTableName": "iceberg_table",
            "Operation": "UPDATE"
        }
    }
}

Table After

id (unique key)   item_description   price
1                 (empty)            45.95
2                 Yellow Lorry       99.25

Always send the complete payload when running an UPDATE operation.
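
One way to guard against accidental erasure is to overlay the changed fields on the last known full row before sending. The sketch below assumes you can obtain the current row yourself (for example from your application's own state); `complete_update` is a hypothetical helper, not part of any AWS SDK.

```python
def complete_update(current_row, changes, key_columns=("id",)):
    """Return a full row for an UPDATE by overlaying changes on the current row."""
    for key in key_columns:
        # The unique key identifies the row, so the changes must not alter it.
        if key in changes and changes[key] != current_row[key]:
            raise ValueError(f"Change would alter unique key column: {key}")
    merged = dict(current_row)  # copy, so the caller's row is left untouched
    merged.update(changes)
    return merged


current = {"id": 1, "item_description": "Red Lorry", "price": 42.51}
adf_record = complete_update(current, {"id": 1, "price": 45.95})
# adf_record carries item_description as well as the new price
```

Sending `adf_record` as the ADF_Record keeps item_description intact, whereas sending only the changed fields would erase it as in the example above.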

Table Fan Out

I think this has to be the most surprising feature, and my favourite - with each record sent, you can choose which table it is sent to. The impact of this is significant.

Imagine you are receiving data via a web service from an application team operating a fleet of microservices; they are applying a software change across the fleet to alter the data model they send, but it cannot execute atomically.

Before

{
    "version": 1,
    "first_name": "Eleftherios",
    "last_name": "Venizelos"
}

After

{
    "version": 2,
    "first_name": "Eleftherios",
    "last_name": "Venizelos",
    "middle_names": [ "Kyriakou" ]
}

Once you are aware of the new message structure, you create a new Iceberg table with a column to accept middle_names. Now when you receive data payloads, you can use the same Firehose delivery stream, but target different tables:

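# `data` is the decoded incoming message, e.g. {"version": 2, ...}
database_name = "appdata"
table_name = "customers"
if data["version"] > 1:
    # Route newer message versions to a versioned table, e.g. "customers2"
    table_name = f"{table_name}{data['version']}"

adf_metadata = {
    "OTF_Metadata": {
        "DestinationDatabaseName": database_name,
        "DestinationTableName": table_name,
    }
}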

This is one contrived example, but I expect many teams will find this flexibility very useful.

Caveats and Cautions

Cost

Remember, Amazon Data Firehose is rather expensive for high throughput workloads. You are paying for the significant convenience of not maintaining considerable high availability infrastructure and boilerplate code. Use it where it’s appropriate, but consider other options for high volume data ingress.

Use caution with Updates and Deletes

  1. There are many reasons why sends to the destination may fail. Consider carefully how mis-ordered operations resulting from failed sends would affect data integrity.
  2. Iceberg tables don’t have uniqueness constraints, so if you have duplicates:
    • A delete will remove all duplicates matching the unique key
    • An update will replace all duplicates with a single record based on the provided ADF_Record
  3. If you do a lot of deletes and updates it will start to decrease table performance, so …

Perform Table Maintenance!

Performing many small operations will create a lot of snapshot metadata and data files.

Make sure to set appropriate snapshot retention properties and periodically maintain your Iceberg tables.
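
If you query these tables with Athena, maintenance can be sketched as a handful of SQL statements. The helper below only builds the statement strings; the function name, the illustrative retention values, and the choice of Athena are my assumptions, so check the Athena documentation for Iceberg tables before running anything like this.

```python
def maintenance_statements(database, table, max_snapshot_age_seconds=432000,
                           min_snapshots_to_keep=1):
    """Build Athena SQL for snapshot retention, compaction and snapshot expiry."""
    target = f"{database}.{table}"
    return [
        # Snapshot retention settings that VACUUM takes into account.
        f"ALTER TABLE {target} SET TBLPROPERTIES ("
        f"'vacuum_max_snapshot_age_seconds'='{max_snapshot_age_seconds}', "
        f"'vacuum_min_snapshots_to_keep'='{min_snapshots_to_keep}')",
        # Compact many small data files into fewer, larger ones.
        f"OPTIMIZE {target} REWRITE DATA USING BIN_PACK",
        # Expire old snapshots and remove files that are no longer referenced.
        f"VACUUM {target}",
    ]


for statement in maintenance_statements("appdata", "customers"):
    print(statement)
```

Each statement would need to be submitted to Athena separately, on whatever schedule suits the volume of updates and deletes your tables receive.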

Use Athena, Spark or pyiceberg to modify table schema

The Glue table schema editor does not seem to work for Iceberg tables, even though the UI suggests the change has been applied. Tread carefully and consider other options.

Conclusion

This is a really great feature from AWS that I am pleased to see. I look forward to seeing it develop further with additional Open Table Formats in the future!
