Snowflake (dagster-snowflake)

This library provides an integration with the Snowflake data warehouse.

To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.

dagster_snowflake.snowflake_resource ResourceDefinition

Config Schema:
account (dagster.StringSource, optional)

Your Snowflake account name. For more details, see https://bit.ly/2FBL320.

user (dagster.StringSource)

User login name.

password (dagster.StringSource)

User password.

database (dagster.StringSource, optional)

Name of the default database to use. After login, you can use USE DATABASE to change the database.

schema (dagster.StringSource, optional)

Name of the default schema to use. After login, you can use USE SCHEMA to change the schema.

role (dagster.StringSource, optional)

Name of the default role to use. After login, you can use USE ROLE to change the role.

warehouse (dagster.StringSource, optional)

Name of the default warehouse to use. After login, you can use USE WAREHOUSE to change the warehouse.

autocommit (Bool, optional)

None by default, which honors the Snowflake parameter AUTOCOMMIT. Set to True or False to enable or disable autocommit mode in the session, respectively.

client_prefetch_threads (dagster.IntSource, optional)

Number of threads used to download the result sets (4 by default). Increasing the value improves fetch performance but requires more memory.

client_session_keep_alive (dagster.StringSource, optional)

False by default. Set this to True to keep the session active indefinitely, even if there is no activity from the user. Make certain to call the close method to terminate the thread properly or the process may hang.

login_timeout (dagster.IntSource, optional)

Timeout in seconds for login. By default, 60 seconds. The login request gives up after the timeout length if the HTTP response is “success”.

network_timeout (dagster.IntSource, optional)

Timeout in seconds for all other operations. By default, none/infinite. A general request gives up after the timeout length if the HTTP response is not ‘success’.

ocsp_response_cache_filename (dagster.StringSource, optional)

URI for the OCSP response cache file. By default, the OCSP response cache file is created in the cache directory.

validate_default_parameters (Bool, optional)

False by default. If True, raise an exception if any of the specified database, schema, or warehouse doesn’t exist.

paramstyle (dagster.StringSource, optional)

pyformat by default for client side binding. Specify qmark or numeric to change bind variable formats for server side binding.

timezone (dagster.StringSource, optional)

None by default, which honors the Snowflake parameter TIMEZONE. Set to a valid time zone (e.g. America/Los_Angeles) to set the session time zone.

connector (dagster.StringSource, optional)

Specifies an alternative database connection engine. The only permissible value is ‘sqlalchemy’; if unset, the Snowflake Connector for Python is used.

cache_column_metadata (dagster.StringSource, optional)

Optional parameter when connector is set to sqlalchemy. Snowflake SQLAlchemy accepts the flag cache_column_metadata=True, which caches the column metadata for all tables.

numpy (dagster.StringSource, optional)

Optional parameter when connector is set to sqlalchemy. To enable fetching NumPy data types, add numpy=True to the connection parameters.

authenticator (dagster.StringSource, optional)

Optional parameter to specify the authentication mechanism to use.

A resource for connecting to the Snowflake data warehouse.

A simple example of running a query against Snowflake is shown below:

Examples:

from dagster import job, op
from dagster_snowflake import snowflake_resource

@op(required_resource_keys={'snowflake'})
def get_one(context):
    context.resources.snowflake.execute_query('SELECT 1')

@job(resource_defs={'snowflake': snowflake_resource})
def my_snowflake_job():
    get_one()

my_snowflake_job.execute_in_process(
    run_config={
        'resources': {
            'snowflake': {
                'config': {
                    'account': {'env': 'SNOWFLAKE_ACCOUNT'},
                    'user': {'env': 'SNOWFLAKE_USER'},
                    'password': {'env': 'SNOWFLAKE_PASSWORD'},
                    'database': {'env': 'SNOWFLAKE_DATABASE'},
                    'schema': {'env': 'SNOWFLAKE_SCHEMA'},
                    'warehouse': {'env': 'SNOWFLAKE_WAREHOUSE'},
                }
            }
        }
    }
)
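
The run_config above reads every value from an environment variable using the {'env': ...} form that the StringSource and IntSource fields accept; the same fields also accept literal values. As a minimal sketch, the helper below (snowflake_run_config is a hypothetical name, not part of dagster-snowflake) assembles such a dictionary from a mix of both. The role name ANALYST and the chosen timeout are illustrative assumptions.

```python
from typing import Any, Dict, Mapping, Optional


def snowflake_run_config(
    env_vars: Mapping[str, str],
    literals: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a run_config dict for a resource registered under the key 'snowflake'.

    Hypothetical helper: env_vars maps config field -> environment variable name,
    literals maps config field -> literal value.
    """
    # Fields sourced from the environment use the {'env': 'VAR_NAME'} form.
    config: Dict[str, Any] = {field: {"env": var} for field, var in env_vars.items()}
    # Literal values are passed through unchanged.
    config.update(literals or {})
    return {"resources": {"snowflake": {"config": config}}}


run_config = snowflake_run_config(
    env_vars={
        "account": "SNOWFLAKE_ACCOUNT",
        "user": "SNOWFLAKE_USER",
        "password": "SNOWFLAKE_PASSWORD",
    },
    # Illustrative values only; pick ones that match your warehouse setup.
    literals={"role": "ANALYST", "login_timeout": 30, "autocommit": True},
)
```

The resulting dictionary has the same shape as the one passed to execute_in_process above, so it could be supplied as run_config in the same way.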

dagster_snowflake.build_snowflake_io_manager(type_handlers)

Builds an IO manager definition that reads inputs from and writes outputs to Snowflake.

Parameters

type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of Snowflake tables and an in-memory type - e.g. a Pandas DataFrame.

Returns

IOManagerDefinition

Examples

from dagster import job
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])

@job(resource_defs={'io_manager': snowflake_io_manager})
def my_job():
    ...

dagster_snowflake.snowflake_op_for_query(sql, parameters=None)

This function is an op factory that constructs an op to execute a Snowflake query.

Note that you can only use snowflake_op_for_query if you know the query you’d like to execute at graph construction time. If you’d like to execute queries dynamically during job execution, you should manually execute those queries in your custom op using the Snowflake resource.

Parameters
  • sql (str) – The SQL query that will execute against the provided Snowflake resource.

  • parameters (dict) – The parameters for the SQL query.

Returns

Returns the constructed op definition.

Return type

OpDefinition
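
For the dynamic case mentioned in the note above, the query logic of a custom op might look like the following sketch. It is written against any object exposing execute_query(sql, parameters=None, fetch_results=False); inside a real op that object would be context.resources.snowflake. FakeSnowflake, count_orders_since, and the orders table are hypothetical names used only so the sketch runs without a warehouse, and the sketch assumes that fetch_results=True returns the fetched rows as a list of tuples and that the default pyformat paramstyle is in effect.

```python
from typing import Any, List, Mapping, Optional, Tuple


class FakeSnowflake:
    """Hypothetical stand-in for the snowflake resource (not part of dagster-snowflake)."""

    def __init__(self, rows: List[Tuple[Any, ...]]) -> None:
        self.rows = rows
        self.calls: List[Tuple[str, Optional[Mapping[str, Any]]]] = []

    def execute_query(self, sql, parameters=None, fetch_results=False):
        # Record the call and return canned rows instead of contacting Snowflake.
        self.calls.append((sql, parameters))
        return self.rows if fetch_results else None


def count_orders_since(snowflake, start_date: str) -> int:
    """Run a query whose parameter value is only known at execution time."""
    # pyformat-style placeholder, bound from the parameters dict.
    sql = "SELECT COUNT(*) FROM orders WHERE created_at >= %(start_date)s"
    rows = snowflake.execute_query(
        sql, parameters={"start_date": start_date}, fetch_results=True
    )
    # Assumption: a single row with a single column holding the count.
    return rows[0][0]


fake = FakeSnowflake(rows=[(42,)])
result = count_orders_since(fake, "2023-01-01")
```

Binding the value through parameters rather than formatting it into the SQL string keeps the statement text fixed while the value varies from run to run.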

class dagster_snowflake.SnowflakeConnection(config, log)
execute_queries(sql_queries, parameters=None, fetch_results=False)
execute_query(sql, parameters=None, fetch_results=False)
get_connection(raw_conn=True)
load_table_from_local_parquet(src, table)
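
The methods above are listed without descriptions, so the following is only a sketch of how execute_queries might be used to run several statements in order. RecordingConnection is a hypothetical stand-in that mimics the listed signature so the example runs without a warehouse; the table names and the assumption that one result list is returned per statement when fetch_results=True are illustrative, not confirmed by this page.

```python
from typing import Any, List, Optional, Sequence


class RecordingConnection:
    """Hypothetical stand-in mimicking the execute_queries signature listed above."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def execute_queries(self, sql_queries: Sequence[str], parameters=None, fetch_results=False):
        # Record each statement in order instead of sending it to Snowflake.
        self.executed.extend(sql_queries)
        # Assumption: one (here empty) result list per statement when fetching.
        return [[] for _ in sql_queries] if fetch_results else None


def rebuild_daily_totals(conn) -> None:
    """Send a small batch of statements through a single execute_queries call."""
    conn.execute_queries(
        [
            "CREATE TABLE IF NOT EXISTS daily_totals (day DATE, total NUMBER)",
            "DELETE FROM daily_totals",
            "INSERT INTO daily_totals "
            "SELECT order_date, SUM(amount) FROM orders GROUP BY order_date",
        ]
    )


conn = RecordingConnection()
rebuild_daily_totals(conn)
```

In a Dagster op, the object passed as conn would be the snowflake resource itself, for example context.resources.snowflake.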