Snowflake collector

Overview

Snowflake’s Data Cloud is powered by an advanced data platform provided as Software-as-a-Service (SaaS). Snowflake enables data storage, processing and analytic solutions that are faster, easier to use, and far more flexible than traditional offerings.

Configuration requirements

Before running this collector, make sure you have the configuration details listed below.

| Configuration | Details |
| --- | --- |
| Username and password | You need a username and password to set up this collector. |
| Account identifier | You need the account identifier, which you can find in your Snowflake instance. |
| Warehouse | The selected warehouse to compute and generate data. |

More information

Refer to the Vendor setup section to learn more about these configurations.

Devo collector features

| Feature | Details |
| --- | --- |
| Allow parallel downloading (multipod) | not allowed |
| Running environments | collector server, on-premise |
| Populated Devo events | table |
| Flattening preprocessing | no |

Data sources

| Data source | Description | API endpoint | Collector service name | Minimum role needed | Devo table | Available from release |
| --- | --- | --- | --- | --- | --- | --- |
| Access History | This topic provides concepts on the user access history in Snowflake. | SELECT * FROM access_history | access_history | GOVERNANCE_VIEWER | db.snowflake.history.access | v1.0.0 |
| Login History | This topic is used to query login attempts by Snowflake users within the last 365 days (1 year). | SELECT * FROM login_history | login_history | SECURITY_VIEWER | db.snowflake.history.login | v1.0.0 |
| Sessions | This topic provides information on the session, including information on the authentication method to Snowflake and the Snowflake login event. | SELECT * FROM sessions | sessions | SECURITY_VIEWER | db.snowflake.history.session | v1.0.0 |
| Custom SQL | This topic lets you run a custom SQL query. | {custom_query} | custom_service | Any role that can access the schema (see https://docs.snowflake.com/en/sql-reference/snowflake-db-roles) | my.app.{custom_level_1}.{custom_level_2} | v1.0.0 |

For more information on how the events are parsed, visit our page.
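As a quick reference, the data sources table can be expressed as a lookup structure. The sketch below is illustrative only: `SERVICES` and `resolve_devo_table` are hypothetical names and not part of the collector. Only the service names, queries, and table names are taken from the table.

```python
# Hypothetical lookup mirroring the data sources table; not the collector's code.
SERVICES = {
    "access_history": {
        "query": "SELECT * FROM access_history",
        "table": "db.snowflake.history.access",
    },
    "login_history": {
        "query": "SELECT * FROM login_history",
        "table": "db.snowflake.history.login",
    },
    "sessions": {
        "query": "SELECT * FROM sessions",
        "table": "db.snowflake.history.session",
    },
    "custom_service": {
        "query": "{custom_query}",
        "table": "my.app.{custom_level_1}.{custom_level_2}",
    },
}


def resolve_devo_table(service: str, **levels: str) -> str:
    """Return the destination table, filling in custom levels if the template has any."""
    template = SERVICES[service]["table"]
    # Predefined tables contain no placeholders, so format() returns them unchanged.
    return template.format(**levels)
```

For the custom service, both `custom_level_1` and `custom_level_2` must be supplied; a missing level raises a `KeyError` in this sketch.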

Vendor setup

There are some requirements to set up the collector. You need to get a username, password, and account identifier from Snowflake.

  1. Use the username and password that you chose when creating your Snowflake account.

  2. Get your account identifier from your instance URL, for example: https://<account_identifier>.snowflakecomputing.com
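If you want to extract the account identifier from the instance URL programmatically, a minimal sketch follows. The function name `account_identifier_from_url` is hypothetical, and the sketch assumes the URL follows the `https://<account_identifier>.snowflakecomputing.com` pattern shown above.

```python
from urllib.parse import urlparse

SNOWFLAKE_SUFFIX = ".snowflakecomputing.com"


def account_identifier_from_url(url: str) -> str:
    """Return the part of the hostname that precedes .snowflakecomputing.com."""
    host = urlparse(url).hostname or ""
    if not host.endswith(SNOWFLAKE_SUFFIX):
        raise ValueError(f"Not a Snowflake instance URL: {url!r}")
    identifier = host[: -len(SNOWFLAKE_SUFFIX)]
    if not identifier:
        raise ValueError(f"No account identifier found in: {url!r}")
    return identifier
```

Note that `urlparse` lowercases the hostname, so the returned identifier is always lowercase.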

Connection

Refer to the vendor documentation for connection details.

Minimum configuration required for basic pulling

Although this collector supports advanced configuration, the fields required to retrieve data with basic configuration are defined below.

This minimum configuration covers only the parameters specific to this integration. Other required parameters relate to the generic behavior of the collector. Check the settings sections for details.

| Setting | Details |
| --- | --- |
| username | The username to authenticate to the service. |
| password | The password to authenticate to the service. |
| account_identifier | The account identifier is needed to connect to Snowflake using the Snowflake Connector for Python. You can find it in your instance URL, for example https://<account_identifier>.snowflakecomputing.com/api. If you cannot find the account identifier, click the account icon at the bottom left and copy the URL. |
| warehouse | The selected warehouse to compute and generate data. |

See the Accepted authentication methods section to verify what settings are required based on the desired authentication method.

Accepted authentication methods

| Authentication method | Username | Password | Account identifier | Warehouse |
| --- | --- | --- | --- | --- |
| Username/Password (including Account identifier and warehouse) | REQUIRED | REQUIRED | REQUIRED | REQUIRED |
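To illustrate how the four required settings map onto a connection, here is a minimal sketch using the Snowflake Connector for Python. The helper names `build_connection_params` and `open_connection` are hypothetical, and this is not the collector's own code. It only assumes that `snowflake.connector.connect` accepts `user`, `password`, `account`, and `warehouse` keyword arguments.

```python
REQUIRED_SETTINGS = ("username", "password", "account_identifier", "warehouse")


def build_connection_params(config: dict) -> dict:
    """Validate the required settings and map them to connector keyword arguments."""
    missing = [key for key in REQUIRED_SETTINGS if not config.get(key)]
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return {
        "user": config["username"],
        "password": config["password"],
        "account": config["account_identifier"],
        "warehouse": config["warehouse"],
    }


def open_connection(config: dict):
    """Open a Snowflake connection (requires the snowflake-connector-python package)."""
    # Imported lazily so the validation helper works without the package installed.
    import snowflake.connector

    return snowflake.connector.connect(**build_connection_params(config))
```

Validating before connecting surfaces a missing setting as a clear error instead of an authentication failure from the service.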

Run the collector

Once the data source is configured, you can send us the required information if you want us to host and manage the collector for you (Cloud collector), or deploy and host the collector in your own machine using a Docker image (On-premise collector).

Collector services detail

This section is intended to explain how to proceed with specific actions for services.

Access history

Access History in Snowflake records when a user query reads column data and when a SQL statement performs a data write operation, such as INSERT, UPDATE, or DELETE, from a source data object to a target data object. You can find the user access history by querying the Account Usage ACCESS_HISTORY view.

Each row in the ACCESS_HISTORY view contains a single record per SQL statement. The record describes the columns the query accessed directly and indirectly (for example, the underlying tables that the data for the query comes from). These records facilitate regulatory compliance auditing and provide insights on popular and frequently accessed tables and columns, since there is a direct link between the user (for example, the query operator), the query, the table or view, the column, and the data.

Devo categorization and destination

All events of this service are ingested into the table db.snowflake.history.access

Once the collector has been launched, verify that ingestion is working correctly. To do so, go to the collector’s logs console.

This service has the following components:

| Component | Description |
| --- | --- |
| Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
| Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via SDK. |

Setup output

A successful run has the following output messages for the setup module:

INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> Starting the execution of setup()
INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> Establishing Snowflake connection using provided username/password/account_identifier.
INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> Snowflake Connector for Python Version: 2.9.0, Python Version: 3.9.12, Platform: Linux-5.14.0-1055-oem-x86_64-with-glibc2.31
INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> Setting use_openssl_only mode to False
INFO InputProcess::SnowflakeDataPullerSetup(example_collector,snowflake#abc123,access_history#predefined,all) -> Setup for module <SnowflakeDataPuller> has been successfully executed

Puller output

A successful initial run has the following output messages for the puller module:

INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) Starting the execution of pre_pull()
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Reading persisted data
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Data retrieved from the persistence: {'last_polled_timestamp': '2023-01-11T15:18:51.644000Z', 'retrieving_ts': '2022-12-29T12:54:33.224Z', 'historic_date_utc': '2023-01-10T00:00:00.001000Z', 'ids_with_same_timestamp': ['xxx'], '@persistence_version': 1}
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Running the persistence upgrade steps
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Running the persistence corrections steps
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Running the persistence corrections steps
WARNING InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Some changes have been detected and the persistence needs to be updated. Previous content: {'last_polled_timestamp': '2023-01-11T15:18:51.644000Z', 'retrieving_ts': '2022-12-29T12:54:33.224Z', 'historic_date_utc': '2023-01-10T00:00:00.001000Z', 'ids_with_same_timestamp': ['01a99496-0000-dffe-0000-cb3100010052'], '@persistence_version': 1}. New content: {'last_polled_timestamp': '2023-01-10T00:00:00.000000Z', 'retrieving_ts': '2022-12-29T12:54:33.224Z', 'historic_date_utc': '2023-01-10T00:00:00.000000Z', 'ids_with_same_timestamp': [], '@persistence_version': 1}
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Updating the persistence
WARNING InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Persistence has been updated successfully
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) Finalizing the execution of pre_pull()
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Starting data collection every 600 seconds
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Pull Started
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE ROLE ACCOUNTADMIN]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE SNOWFLAKE]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE SCHEMA ACCOUNT_USAGE]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [ALTER WAREHOUSE IF EXISTS COMPUTE_WH RESUME IF SUSPENDED]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE WAREHOUSE COMPUTE_WH]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [alter session set timezone = 'UTC']
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [SELECT * FROM access_history WHERE QUERY_START_TIME >= ? ORDER BY QUERY_START_TI...]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 7
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Removing the duplicate detections if present...
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Statistics for this pull cycle (@devo_pulling_id=1673453142259):Number of requests made: 1; Number of events received: 7; Number of duplicated events filtered out: 0; Number of events generated and sent: 7; Average of events per second: 1.573.
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE ROLE ACCOUNTADMIN]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE SNOWFLAKE]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE SCHEMA ACCOUNT_USAGE]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [ALTER WAREHOUSE IF EXISTS COMPUTE_WH RESUME IF SUSPENDED]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [USE WAREHOUSE COMPUTE_WH]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [alter session set timezone = 'UTC']
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query: [SELECT * FROM access_history WHERE QUERY_START_TIME >= ? ORDER BY QUERY_START_TI...]
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> query execution done
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Number of results in first chunk: 1
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Removing the duplicate detections if present...
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Statistics for this pull cycle (@devo_pulling_id=1673453142259):Number of requests made: 1; Number of events received: 1; Number of duplicated events filtered out: 1; Number of events generated and sent: 0; Average of events per second: 0.000.
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> The data is up to date!
INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Data collection completed. Elapsed time: 6.277 seconds. Waiting for 593.723 second(s) until the next one
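The puller log shows the same sequence of session statements before each data query. The sketch below reproduces that sequence for illustration. The function names are hypothetical, the identifier check is an added safeguard rather than something shown in the log, and the truncated SELECT statement is deliberately left out because its full text is not available here.

```python
import re
from typing import List

# Conservative check for unquoted identifiers; an assumption, not taken from the collector.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def _checked(name: str) -> str:
    """Reject values that are not plain identifiers before interpolating them into SQL."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe identifier: {name!r}")
    return name


def session_setup_statements(role: str, warehouse: str) -> List[str]:
    """Return the session statements in the order they appear in the sample log."""
    role, warehouse = _checked(role), _checked(warehouse)
    return [
        f"USE ROLE {role}",
        "USE SNOWFLAKE",
        "USE SCHEMA ACCOUNT_USAGE",
        f"ALTER WAREHOUSE IF EXISTS {warehouse} RESUME IF SUSPENDED",
        f"USE WAREHOUSE {warehouse}",
        "alter session set timezone = 'UTC'",
    ]


def run_session_setup(cursor, role: str, warehouse: str) -> None:
    """Execute each statement on a DB-API style cursor."""
    for statement in session_setup_statements(role, warehouse):
        cursor.execute(statement)
```

Calling `session_setup_statements("ACCOUNTADMIN", "COMPUTE_WH")` yields the six statements seen in the log. Since v1.3.0 a lower role than ACCOUNTADMIN can be used.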

After a successful collector execution (that is, no error logs found), you will see the following log message:

INFO InputProcess::SnowflakeDataPuller(snowflake,abc123,access_history,predefined,all) -> Statistics for this pull cycle (@devo_pulling_id=1673453142259):Number of requests made: 1; Number of events received: 1; Number of duplicated events filtered out: 1; Number of events generated and sent: 0; Average of events per second: 0.000.
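If you monitor the collector by scanning its logs, the statistics message can be parsed into numbers. This is a minimal sketch that assumes the message keeps the "label: value" layout shown above; `parse_pull_statistics` is a hypothetical helper, not part of the collector.

```python
import re

# Matches "Number of ...: <value>" and "Average of ...: <value>" pairs.
STAT_PATTERN = re.compile(r"(Number of [^:;]+|Average of [^:;]+): ([0-9]+(?:\.[0-9]+)?)")


def parse_pull_statistics(line: str) -> dict:
    """Extract the counters from a 'Statistics for this pull cycle' log line."""
    stats = {}
    for label, value in STAT_PATTERN.findall(line):
        # Keep counters as integers and the rate as a float.
        stats[label] = float(value) if "." in value else int(value)
    return stats
```

A line with zero events sent and a non-zero duplicate count, as in the sample above, usually means the data was already up to date.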

This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:

  1. Edit the configuration file.

  2. Change the value of the historical_date_utc parameter to a different one.

  3. Save the changes.

  4. Restart the collector.

The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.
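The reset behavior can be pictured with a small sketch modeled on the persistence contents shown in the sample puller log. The key names come from that log output. The function `reset_persistence_if_changed` is hypothetical, and the collector's actual logic may differ, particularly in later releases that changed what is persisted.

```python
def reset_persistence_if_changed(persisted: dict, configured_date: str) -> dict:
    """Return the state to store, resetting progress when the configured date differs."""
    if persisted.get("historic_date_utc") == configured_date:
        # Nothing changed: keep pulling from where the collector left off.
        return persisted
    updated = dict(persisted)
    updated["historic_date_utc"] = configured_date
    # Restart collection from the newly configured date.
    updated["last_polled_timestamp"] = configured_date
    updated["ids_with_same_timestamp"] = []
    return updated
```

With the values from the sample log, changing the date from `2023-01-10T00:00:00.001000Z` to `2023-01-10T00:00:00.000000Z` moves `last_polled_timestamp` back to the new date and clears the stored IDs.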

Login history

The Login_history family of table functions can be used to query login attempts by Snowflake users along various dimensions. This service covers Login_history, which returns login events within a specified time range.

Devo categorization and destination

All events of this service are ingested into the table db.snowflake.history.login

Once the collector has been launched, verify that ingestion is working correctly. To do so, go to the collector’s logs console.

This service has the following components:

| Component | Description |
| --- | --- |
| Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
| Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via SDK. |

Setup output

A successful run has the following output messages for the setup module:

Puller output

A successful initial run has the following output messages for the puller module:

After a successful collector execution (that is, no error logs found), you will see the following log message:

This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:

  1. Edit the configuration file.

  2. Change the value of the historical_date_utc parameter to a different one.

  3. Save the changes.

  4. Restart the collector.

The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.

Sessions

This service provides information on the session, including information on the authentication method to Snowflake and the Snowflake login event. Snowflake returns one row for each session created over the last year.

Devo categorization and destination

All events of this service are ingested into the table db.snowflake.history.session

Once the collector has been launched, verify that ingestion is working correctly. To do so, go to the collector’s logs console.

This service has the following components:

| Component | Description |
| --- | --- |
| Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
| Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via SDK. |

Setup output

A successful run has the following output messages for the setup module:

Puller output

A successful initial run has the following output messages for the puller module:

After a successful collector execution (that is, no error logs found), you will see the following log message:

Collector operations

This section is intended to explain how to proceed with the specific operations of this collector.

Change log

Release

Released on

Release type

Details

Recommendations

v3.0.0

Jan 15, 2025

IMPROVEMENTS

  • Upgraded the DCSDK from 1.12.4 to 1.13.1

  • Improved the custom_sql service by refining the persistence logic and fixing the high polling issue.

Recommended version

v2.0.0

Oct 7, 2024

IMPROVEMENTS

Improvements

  • Refactored the collector to remove persisted last_ids. The only thing stored now is the last polled time.

  • Queries are now constructed between timeframes, for example between 2024-01-01T00:00:00 and 2024-01-01T00:05:00.

Recommended version

v1.4.1

Aug 21, 2024

IMPROVEMENTS

BUG FIXING

Improvements:

  • Updated DCSDK to 1.12.4

Bug fixes:

  • Fixed the state file being copied and storing whole events. It now stores an MD5 hash of each event.

Update

v1.4.0

Aug 12, 2024

IMPROVEMENTS

Improvements

  • Updated DCSDK to 1.12.2

  • Updated Docker Image to 1.3.0

  • Added snowflake account id decorator to each log for easier sorting

Update

v1.3.1

Jun 14, 2024

BUG FIXING

Bug fixing:

  • Fixed Internal Dependency

Update

v1.3.0

Jun 13, 2024

BUG FIXING

IMPROVEMENTS

Bug fixing:

  • Removed limit from query causing only one record to return

  • Updated Python Dependencies to remove a scaling error

  • Fixed the puller constantly restarting and never marked completed.

Improvements:

  • Added connection closing

  • Added the ability to use a lower role than ACCOUNTADMIN

Update

v1.2.0

May 10, 2024

BUG FIXING

IMPROVEMENTS

Bug fixing:

  • Fixed the custom SQL bug in persistence by adding JSON schema validation and changing the parsing logic.

Improvements:

  • Updated DCSDK from 1.10.2 to 1.11.1

Update

v1.1.0

Dec 20, 2023

BUG FIXING

IMPROVEMENTS

Bug fixing:

  • Fixed the custom SQL bug in persistence by adding JSON schema validation.

Improvements:

  • Updated DCSDK from 1.6.1 to 1.10.2

Update

v1.0.1

Feb 7, 2023

NEW FEATURE

BUG FIXING

 

Improvements:

  • Upgraded DCSDK to v1.6.1

  • A new key called @devo_environment will be added to the event (only for JSON events)

  • Obfuscation service can be now configured from user config and module definition

  • Obfuscation service can now obfuscate items inside arrays

Fixed bugs:

  • No need to use privileged roles anymore

  • All the DatabaseError-s are handled now

  • Fixed a couple of data types to avoid errors

  • Statistics are shown correctly now

  • Rename some error messages and numbers

Update

v1.0.0

Dec 29, 2022

NEW FEATURE

New features:

  • Access History: Returns when the user query reads column data and when the SQL statement performs a data write operation.

  • Login History: Returns login events (successful or not) within a specified time range.

  • Session History: Returns data about successful authentications, including the username, method used, application used, etc.

  • Custom SQL: This service lets you run custom queries.
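The v2.0.0 entry in this change log mentions that queries are constructed between timeframes. As a rough illustration of that idea, the sketch below splits a time range into consecutive windows. The generator `time_windows` and the five-minute default are assumptions based on the example timestamps in that entry, not the collector's actual implementation.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def time_windows(
    start: datetime, end: datetime, step: timedelta = timedelta(minutes=5)
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (lower, upper) windows covering start..end."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    lower = start
    while lower < end:
        # The last window is shortened so it never extends past the end.
        upper = min(lower + step, end)
        yield lower, upper
        lower = upper
```

Each window can then bound one query, which means only the upper bound of the last completed window needs to be persisted.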
