Microsoft Teams (dagster-msteams)

dagster_msteams.msteams_resource ResourceDefinition[source]

Config Schema:
hook_url (dagster.StringSource)

To send messages to MS Teams channel, an incoming webhook has to be created. The incoming webhook url must be given as a part of the resource config to the msteams_resource in dagster.

http_proxy (dagster.StringSource, optional)

https_proxy (dagster.StringSource, optional)

timeout (Float, optional)

Default Value: 60

Verify (Bool, optional)

This resource is for connecting to Microsoft Teams.

The resource object is a dagster_msteams.TeamsClient.

By configuring this resource, you can post messages to MS Teams from any Dagster solid:

Examples:

import os

from dagster import ModeDefinition, execute_pipeline, pipeline, solid
from dagster_msteams import Card, msteams_resource


@solid(required_resource_keys={"msteams"})
def teams_solid(context):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    context.resources.msteams.post_message(payload=card.payload)


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"msteams": msteams_resource})],
)
def teams_pipeline():
    teams_solid()


execute_pipeline(
    teams_pipeline,
    {"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}},
)
dagster_msteams.teams_on_failure HookDefinition[source]

Create a hook on step failure events that will message the given MS Teams webhook URL.

Parameters
  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext outputs the message you want to send.

  • dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.

Examples

@teams_on_failure(dagit_base_url="http://localhost:3000")
@pipeline(...)
def my_pipeline():
    pass
def my_message_fn(context: HookContext) -> str:
    return "Solid {solid_name} failed!".format(
        solid_name=context.solid
    )

@solid
def a_solid(context):
    pass

@pipeline(...)
def my_pipeline():
    a_solid.with_hooks(hook_defs={teams_on_failure("#foo", my_message_fn)})
dagster_msteams.teams_on_success HookDefinition[source]

Create a hook on step success events that will message the given MS Teams webhook URL.

Parameters
  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext outputs the message you want to send.

  • dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.

Examples

@teams_on_success(dagit_base_url="http://localhost:3000")
@pipeline(...)
def my_pipeline():
    pass
def my_message_fn(context: HookContext) -> str:
    return "Solid {solid_name} failed!".format(
        solid_name=context.solid
    )

@solid
def a_solid(context):
    pass

@pipeline(...)
def my_pipeline():
    a_solid.with_hooks(hook_defs={teams_on_success("#foo", my_message_fn)})
dagster_msteams.make_teams_on_pipeline_failure_sensor(hook_url, message_fn=<function _default_failure_message>, http_proxy=None, https_proxy=None, timeout=60, verify=None, name=None, dagit_base_url=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]

Create a sensor on pipeline failures that will message the given MS Teams webhook URL.

Parameters
  • hook_url (str) – MS Teams incoming webhook URL.

  • message_fn (Optional(Callable[[PipelineFailureSensorContext], str])) – Function which takes in the PipelineFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains error message, pipeline name, and run ID.

  • http_proxy – (Optional[str]): Proxy for requests using http protocol.

  • https_proxy – (Optional[str]): Proxy for requests using https protocol.

  • timeout – (Optional[float]): Connection timeout in seconds. Defaults to 60.

  • verify – (Optional[bool]): Whether to verify the servers TLS certificate.

  • name – (Optional[str]): The name of the sensor. Defaults to “teams_on_pipeline_failure”.

  • dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed pipeline run.

  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.

Examples

teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)

@repository
def my_repo():
    return [my_pipeline + teams_on_pipeline_failure]
def my_message_fn(context: PipelineFailureSensorContext) -> str:
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )

teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    dagit_base_url="http://localhost:3000",
)