General setup for S3 + SQS collector

Overview

| Data source | Description | Collector service name | Devo table | Available from |
|---|---|---|---|---|
| Any | Any source you send to an SQS can be collected. | - | - | v1.0.0 |
| CONFIG LOGS | - | aws_sqs_config | cloud.aws.configlogs.events | v1.0.0 |
| AWS ELB | - | aws_sqs_elb | web.aws.elb.access | v1.0.0 |
| AWS ALB | - | aws_sqs_alb | web.aws.alb.access, web.aws.alb.connection | v1.0.0 |
| CISCO UMBRELLA | - | aws_sqs_cisco_umbrella | sig.cisco.umbrella.dns | v1.0.0 |
| CLOUDFLARE LOGPUSH | - | aws_sqs_cloudflare_logpush | cloud.cloudflare.logpush.http | v1.0.0 |
| CLOUDFLARE AUDIT | - | aws_sqs_cloudflare_audit | cloud.aws.cloudflare.audit | v1.0.0 |
| CLOUDTRAIL | - | aws_sqs_cloudtrail | cloud.aws.cloudtrail.* | v1.0.0 |
| CLOUDTRAIL VIA KINESIS FIREHOSE | - | aws_sqs_cloudtrail_kinesis | cloud.aws.cloudtrail.* | v1.0.0 |
| CLOUDWATCH | - | aws_sqs_cloudwatch | cloud.aws.cloudwatch.logs | v1.0.0 |
| CLOUDWATCH VPC | - | aws_sqs_cloudwatch_vpc | cloud.aws.vpc.flow | v1.0.0 |
| CONTROL TOWER | VPC Flow Logs, Cloudtrail, Cloudfront, and/or AWS config logs | aws_sqs_control_tower | - | v1.0.0 |
| FDR | - | aws_sqs_fdr | edr.crowdstrike.cannon | v1.0.0 |
| FDR LARGE | The files can be so large and hard to pull that if the service above fails, use this one. | aws_sqs_fdr_large | edr.crowdstrike.cannon | |
| GUARD DUTY | - | aws_sqs_guard_duty | cloud.aws.guardduty.findings | v1.0.0 |
| GUARD DUTY VIA KINESIS FIREHOSE | - | aws_sqs_guard_duty_kinesis | cloud.aws.guardduty.findings | v1.0.0 |
| IMPERVA INCAPSULA | - | aws_sqs_incapsula | cef0.imperva.incapsula | v1.0.0 |
| JAMF | - | aws_sqs_jamf | my.app.[file-log_type].logs | v1.0.0 |
| KUBERNETES | - | aws_sqs_kubernetes | my.app.kubernetes.events | v1.0.0 |
| LACEWORK | - | aws_sqs_lacework | monitor.lacework | v1.0.0 |
| PALO ALTO | - | aws_sqs_palo_alto | firewall.paloalto.[file-log_type] | v1.0.0 |
| RDS | Relational Database Audit Logs | aws_sqs_rds | cloud.aws.rds.audit | v1.1.1 |
| ROUTE 53 | - | aws_sqs_route53 | dns.aws.route53 | v1.0.0 |
| OS LOGS | - | aws_sqs_os | box.[file-log_type].[file-log_subtype].us | v1.0.0 |
| SENTINEL ONE FUNNEL | - | aws_sqs_s1_funnel | edr.sentinelone.dv | v1.0.0 |
| S3 ACCESS | - | aws_sqs_s3_access | web.aws.s3.access | v1.0.0 |
| VPC LOGS | - | aws_sqs_vpc | cloud.aws.vpc.flow | v1.0.0 |
| WAF LOGS | - | aws_sqs_waf | cloud.aws.waf.logs | v1.0.0 |

General configuration

For each setup, you can use this general config:

{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "34523",
      "enabled": true,
      "credentials": {
        "aws_cross_account_role": "if provided",
        "aws_external_id": "if needed/supplied"
      },
      "base_url": "https://sqs.us-east-2.amazonaws.com/",
      "services": {
        "aws_sqs_kubernetes": {
          "encoding": "gzip",
          "type": "unseparated_json_processor",
          "config": {
            "key": "logEvents"
          }
        }
      }
    }
  }
}

The services are listed above. Every part of a service can be overridden, so if you need to change the encoding, for example, you can do so freely. You can also leave the service empty, as in "service_name": {}, to keep its default settings.

Default configuration

Config logs

{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "34523",
      "enabled": true,
      "credentials": {
        "aws_cross_account_role": "if provided",
        "aws_external_id": "if needed/supplied"
      },
      "region": "us-east-2",
      "base_url": "https://sqs.us-east-2.amazonaws.com/",
      "sqs_visibility_timeout": 120,
      "sqs_wait_timeout": 20,
      "sqs_max_messages": 1,
      "ack_messages": true,
      "services": {
        "aws_sqs_config": {
          "file_field_definitions": {
            "account": [
              { "operator": "split", "on": "/", "element": -1 },
              { "operator": "split", "on": "_", "element": 0 }
            ],
            "region": [
              { "operator": "split", "on": "/", "element": -1 },
              { "operator": "split", "on": "_", "element": 2 }
            ]
          },
          "filename_filter_rules": [],
          "encoding": "gzip",
          "file_format": {
            "type": "line_split_json_processor",
            "config": {
              "key": "configurationItems",
              "include": {
                "fileVersion": "fileVersion",
                "configSnapshotId": "configSnapshotId"
              }
            }
          },
          "record_field_mapping": {},
          "routing_template": "cloud.aws.configlogs.events.[file-account].[file-region]",
          "line_filter_rules": []
        }
      }
    }
  }
}

ALB/ELB Logs

{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "34523",
      "enabled": true,
      "credentials": {
        "aws_cross_account_role": "if provided",
        "aws_external_id": "if needed/supplied"
      },
      "region": "us-east-2",
      "base_url": "https://sqs.us-east-2.amazonaws.com/",
      "sqs_visibility_timeout": 120,
      "sqs_wait_timeout": 20,
      "sqs_max_messages": 1,
      "ack_messages": true,
      "services": {
        "aws_sqs_alb": {
          "file_field_definitions": {
            "type": [
              { "operator": "split", "on": "/", "element": 2 },
              { "operator": "split", "on": "-", "element": 0 }
            ],
            "account": [
              { "operator": "split", "on": "/", "element": 4 }
            ],
            "region": [
              { "operator": "split", "on": "/", "element": -1 },
              { "operator": "split", "on": "_", "element": 2 }
            ]
          },
          "encoding": "gzip",
          "file_format": {
            "type": "line_split_processor",
            "config": {
              "json": true
            }
          },
          "record_field_mapping": {},
          "routing_template": "web.aws.alb.[file-type].[file-region].[file-account]",
          "line_filter_rules": []
        }
      }
    }
  }
}

The ALB and ELB configs are nearly identical. To switch from one service to the other, replace every occurrence of alb with elb, or vice versa.

Cisco Umbrella

Cloudflare Logpush

Cloudflare Audit Logs

Cloudtrail Logs

Cloudtrail via Kinesis Firehose

Cloudwatch Logs

Cloudwatch VPC Logs

Control Tower

Crowdstrike Falcon Data Replication

Crowdstrike Falcon Data Replication Large

Guard duty

Guard duty via Kinesis Firehose

Imperva Incapsula Logs

JAMF Logs

Kubernetes Logs

Lacework Logs

Palo Alto Logs

If the logs are JSON-formatted, the routing template will be firewall.paloalto.[file-log_type].json

RDS Logs

Route 53

OS Logs

Sentinel One Funnel

S3 Access Logs

VPC Logs

WAF Logs

Custom services or overrides

For a custom service or override, the config can look like this:

The main things you need:

  • file_format is the type of processor to use.

  • routing_template is the tag you need.
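As a rough illustration of the two items above, the following Python sketch builds a custom service entry as a dict and prints it as JSON. The service name my_custom_service and the tag my.app.custom.logs are hypothetical placeholders; the field names (encoding, file_format, routing_template) are the ones used in the default configurations earlier on this page.

```python
import json

# Hypothetical custom service: the name and the tag are placeholders.
custom_service = {
    "my_custom_service": {
        "encoding": "gzip",
        # file_format selects the processor and passes its options.
        "file_format": {
            "type": "line_split_processor",
            "config": {"json": True},
        },
        # routing_template is the Devo tag the events are sent to.
        "routing_template": "my.app.custom.logs",
    }
}

# Serialize the fragment that would go under "services" in the collector config.
config_json = json.dumps({"services": custom_service}, indent=2)
print(config_json)
```

Building the fragment programmatically and serializing it with json.dumps guarantees valid JSON, which avoids problems such as missing commas in hand-edited configs.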

Collectors that need custom tags

aws_sqs_rds

  • cloud.aws.rds.audit.SQS_REGION.SQS_ACCID

  • SQS_REGION needs to be filled in.

  • SQS_ACCID needs to be filled in.

  • These placeholders do not have to hold a region and an account ID; you can instead fill in information about the database the logs come from.

Types of processors

unseparated_json_processor

Use this if the events come as one massive JSON object.

split_or_unseparated_processor

This will determine if the log is split by \n or not.

aws_access_logs_processor

For AWS access logs and \n splits.

single_json_object_processor

This is for one JSON object.

separated_json_processor

For logs stored as many JSON objects with a separator between them (newline by default).

bluecoat_processor

For Bluecoat recipe.

json_object_to_linesplit_processor

Split by configured value.

json_array_processor

For logs stored as a JSON array.

json_line_arrays_processor

For unseparated JSON objects that span multiple lines of a single file.

jamf_processor

Jamf log processing.

parquet_processor

Parquet encoding.

guardduty_processor

For GuardDuty processors.

vpc_flow_processor

VPC service processor.

alt_vpc_flow_processor

VPC service processor.

kolide_processor

For Kolide service.

json_array_vpc_processor

VPC service processor.

rds_processor

RDS processor for the RDS service.

unseparated_json_processor_extract_key

This is a key-value extraction method. Suppose the log message has this format: {"id": 12345, "timestamp": 2024, "logEvents": {"another_id": 34352, "another_timestamp": 2024, "message": {the actual log message you want to send to Devo}}}. You would use a config like this:
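The exact config keys for this processor are not shown in this section, so the Python sketch below only illustrates the kind of extraction described, not the collector's implementation. The function name extract_message, its parameters outer_key and inner_key, and the content of the inner message are all made up for the example.

```python
import json

# Sample envelope following the shape described above; the inner
# message content is invented for illustration.
raw = json.dumps({
    "id": 12345,
    "timestamp": 2024,
    "logEvents": {
        "another_id": 34352,
        "another_timestamp": 2024,
        "message": {"action": "login", "user": "alice"},
    },
})


def extract_message(raw_event, outer_key="logEvents", inner_key="message"):
    """Return the nested payload, dropping the surrounding envelope."""
    event = json.loads(raw_event)
    return event[outer_key][inner_key]


payload = extract_message(raw)
print(payload)
```

Only the inner message would be sent on; the outer id and timestamp fields are discarded.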

More on processors:

file_format

type - A string specifying which processor to use.

single_json_object_processor - Logs are stored as or in a JSON object.

single_json_object_processor config options:

  • key - (string) The key under which the list of logs is stored.

unseparated_json_processor - Logs are stored as or in JSON objects, which are written to a text file with no separator.

unseparated_json_processor config options:

  • key - (string) The key under which the log is stored.

  • include - (dict) Maps the names of keys outside the inner part that should be included; these keys can be renamed.

If there is no key, that is, the whole JSON object is the desired log, set "flat": true.

See aws_config_collector for example:
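To make the key, include, and flat options more concrete, here is a minimal Python sketch of how unseparated JSON objects could be read and unpacked. It is not the collector's code: the helper names iter_unseparated_json and extract_logs are hypothetical, the sample data is invented, and the sketch assumes that include maps a source key name to the name it should have in the output.

```python
import json


def iter_unseparated_json(text):
    """Yield each JSON object from text that has no separator between objects."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


def extract_logs(obj, key=None, include=None, flat=False):
    """Return the logs stored under key, copying the include fields onto each log."""
    if flat or key is None:
        # The whole object is the log.
        return [obj]
    extras = {new: obj.get(old) for old, new in (include or {}).items()}
    return [{**log, **extras} for log in obj[key]]


text = (
    '{"fileVersion": "1.0", "configurationItems": [{"resourceId": "a"}]}'
    '{"fileVersion": "1.0", "configurationItems": [{"resourceId": "b"}, {"resourceId": "c"}]}'
)
logs = [
    log
    for obj in iter_unseparated_json(text)
    for log in extract_logs(obj, key="configurationItems", include={"fileVersion": "fileVersion"})
]
print(logs)
```

The key and include names in the usage lines follow the Config logs default configuration shown earlier on this page.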

text_file_processor - Logs are stored as text files, potentially with lines and fields separated by, for example, commas and newlines.

text_file_processor config options: options for how lines and records are separated (for example, newline, tab, or comma). Good for CSV-style data.

line_split_processor - Logs are stored in a newline-separated file. Works more quickly than separated_json_processor.

config options: "json": true or false. If json is set to true, the collector assumes the logs are newline-separated JSON and parses them, which enables record-field mapping.

separated_json_processor - Logs are stored as many JSON objects with some kind of separator between them.

config options: specify the separator, for example "separator": "||". The default is newline if the option is left unset.
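As a small illustration of what a separator option means in practice, the Python sketch below splits a payload on a configurable separator and parses each piece as JSON. The function name split_separated_json and the sample payload are hypothetical; this is a simplified stand-in for the idea, not the processor itself.

```python
import json


def split_separated_json(text, separator="\n"):
    """Split text on the separator and parse each non-empty piece as JSON."""
    return [json.loads(piece) for piece in text.split(separator) if piece.strip()]


# Objects separated by "||", as in the "separator": "||" example.
records = split_separated_json('{"a": 1}||{"a": 2}||{"a": 3}', separator="||")
print(records)

# With no separator given, the sketch falls back to newline.
newline_records = split_separated_json('{"a": 1}\n{"a": 2}\n')
print(newline_records)
```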

jamf_processor - Special processor for JAMF logs.

aws_access_logs_processor - Special processor for AWS access logs.

windows_security_processor - Special processor for Windows Security logs.

vpc_flow_processor - Special processor for VPC Flow logs.

json_line_arrays_processor - Processor for unseparated JSON objects that are on multiple lines of a single file.

dict_processor - Processor for logs that come as Python dictionary objects, that is, in direct mode.

config - a dictionary of information the specified file_format processor needs

record_field_mapping

A dictionary where each key defines a variable that can be parsed out from each record (which may be referenced later in filtering).

For example, we may want to parse something and call it type by getting type from a certain key in the record (which may be multiple layers deep).

The keys are a list of how to find a value and handle nesting (essentially, defining a path through the data).

Suppose we have logs that look like this:

If we want to get the log_type, we should list all the keys needed to parse through the JSON in order:

In many cases, you will probably only need one key, for example, in a flat JSON that isn’t nested:

Here you would just specify keys: ["log_type"]. There are some operations that can be used to further alter the parsed information (like split and replace).

This snippet would grab whatever is located at log["file"]["type"] and name it as type. record_field_mapping defines variables by taking them from logs, and these variables can then be used for filtering.
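A rough Python sketch of how a keys path could be resolved against a record, assuming each variable in record_field_mapping carries a "keys" list as described above. The helper resolve_keys and the sample record are made up for illustration and are not the collector's code.

```python
def resolve_keys(record, keys):
    """Follow the list of keys through nested dicts; return None if a key is missing."""
    value = record
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# Assumed shape: each variable name maps to the keys needed to reach its value.
record_field_mapping = {"type": {"keys": ["file", "type"]}}

# Invented record for the example.
log = {"file": {"type": "security_log", "name": "a.gz"}, "message": "hello"}

variables = {
    name: resolve_keys(log, rule["keys"])
    for name, rule in record_field_mapping.items()
}
print(variables)

# A flat record needs only one key.
flat_value = resolve_keys({"log_type": "audit"}, ["log_type"])
print(flat_value)
```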

Let’s say you have a log in JSON format like this, which will be sent to Devo:

Specifying type in record_field_mapping will allow the collector to extract the security_log and save it as type.

Now let’s say you want to change the tag dynamically based on that value. You could change the routing_template to something like my.app.datasource.[record-type]. In the case of the log above, it would be sent to my.app.datasource.security_log.

Now let’s say you want to filter out (not send) any records which have the type security_log. You could write a line_filter_rule as follows:

  • We specified the source as record because we want to use a variable from the record_field_mapping.

  • We specified the key as type because that is the name of the variable we defined.

  • We specified type as match because we want to filter out any record matching this rule.

  • And we specified the value as security_log because we specifically do not want to send any records whose type equals security_log.
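The rule described by these four fields can be sketched in Python as follows. The field names (source, key, type, value) come from the list above; the function rule_matches and the sample records are hypothetical, and the sketch assumes that "match" means the variable equals the given value, which may be simpler than what the collector actually does.

```python
# Rule fields as described in the list above.
rule = {"source": "record", "key": "type", "type": "match", "value": "security_log"}


def rule_matches(rule, record_vars, file_vars=None):
    """Return True when the variable named by the rule matches the rule's value."""
    source = record_vars if rule["source"] == "record" else (file_vars or {})
    actual = source.get(rule["key"])
    if rule["type"] == "match":
        # Assumption: "match" is treated as plain equality in this sketch.
        return actual == rule["value"]
    raise ValueError(f"unsupported rule type: {rule['type']}")


# Variables extracted through record_field_mapping for two invented records.
records = [{"type": "security_log"}, {"type": "audit_log"}]

# Records that match the rule are filtered out, so only the rest are sent.
kept = [r for r in records if not rule_matches(rule, r)]
print(kept)
```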

The split operation is the same as if you ran the Python split function on a string.

Let’s say you have a filename logs/account_id/folder_name/filename and you want to save the account_id as a variable to use for tag routing or filtering.

You could write a file_field_definition like this:

This would store a variable called account_id by taking the entire filename, splitting it into pieces wherever it finds a forward slash, and then taking the element at position one. In Python, it would look like this:
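A minimal sketch of that equivalence, using the filename from the example above. The file_field_definitions dict follows the operator, on, and element fields used in the default configurations on this page.

```python
filename = "logs/account_id/folder_name/filename"

# The file_field_definition being described.
file_field_definitions = {
    "account_id": [{"operator": "split", "on": "/", "element": 1}]
}

# Plain Python equivalent: split on "/" and take the element at index 1.
rule = file_field_definitions["account_id"][0]
account_id = filename.split(rule["on"])[rule["element"]]
print(account_id)
```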

Tagging

Tagging can be done in many different ways. One way tagging works is by using the file field definitions:

These are the elements of the filename object:

If you look at the highlighted object filename, you can see that we are splitting and looking for the element at index 2. Indexing starts at 0, as with arrays. So:

  • 0 = cequence-data

  • 1 = cequence-devo-6x-NAieMI

  • 2 = detector

"routing_template": "my.app.test_cequence.[file-log_type]"

Our final tag is my.app.test_cequence.detector

Here is another example:

file_field_definitions

A dictionary that maps variable names (which you choose) to lists of parsing rules.

Each parsing rule has an operator with its own keys. Parsing rules are applied in the order they are listed in the configuration.

  • The split operator uses the on and element keys. The file name is split into pieces at the character or character sequence specified in the on key, and the piece at the specified element index is extracted, as in the example below.

  • The replace operator uses the to_replace and replace_with keys.

For example, if your filename is server_logs/12409834/ff.gz, this configuration would store the log_type as serverlogs:
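One way to read this example is a split on "/" taking element 0, followed by a replace that removes the underscore. The Python sketch below applies such rules in order; the helper apply_rules is hypothetical and only mimics the described behavior of the split and replace operators.

```python
def apply_rules(filename, rules):
    """Apply parsing rules in the order they are listed."""
    value = filename
    for rule in rules:
        if rule["operator"] == "split":
            value = value.split(rule["on"])[rule["element"]]
        elif rule["operator"] == "replace":
            value = value.replace(rule["to_replace"], rule["replace_with"])
        else:
            raise ValueError(f"unknown operator: {rule['operator']}")
    return value


# Assumed definition that yields "serverlogs" for the filename in the text.
file_field_definitions = {
    "log_type": [
        {"operator": "split", "on": "/", "element": 0},
        {"operator": "replace", "to_replace": "_", "replace_with": ""},
    ]
}

file_vars = {
    name: apply_rules("server_logs/12409834/ff.gz", rules)
    for name, rules in file_field_definitions.items()
}
print(file_vars)
```

Because rules run in order, swapping the two rules here would still give the same result, but in general the order matters: a replace that removes the split character would change what the split sees.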

routing_template

A string defining how to build the tag to send each message, for example, my.app.wow.[record-type].[file-log_type]

If the type extracted during record_field_mapping was null, the record would be sent to the tag my.app.wow.null
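The placeholder substitution can be sketched in Python as below. The function render_tag is hypothetical; it assumes placeholders have the form [record-name] or [file-name] and, following the statement above, renders a null value as the literal text null.

```python
import re


def render_tag(template, record_vars=None, file_vars=None):
    """Replace [record-x] and [file-x] placeholders with variable values."""
    sources = {"record": record_vars or {}, "file": file_vars or {}}

    def substitute(match):
        source, name = match.group(1), match.group(2)
        value = sources[source].get(name)
        # A missing or null value ends up as "null" in the tag.
        return "null" if value is None else str(value)

    return re.sub(r"\[(record|file)-([^\]]+)\]", substitute, template)


tag_record = render_tag("my.app.datasource.[record-type]", {"type": "security_log"})
tag_both = render_tag(
    "my.app.wow.[record-type].[file-log_type]",
    {"type": "security_log"},
    {"log_type": "serverlogs"},
)
tag_null = render_tag("my.app.datasource.[record-type]", {"type": None})
print(tag_record, tag_both, tag_null)
```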

Options for filtering

Line-level filters

This is a list of rules for filtering out single events.

We want to discard all the events that match these conditions:

eventName is one of these values: HeadObject, ListObjects, HeadBucket, GetBucketLocation

In Devo, these criteria are specified with the following query. If the collector is configured properly, this query should return no events:

In this case, the key for the filter is the eventName, so first we need to add the key to the collector in the record_field_mapping section. After the record_field_mapping, we apply the corresponding filters in the line_filter_rules section. In this case, this would be as follows:

Elements in different lists are OR conditions. Elements in the same list are AND conditions.

Note that these filters are exclusive: if an event matches the rules, the collector does not send it to Devo.
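The OR/AND semantics can be illustrated with a short Python sketch. It assumes line_filter_rules is a list of lists of rules with the source, key, type, and value fields, and that "match" means equality; the functions rule_matches and should_discard and the sample events are hypothetical.

```python
def rule_matches(rule, record_vars):
    # Assumption: "match" means the variable equals the rule's value.
    return record_vars.get(rule["key"]) == rule["value"]


def should_discard(record_vars, line_filter_rules):
    """Different lists are OR'ed; rules inside the same list are AND'ed."""
    return any(
        all(rule_matches(rule, record_vars) for rule in group)
        for group in line_filter_rules
    )


# eventName must first be exposed as a variable.
record_field_mapping = {"eventName": {"keys": ["eventName"]}}

# One single-rule list per value, so the four values are OR'ed together.
line_filter_rules = [
    [{"source": "record", "key": "eventName", "type": "match", "value": name}]
    for name in ("HeadObject", "ListObjects", "HeadBucket", "GetBucketLocation")
]

events = [
    {"eventName": "HeadObject"},
    {"eventName": "PutObject"},
    {"eventName": "ListObjects"},
]
sent = [e for e in events if not should_discard(e, line_filter_rules)]
print(sent)
```

Putting two rules in the same inner list would instead require both to match before an event is discarded.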

What if we want to filter out the events that match this pseudocode query that has mixed conditions?

In this case, the keys for the filter are type and main-log_ornot, so first we need to add the keys to the collector in the record_field_mapping section. Once we’ve added the keys, we apply the corresponding filters. In this case, the filters would be as follows:

Elements in different lists are OR conditions. Elements in the same list are AND conditions.

File-level filters

This is a list of rules that filter out entire files by applying the specified pattern to the file name.

This will filter out files that contain CloudTrail-Digest or ConfigWritabilityCheckFile.

  • 2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz will be skipped.

  • 2024/01/01/ConfigWritabilityCheckFile-2024-01-01-00-00-00-123456789012.gz will be skipped.
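A Python sketch of how such file-level rules could behave. The rule shape used here (a type of match plus a pattern that is searched for in the file name) is an assumption, as are the function should_skip_file and the third, non-matching file name; only the key filename_filter_rules and the first two file names come from this page.

```python
import re

# Assumed rule shape: each rule carries a regex pattern applied to the file name.
filename_filter_rules = [
    [{"type": "match", "pattern": "CloudTrail-Digest"}],
    [{"type": "match", "pattern": "ConfigWritabilityCheckFile"}],
]


def should_skip_file(filename, rules):
    """Skip the file if every rule in any one list matches the file name."""
    return any(
        all(re.search(rule["pattern"], filename) for rule in group)
        for group in rules
    )


files = [
    "2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz",
    "2024/01/01/ConfigWritabilityCheckFile-2024-01-01-00-00-00-123456789012.gz",
    "2024/01/01/example-events-2024-01-01.json.gz",  # hypothetical regular file
]
processed = [f for f in files if not should_skip_file(f, filename_filter_rules)]
print(processed)
```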

Debug MD5

This service gets an MD5 hash from the queue as well as a receipt handle. You can see these in action in the cloud.aws table or edr.crowdstrike table. This information, as well as other AWS SQS metadata, is now added to most of the events. It helps you determine whether a message has been sent to the queue more than once, which message an event was a part of, and so on.

@devo_message_md5: The MD5 of the message the event came from

@devo_bucket_name: The name of the S3 bucket the message came from

@devo_file_name: The name of the file from the S3 bucket

@devo_file_size: The file size

@devo_enqueued_time: The approximate time the message reached the queue, in epoch time

@devo_messsage_receive_count: The approximate number of times the message has been received from the queue

Metrics and Visibility Window

With version 1.6.5 and above, there are now metrics for the compressed file size and the time it takes to process an event notification.

Messages in the queue contain the name and location of the bucket, and a single message can reference a list of sub-files that need to be processed. In the case of CrowdStrike Falcon Data Replicator, the files can be over a gigabyte, with 50-100 sub-files to process. Each of these sub-files contains anywhere from 75-100,000 log messages that need to be sent to Devo. A single event notification could therefore take anywhere from 10-30 minutes, maybe more, to process. If you are seeing duplicates, the visibility window may be too low. If one event notification takes 30 minutes to process, the visibility timeout needs to be at least 1800 seconds to cover the half hour.
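The conversion is simple arithmetic: 30 minutes is 30 × 60 = 1800 seconds. The Python sketch below computes a value for the sqs_visibility_timeout setting used in the default configurations on this page; the function name and the optional margin_minutes parameter are hypothetical. The 12-hour cap is the maximum visibility timeout that Amazon SQS allows.

```python
# Amazon SQS caps the visibility timeout at 12 hours.
SQS_MAX_VISIBILITY_TIMEOUT = 12 * 60 * 60


def visibility_timeout_seconds(processing_minutes, margin_minutes=0):
    """Convert the longest expected processing time into seconds, with optional headroom."""
    seconds = (processing_minutes + margin_minutes) * 60
    return min(seconds, SQS_MAX_VISIBILITY_TIMEOUT)


timeout = visibility_timeout_seconds(30)
print({"sqs_visibility_timeout": timeout})

# Adding five minutes of headroom on top of the half hour.
timeout_with_margin = visibility_timeout_seconds(30, margin_minutes=5)
print({"sqs_visibility_timeout": timeout_with_margin})
```

Some headroom above the longest observed processing time is a reasonable precaution, since a message that becomes visible again before processing finishes can be picked up a second time and produce duplicates.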

There is also a log entry showing how many messages are in the queue and how many are in flight. These numbers are approximate and are pulled directly from the SQS connection.