Asset versioning and caching#

This guide demonstrates how to build memoizable graphs of assets. Memoizable assets help avoid unnecessary recomputation, speed up the developer workflow, and save computational resources.


Context#

There's no reason to spend time materializing an asset if the result is going to be the same as the result of its last materialization.

Dagster's versioning system helps you determine ahead of time whether materializing an asset will produce a different result. It's based on the idea that the result of an asset materialization shouldn't change as long as:

  • The code used is the same code as the last time the asset was materialized.
  • The input data is the same input data as the last time the asset was materialized.

Dagster has two versioning concepts to represent the code and input data used for each materialization:

  1. Code version. A string that represents the version of the code that computes an asset. This is the code_version argument of @asset.
  2. Logical version. A string that represents the version of the value of the asset. This is represented as a LogicalVersion object.
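
Both concepts appear directly in code. Here is a minimal preview using placeholder names; the full patterns are covered in the steps below:

from dagster import LogicalVersion, asset, observable_source_asset


# Code version: a user-supplied string passed to the asset decorator.
@asset(code_version="v1")
def my_asset():
    ...


# Logical version: returned from an observation function (covered in Step Three).
@observable_source_asset
def my_source():
    return LogicalVersion("some-version-string")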

By keeping track of code and logical versions, Dagster can predict whether a materialization will actually change the underlying value. This allows it to skip redundant materializations and instead return the previously-computed value. In more technical terms, Dagster offers a limited form of memoization for assets-- the last-computed asset value is always cached.

In computationally expensive data pipelines, this approach can yield tremendous benefits.


Step One: Understanding Staleness#

Dagster automatically computes the logical version of software-defined assets. It does this by hashing a code version together with the logical versions of any input assets.
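
Conceptually, the computation looks something like the sketch below. This is an illustration of the idea only, not Dagster's actual implementation:

from hashlib import sha256
from typing import Sequence


# Illustrative only: derive a logical version by hashing the code version
# together with the logical versions of the asset's inputs.
def compute_logical_version(code_version: str, input_versions: Sequence[str]) -> str:
    hasher = sha256()
    for part in [code_version, *input_versions]:
        hasher.update(part.encode("utf8"))
    return hasher.hexdigest()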

Let's start with a trivial asset that returns a hardcoded number:

from dagster import asset


@asset
def a_number():
    return 1
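
This guide drives materializations from Dagit, but for completeness, here is a minimal sketch of materializing the same asset from a script or test using Dagster's materialize helper (by default this runs against an ephemeral instance, so the result won't appear in Dagit's asset catalog):

from dagster import materialize

# Materialize the asset in-process and check that the run succeeded.
result = materialize([a_number])
assert result.success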

We materialize it and look at the entry in the Asset Catalog:

Simple asset logical version

You'll notice a logical version in the "System tags" section in the materialization details. This is the hash Dagster has automatically computed. In this case, since there are no inputs, the logical version is a hash of the code version only, which is also visible in "System tags".

If we materialize the asset again, we'll notice that both the code version and the logical version change. This is by design: Dagster cannot tell on its own whether the code has changed, so it must assume that it has changed on every materialization.

We can improve this situation by introducing explicit code versions. Let's add a code_version on our asset:

from dagster import asset


@asset(code_version="v1")
def versioned_number():
    return 1

Let's materialize this asset. Now the user-defined code version, v1, appears in the latest materialization instead of an automatically generated hash:

Simple asset logical version with code version

Now let's update the code and inform Dagster that the code has changed. Do this by changing the code_version argument:

from dagster import asset


@asset(code_version="v2")
def versioned_number():
    return 11

Now we reload our definitions:

Simple asset logical version with code version (v2)

Note that the asset is now marked as stale: because of the explicit code_version argument, Dagster knows it has not been materialized since its code changed. The asset must be materialized again to become up-to-date.

If we click on the disclosure triangle on the right side of the "Materialize" button, a "Materialize stale and missing" button appears. Clicking this button will materialize all stale and missing assets in the asset graph. We can use it now to rematerialize versioned_number. If we view the latest asset materialization in the asset catalog, the code_version is now "v2" and the asset is marked as up-to-date.


Step Two: Staleness with Dependencies#

Staleness computation becomes more powerful and useful when there are dependencies in play. Let's add an asset downstream of our first asset.

from dagster import asset


@asset(code_version="v2")
def versioned_number():
    return 11


@asset(code_version="v1")
def multiplied_number(versioned_number):
    return versioned_number * 2

Reload your definitions in Dagit and you'll notice that multiplied_number is marked as "Never Materialized". Now click on the "Materialize" disclosure triangle and then "Materialize stale and missing" (the plain "Materialize" button ignores versioning) to materialize it.

Note that in the created run, only the step associated with multiplied_number is run. The system knows that versioned_number is up to date and can therefore safely skip that computation. You can see this on the details page for the run:

Materialize stale event log

Now let's update the versioned_number asset-- we'll change its return value and code version:

from dagster import asset


@asset(code_version="v3")
def versioned_number():
    return 15


@asset(code_version="v1")
def multiplied_number(versioned_number):
    return versioned_number * 2

As we saw previously, this will cause versioned_number to become stale. But since multiplied_number depends on versioned_number, it must be recomputed as well, and so is also marked stale:

Dependencies code version only

Click "Materialize stale and missing" to get both assets up-to-date again.


Step Three: Staleness with Source Assets#

In the real world, data pipelines depend on external upstream data. So far in this tutorial, we haven't used any external data-- we've been substituting hardcoded data in the asset at the root of our graph, and using a code version as a stand-in for the version of that data. We can do better than this.

External data sources in Dagster are modeled by SourceAssets. We can add versioning to a SourceAsset by making it observable. An observable source asset has a user-defined function that computes a logical version.
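
For reference, a plain (non-observable) source asset is declared with just an asset key; a minimal sketch, using a hypothetical key:

from dagster import SourceAsset

# A plain source asset: Dagster knows the external data exists,
# but has no way to compute a version for it.
raw_input_file = SourceAsset(key="raw_input_file")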

Let's add an @observable_source_asset called input_number. This will represent a file, input_number.txt, written by an external process upstream of our pipeline. The file contains a single number:

29034

The body of the input_number function computes a hash of the file contents and returns it as a LogicalVersion. We'll set input_number as an upstream dependency of versioned_number.

from hashlib import sha256

from dagster import LogicalVersion, asset, file_relative_path, observable_source_asset


def sha256_digest_from_str(string: str) -> str:
    hash_sig = sha256()
    hash_sig.update(bytearray(string, "utf8"))
    return hash_sig.hexdigest()


FILE_PATH = file_relative_path(__file__, "input_number.txt")


@observable_source_asset
def input_number():
    with open(FILE_PATH) as ff:
        return LogicalVersion(sha256_digest_from_str(ff.read()))


@asset(code_version="v3", non_argument_deps={"input_number"})
def versioned_number():
    with open(FILE_PATH) as ff:
        return int(ff.read())


@asset(code_version="v1")
def multiplied_number(versioned_number):
    return versioned_number * 2

Adding an observable source asset to our graph has revealed a new button labeled "Observe Sources":

Source asset in graph

We click this button to kick off a run that executes the observation function of input_number. Let's look at the entry in the asset catalog for input_number:

Source asset in catalog

Note the "logical version" listed here that you computed.

If we return to the asset graph, we see that versioned_number and multiplied_number are stale. Click "Materialize all" to bring them up to date.

Finally, let's manually alter the file to simulate the activity of an external process. If we click the "Observe Sources" button again, the downstream assets are again marked stale-- the observation run generated a new logical version for input_number, because its content changed.
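
To make the change yourself, overwrite the file before clicking "Observe Sources"; for example, from a Python shell (the new value is arbitrary, and the path should point at the input_number.txt next to your definitions):

# Simulate an external process updating the upstream file.
with open("input_number.txt", "w") as f:
    f.write("48205")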

Asset memoization is under very active development. This guide will be updated as we roll out new features in the coming weeks.