Weights & Biases (dagster-wandb)

This library provides a Dagster integration with Weights & Biases.

Use Dagster and Weights & Biases (W&B) to orchestrate your MLOps pipelines and maintain ML assets.


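The integration with W&B makes it easy within Dagster to use the wandb client in ops and assets, create and consume W&B Artifacts, and run W&B Launch agents and jobs.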

Resource

dagster_wandb.wandb_resource ResourceDefinition

Config Schema:
api_key (dagster.StringSource):

W&B API key necessary to communicate with the W&B API.

host (String, optional):

API host server you wish to use. Only required if you are using W&B Server.

Default Value: https://api.wandb.ai

Dagster resource used to communicate with the W&B API. It’s useful when you want to use the wandb client within your ops and assets. It’s a required resource if you are using the W&B IO Manager.

It automatically authenticates using the provided API key.

For a complete set of documentation, see Dagster integration.

To configure this resource, we recommend using the configured method.

Example:

from dagster import job
from dagster_wandb import wandb_resource

my_wandb_resource = wandb_resource.configured({"api_key": {"env": "WANDB_API_KEY"}})

@job(resource_defs={"wandb_resource": my_wandb_resource})
def my_wandb_job():
    ...
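The api_key field is a Dagster StringSource, so it accepts either a literal string or a dict of the form {"env": "VAR_NAME"} that is read from the environment. The pure-Python sketch below shows how the configuration in the example above resolves, including the documented host default. The helper names (resolve_string_source, resolve_wandb_config) are hypothetical and this is not Dagster's actual resolution code.

```python
import os
from typing import Any, Dict, Union

# Documented default for the optional host field
DEFAULT_HOST = "https://api.wandb.ai"


def resolve_string_source(value: Union[str, Dict[str, str]]) -> str:
    """Resolve a StringSource-style value: a literal string or {"env": "VAR_NAME"}."""
    if isinstance(value, dict):
        # Raises KeyError if the environment variable is not set
        return os.environ[value["env"]]
    return value


def resolve_wandb_config(config: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: apply the resource's config schema and its host default."""
    return {
        "api_key": resolve_string_source(config["api_key"]),
        "host": config.get("host", DEFAULT_HOST),
    }
```

Only set host when you point the resource at a W&B Server deployment; otherwise the public API host is used.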

I/O Manager

dagster_wandb.wandb_artifacts_io_manager IOManager

Config Schema:
run_name (String, optional):

Short display name for this run, which is how you’ll identify this run in the UI. By default, it’s set to a string with the format dagster-run-[first 8 characters of the Dagster Run ID], e.g. dagster-run-7e4df022.

run_id (String, optional):

Unique ID for this run, used for resuming. It must be unique in the project, and if you delete a run you can’t reuse the ID. Use run_name for a short descriptive name, or the W&B run config for saving hyperparameters to compare across runs. The ID cannot contain the following special characters: /, #, ?, %, and :. You need to set the Run ID when you are doing experiment tracking inside Dagster so that the IO Manager can resume the run. By default, it’s set to the Dagster Run ID, e.g. 7e4df022-1bf2-44b5-a383-bb852df4077e.

run_tags (List[String], optional):

A list of strings, which will populate the list of tags on this run in the UI. Tags are useful for organizing runs together, or applying temporary labels like ‘baseline’ or ‘production’. It’s easy to add and remove tags in the UI, or filter down to just runs with a specific tag. Any W&B Run used by the integration will have the dagster_wandb tag.

base_dir (String, optional):

Base directory used for local storage and caching. W&B Artifacts and W&B Run logs are written to and read from that directory. By default, it uses the DAGSTER_HOME directory.

cache_duration_in_minutes (Int, optional):

Defines how long W&B Artifacts and W&B Run logs are kept in local storage. Only files and directories that were not opened for that amount of time are removed from the cache. Cache purging happens at the end of an IO Manager execution. Set it to 0 to disable caching completely. Caching improves speed when an Artifact is reused between jobs running on the same machine. It defaults to 30 days.
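The defaults and rules described in the schema above can be summarized in a small pure-Python sketch. All function and constant names are hypothetical and this is not the IO Manager's implementation. In particular, purge_cache only models the documented rule (entries not accessed within cache_duration_in_minutes are removed, and 0 effectively removes everything) on a flat directory, and it relies on file access times, which some filesystems update lazily or not at all.

```python
import shutil
import time
from pathlib import Path
from typing import List, Optional

DEFAULT_CACHE_DURATION_IN_MINUTES = 30 * 24 * 60  # documented default: 30 days
FORBIDDEN_RUN_ID_CHARS = set("/#?%:")  # characters listed in the run_id description


def default_run_name(dagster_run_id: str) -> str:
    # "dagster-run-" followed by the first 8 characters of the Dagster Run ID
    return f"dagster-run-{dagster_run_id[:8]}"


def is_valid_run_id(run_id: str) -> bool:
    # Checks only the forbidden characters; uniqueness in the project is enforced by W&B
    return not (set(run_id) & FORBIDDEN_RUN_ID_CHARS)


def effective_tags(run_tags: Optional[List[str]] = None) -> List[str]:
    # User-supplied tags plus the dagster_wandb tag that every integration run carries
    tags = list(run_tags or [])
    if "dagster_wandb" not in tags:
        tags.append("dagster_wandb")
    return tags


def purge_cache(
    base_dir: str,
    cache_duration_in_minutes: int = DEFAULT_CACHE_DURATION_IN_MINUTES,
    now: Optional[float] = None,
) -> List[str]:
    # Remove top-level entries whose last access time is at or before the cutoff
    now = time.time() if now is None else now
    cutoff = now - cache_duration_in_minutes * 60
    removed = []
    for entry in Path(base_dir).iterdir():
        if entry.stat().st_atime <= cutoff:
            if entry.is_dir():
                shutil.rmtree(entry)
            else:
                entry.unlink()
            removed.append(entry.name)
    return sorted(removed)
```

For example, default_run_name("7e4df022-1bf2-44b5-a383-bb852df4077e") yields dagster-run-7e4df022, matching the example in the run_name description.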

Dagster IO Manager to create and consume W&B Artifacts.

It allows any Dagster @op or @asset to create and consume W&B Artifacts natively.

For a complete set of documentation, see Dagster integration.

Example:

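from dagster import (
    asset,
    load_assets_from_current_module,
    make_values_resource,
    repository,
    with_resources,
)
from dagster_wandb import wandb_artifacts_io_manager, wandb_resource


# Define assets before the repository: load_assets_from_current_module() only
# finds assets that already exist when the repository function runs
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    # The W&B Artifacts IO Manager stores the returned value in a W&B Artifact
    return [1, 2, 3]


@repository
def my_repository():
    return [
        *with_resources(
            load_assets_from_current_module(),
            resource_defs={
                "wandb_config": make_values_resource(
                    entity=str,
                    project=str,
                ),
                "wandb_resource": wandb_resource.configured(
                    {"api_key": {"env": "WANDB_API_KEY"}}
                ),
                "wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
                    {"cache_duration_in_minutes": 60}  # only cache files for one hour
                ),
            },
            resource_config_by_key={
                "wandb_config": {
                    "config": {
                        "entity": "my_entity",
                        "project": "my_project",
                    }
                }
            },
        ),
    ]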

Config

class dagster_wandb.WandbArtifactConfiguration(*args, **kwargs)

W&B Artifacts IO Manager configuration. Useful for type checking.

class dagster_wandb.SerializationModule(*args, **kwargs)

W&B Artifacts IO Manager configuration of the serialization module. Useful for type checking.
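Both classes are intended for static type checking of the "wandb_artifact_configuration" metadata dict. The sketch below is a hypothetical local mirror built with typing.TypedDict so that it runs without dagster_wandb installed. Only the "type" key appears on this page; every other key (name, description, aliases, serialization_module with name and parameters) is an assumption, so check the real classes (imported with from dagster_wandb import WandbArtifactConfiguration, SerializationModule) before relying on them.

```python
from typing import Any, Dict, List, TypedDict


class SerializationModuleSketch(TypedDict, total=False):
    name: str  # assumed: name of the serialization module, e.g. "pickle"
    parameters: Dict[str, Any]  # assumed: keyword arguments passed to that module


class WandbArtifactConfigurationSketch(TypedDict, total=False):
    # Only "type" is confirmed by this page's example; the other keys are assumptions
    name: str
    type: str
    description: str
    aliases: List[str]
    serialization_module: SerializationModuleSketch


def unknown_keys(config: Dict[str, Any]) -> List[str]:
    """Return top-level keys the sketch does not declare (a runtime stand-in for a type checker)."""
    return sorted(set(config) - set(WandbArtifactConfigurationSketch.__annotations__))


# A static type checker such as mypy accepts this annotation
dataset_config: WandbArtifactConfigurationSketch = {"type": "dataset"}
```

Because the TypedDicts use total=False, every key is optional, which matches the minimal {"type": "dataset"} metadata used in the IO Manager example.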

Errors

exception dagster_wandb.WandbArtifactsIOManagerError(message='A W&B Artifacts IO Manager error occurred.')

Represents an execution error of the W&B Artifacts IO Manager.

Ops

dagster_wandb.run_launch_agent(context)

Starts a W&B Launch agent and runs it as a long-running process until it is stopped manually.

Agents are processes that poll launch queues and execute the jobs (or dispatch them to external services to be executed) in order.
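To illustrate the polling behavior just described, here is a toy, in-memory model of an agent. It is not the W&B Launch agent's implementation: queues are plain deques of job names, execute is any callable, and max_polls is a hypothetical parameter that bounds a loop a real agent would keep running until stopped.

```python
from collections import deque
from typing import Callable, Deque, Dict, List


def run_agent_sketch(
    queues: Dict[str, Deque[str]],
    listen_to: List[str],
    execute: Callable[[str], None],
    max_polls: int,
) -> List[str]:
    """Toy agent: poll only the configured queues and execute their jobs in FIFO order."""
    executed: List[str] = []
    for _ in range(max_polls):  # a real agent loops until it is stopped manually
        for name in listen_to:
            queue = queues.get(name, deque())
            while queue:
                job = queue.popleft()  # oldest job first
                execute(job)  # a real agent may instead dispatch to an external service
                executed.append(job)
    return executed
```

Jobs sitting in queues the agent does not listen to are never picked up, which is why the queues list in the op config below matters.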

Example:

# config.yaml

resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  run_launch_agent:
    config:
      max_jobs: -1
      queues:
        - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_agent_example():
    run_launch_agent()

dagster_wandb.run_launch_job(context)

Executes a Launch job.

A Launch job is assigned to a queue in order to be executed. You can create a queue or use the default one. Make sure you have an active agent listening to that queue. You can run an agent inside your Dagster instance, or consider using a deployable agent in Kubernetes.

Example:

# config.yaml

resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  my_launched_job:
    config:
      entry_point:
        - python
        - train.py
      queue: my_dagster_queue
      uri: https://github.com/wandb/example-dagster-integration-with-launch

from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_job_example():
    run_launch_job.alias("my_launched_job")()  # alias the op so it matches the ops.my_launched_job config key
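The config.yaml for this example can also be assembled in Python. The helper below is hypothetical (it is not part of dagster_wandb) and only builds the nested run config dict. The op key is the alias used in the job, because Dagster matches op config by the op's (aliased) name.

```python
from typing import Any, Dict, List


def launch_job_run_config(
    op_alias: str,
    uri: str,
    entry_point: List[str],
    queue: str,
    entity: str,
    project: str,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the config.yaml structure shown above."""
    return {
        "resources": {
            # Values consumed by the wandb_config resource (make_values_resource)
            "wandb_config": {"config": {"entity": entity, "project": project}},
        },
        "ops": {
            # Must use the alias, not the original op name run_launch_job
            op_alias: {"config": {"entry_point": entry_point, "queue": queue, "uri": uri}},
        },
    }
```

A dict like this should be accepted as run_config when launching the job, for example run_launch_job_example.execute_in_process(run_config=...), which would enqueue a real Launch job for an agent listening on my_dagster_queue.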