Purpose

The SQS collector can be configured to write any log to any table. Devo recommends using a pre-built service that fits your logs. If no pre-built service fits, engage Devo professional services to create a custom service.

If you need to modify or filter logs, Devo recommends AWS Lambda.

Authorize It

  1. Authorize SQS Data Access.

  2. Add data to the S3 bucket. Preferably, the data should be in a consistent format. For example:

    1. If the data is JSON objects, the keys of the JSON objects should be the same. Some objects can omit some keys (see the example after this list).

    2. If the data is in comma-separated value (CSV) format, the number of columns must always be the same.
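For example, consistent JSON lines share the same keys, and the last object may omit an optional key. The field names here are purely illustrative:

{"time": "2024-05-01T10:00:00Z", "user": "alice", "action": "login"}
{"time": "2024-05-01T10:01:00Z", "user": "bob", "action": "logout"}
{"time": "2024-05-01T10:02:00Z", "user": "carol"}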

Gather Information

Run It

Simple Template

In the Cloud Collector App, create an SQS Collector instance using the following parameters template, replacing the values enclosed in < >.

{
  "inputs": {
    "sqs_collector": {
      "id": "<FIVE_UNIQUE_DIGITS>",
      "services": {
        "custom_service": {<OPTIONS>,
          "routing_template": "<DESTINATION TAG>"
        }
      },
      "credentials": {
              "aws_cross_account_role": "arn:<PARTITION>:iam::<YOUR_AWS_ACCOUNT_NUMBER>:role/<YOUR_ROLE>",
              "aws_external_id": "<EXTERNAL_ID>"
      },
      "region": "<REGION>",
      "base_url": "https://sqs.<REGION>.amazonaws.com/<YOUR_AWS_ACCOUNT_NUMBER>/<QUEUE_NAME>"
    }
  }
}

Flexible Example

{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "12351",
      "enabled": true,
      "credentials": {
        "aws_access_key_id": "",
        "aws_secret_access_key": "",
        "aws_base_account_role": "arn:aws:iam::837131528613:role/devo-xaccount-cs-role",
        "aws_cross_account_role": "",
        "aws_external_id": ""
      },
      "ack_messages": true,
      "direct_mode": false,
      "do_not_send": false,
      "compressed_events": false,
      "base_url": "https://us-west-1.queue.amazonaws.com/id/name-of-queue",
      "region": "us-west-1",
      "sqs_visibility_timeout": 240,
      "sqs_wait_timeout": 20,
      "sqs_max_messages": 1,
      "services": {
        "custom_service": {
          "file_field_definitions": {
            "log_type": [
              {
                "operator": "split",
                "on": "/",
                "element": 0
              },
              {
                "operator": "replace",
                "to_replace": "_",
                "replace_with": ""
              }
            ]
          },
          "filename_filter_rules": [
            [
              {
                "type": "match",
                "pattern": "CloudTrail-Digest"
              }
            ],
            [
              {
                "type": "match",
                "pattern": "ConfigWritabilityCheckFile"
              }
            ]
          ],
          "encoding": "gzip",
          "send_filtered_out_to_unknown": false,
          "file_format": {
            "type": "line_split_processor",
            "config": {
              "json": true
            }
          },
          "record_field_mapping": {
            "event_simpleName": {
              "keys": [
                "event_simpleName"
              ]
            }
          },
          "routing_template": "destination tag",
          "line_filter_rules": [
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "EndOfProcess"
              }
            ],
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "DeliverLocalFXToCloud"
              }
            ]
          ]
        }
      }
    }
  }
}

Parameters

Each parameter is listed as: name (data type, requirement), value range / format, details.

debug (bool, Discouraged): false / true. Do not include it. Enabling debug will incur additional costs.

id (int, Mandatory): exactly five digits. Use a unique five-digit number. This parameter is used to build the persistence address; do not use the same value for multiple collectors, as it could cause a collision.

enabled (bool, Discouraged): false / true. If it is false, the collector will have no input.

base_url (str, Mandatory): the URL of the SQS queue created during the authorization process.

aws_access_key_id (str, Discouraged): any string. Use cross account roles instead of keys.

aws_secret_access_key (str, Discouraged): any string. Use cross account roles instead of keys.

aws_base_account_role (str, Optional): any string. This is Devo's role. Remove it to use the default set by Devo.

aws_cross_account_role (str, Encouraged): any string. This is the role created during the authorization process.

aws_external_id (str, Encouraged): any string. This is the external ID created during the authorization process. If using a cross account role, the external ID is necessary.

ack_messages (bool, Optional): false / true. This must be true in production. Disabling it will cause duplicate ingestion, incurring a cost.

direct_mode (bool, Optional): false / true. Set to false for most scenarios.

do_not_send (bool, Discouraged): false / true. Set to true to not send the log to Devo.

sqs_visibility_timeout (int, Optional): min 120, max 43200. Specifies how long a message in the queue will be invisible to the collector after it is requested. If it is not processed and deleted within the allotted time in seconds, the message will be put back and may be processed again. Set this parameter if the collector has to download and process large files. Should you reduce the timeout, you will need to wait for messages with the old timeout to become visible before the new timeout takes full effect.

sqs_wait_timeout (int, Discouraged): min 20, max 20. Time the collector waits to get a message. The default, 20 seconds, is recommended.

sqs_max_messages (int, Optional): min 1, max 1. Unused.

region (str, Optional): for example, us-east-1. The AWS region, which must be the same as the region in the SQS URL.

compressed_events (bool, Optional): false / true. Enables gzip decompression support. Set to false unless the S3 objects are compressed with gzip. The error "'utf-8' codec can't decode byte 0xa9 in position 36561456: invalid start byte" may indicate the events need to be decompressed.

encoding (str, Optional): options are listed from most used to least used.

File Format Processors

Processors are selected in the type section within file_format. The processor must match the format of the event in the queue.

split_or_unseparated_processor

Selects the processor by detecting \n. If unsure, this processor is recommended.
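A minimal sketch of a service entry using this processor, assuming no extra parameters are needed:

"file_format": {
  "type": "split_or_unseparated_processor",
  "config": {}
}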

line_split_processor

Splits an object into logs at each \n character using Python splitlines. Optionally, the log can be split using the following parameters (see the sketch after this list):

  • chunks, a boolean parameter which enables the remaining parameters.

  • indices, a Python integer array which selects lines from the object.

  • substrings, a Python string array which selects lines from the object containing the configured strings.

  • regex, a Python string array which selects lines from the object using python re.findall.
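A sketch of a possible configuration; the json flag mirrors the Flexible Example above, while the chunks and substrings values are illustrative assumptions:

"file_format": {
  "type": "line_split_processor",
  "config": {
    "json": true,
    "chunks": true,
    "substrings": ["ERROR", "WARN"]
  }
}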

text_file_processor

Finds header information and adds it to each event. The line_separator parameter is required. The header can be identified using one of these parameters.

  • header (boolean) and header_field_separator to get headers from the first line of data.

  • field_names for a manual header.

  • field_separator for numbered fields.

Additionally, if use_json is true, Python orjson will convert the data to JSON.
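A sketch using the parameters described above; the separator values are assumptions for a comma-delimited file with a header row:

"file_format": {
  "type": "text_file_processor",
  "config": {
    "line_separator": "\n",
    "header": true,
    "header_field_separator": ",",
    "use_json": true
  }
}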

unseparated_json_processor

Splits a JSON array into individual Devo events with Python raw_decode. The key parameter is permitted; it may be a string or an array. The include parameter is permitted; it is a dictionary which renames JSON keys. The rename parameter can rename keys selected by the key parameter.
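A minimal sketch using only the key parameter; the key name Records is a hypothetical example:

"file_format": {
  "type": "unseparated_json_processor",
  "config": {
    "key": "Records"
  }
}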

single_json_object_processor

For messages containing one JSON object. Uses Python orjson to process a single JSON object. Not for arrays. The key parameter is permitted.
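A minimal sketch; the key name detail is a hypothetical example of where the event body sits inside the object:

"file_format": {
  "type": "single_json_object_processor",
  "config": {
    "key": "detail"
  }
}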

separated_json_processor

Similar to the other separator-based processors. The default separator is \n. The separator parameter is permitted.
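A sketch overriding the default separator; the \r\n value is an illustrative assumption:

"file_format": {
  "type": "separated_json_processor",
  "config": {
    "separator": "\r\n"
  }
}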

json_object_to_linesplit_processor

Split by configured value. The key string parameter or keys array parameter are permitted.
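A sketch using the keys array parameter; both key names are hypothetical:

"file_format": {
  "type": "json_object_to_linesplit_processor",
  "config": {
    "keys": ["Records", "logEvents"]
  }
}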

unseparated_json_processor_extract_key

The unseparated_json_processor with an additional extraction_key parameter permitted. Use this when filtering on two levels of JSON keys.

If the log message has this format

{
  "id": 1,
  "timestamp": 2,
  "logEvents": {
    "another_id": 3,
    "another_timestamp": 4,
    "message": "send to devo"
  }
} 

The configuration

"file_format": {
  "type": "unseparated_json_processor_extract_key",
  "config": {
    "key": "logEvents",
    "extraction_key": "message"
  }
},

will send

send to devo

to Devo.

json_array_processor

Split a JSON array into individual Devo events with Python orjson. The key string parameter or keys array parameter are permitted.
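A minimal sketch; the key name findings is hypothetical:

"file_format": {
  "type": "json_array_processor",
  "config": {
    "key": "findings"
  }
}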

json_line_arrays_processor

Processes JSON separated by \n. Use separated_json_processor instead.

aws_access_logs_processor

For AWS access logs, split on \n.

bluecoat_processor

Bluecoat.

jamf_processor

Jamf logs.

parquet_processor

Parquet processing using Python pandas.read_parquet. The data is converted to JSON.

guardduty_processor

For GuardDuty logs.

vpc_flow_processor

AWS VPC.

alt_vpc_flow_processor

An alternative VPC flow processor, used for exception handling.

kolide_processor

For Kolide.

json_array_vpc_processor

AWS VPC.

rds_processor

An unseparated_json_processor variant for the RDS service. Use this if the events come in one massive JSON object.

windows_security_processor

Windows security logs.

Custom Service Options

file_format

  • type: a string specifying which processor to use, from the list above.

  • config: a dictionary of the processor's parameters.
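In the collector configuration these fit together as follows, with the placeholders in < > replaced by one of the processors above and its parameters:

"file_format": {
  "type": "<PROCESSOR_NAME>",
  "config": {<PROCESSOR_PARAMETERS>}
}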

record_field_mapping

A dictionary where each key defines a variable that can be parsed out from each record (which may be referenced later in filtering).

For example, to parse a value out of the record (which may be nested several layers deep) and name it type:

{"type": {"keys": ["file", "type"], "operations": []}}

Suppose we have logs that look like this:

{"file": {"type": {"log_type": 100}}}

To get the log_type, we should list all the keys needed to parse through the JSON in order:

keys: ["file", "type", "log_type"]

In many cases, you will probably only need one key, for example, in a flat JSON that isn’t nested:

{"log_type": 100, "other_info": "blah", ...}

Here you would just specify keys: ["log_type"]. There are some operations that can be used to further alter the parsed information (like split and replace).

This snippet would grab whatever is located at log["file"]["type"] and name it as type. record_field_mapping defines variables by taking them from logs, and these variables can then be used for filtering.

If you have a log in JSON format like this which will be sent to Devo:

{"file": {"value": 0, "type": "security_log"}}

Specifying type in record_field_mapping will allow the collector to extract the value security_log and save it as type.

To change the tag using a field mapping, change the routing_template to something like my.app.datasource.[record-type]. In the case of the log above, it would be sent to my.app.datasource.security_log.
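Putting the mapping and the routing template together, a minimal sketch (field and tag names follow the example above):

"record_field_mapping": {
  "type": {
    "keys": ["file", "type"]
  }
},
"routing_template": "my.app.datasource.[record-type]"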

To filter out (not send) any records which have the type security_log, write a line_filter_rule as follows:

{"source": "record", "key": "type", "type": "match", "value": "security_log" } 
  • We specified the source as record because we want to use a variable from the record_field_mapping.

  • We specified the key as type because that is the name of the variable we defined.

  • We specified the type as match because we want to filter out any record matching this rule.

  • And we specified the value as security_log because we specifically do not want to send any records with the type equalling security_log.

The split operation is the same as if you ran the Python split function on a string.

Suppose you have a filename logs/account_id/folder_name/filename and you want to save the account_id as a variable to use for tag routing or filtering.

You could write a file_field_definition like this:

"account_id": [{"operator": "split", "on": "/", "element": 1}]

This would store a variable called account_id by taking the entire filename, splitting it into pieces wherever it finds a forward slash, and then taking the element at position one. In Python, it would look like this:

filename.split("/")[1]

Automatic Tagging

Tags can be generated using record field mapping or file field definitions.

File name split

  "file_field_definitions": {
    "log_type": [
      {
        "operator": "split",
        "on": "/",
        "element": 2
      }
    ]
  },

If the filename field is

foo/bar/baz/qux.json

then

"routing_template": "my.app.test.[file-log_type]"

results in tag my.app.test.baz

File name split and replace

If the filename field is

foo_bar/baz/qux.gz

it is helpful to remove the unwanted special character when creating the tag. For example,

  "file_field_definitions": {
    "log_type": [
      {
        "operator": "split",
        "on": "/",
        "element": 0
      },
      {
        "operator": "replace",
        "to_replace": "_",
        "replace_with": ""
      }
    ]
  }

combined with

"routing_template": "my.app.test.[file-log_type]"

will result in

my.app.test.foobar

Options for filtering

Line-level filters

These are a list of rules for filtering out single events.

We want to discard all the events that match these conditions:

if record.eventName = "HeadObject" or record.eventName = "ListObjects" or record.eventName = "HeadBucket" or record.eventName = "GetBucketLocation" 
 do_not_send_record()

That is, eventName is one of these values: HeadObject, ListObjects, HeadBucket, GetBucketLocation.

In Devo, these criteria correspond to the following query. If the collector is configured properly, running this query should return no events:

from cloud.aws.cloudtrail.s3 where eventName = "HeadObject" or eventName = "ListObjects" or eventName = "HeadBucket" or eventName = "GetBucketLocation"

In this case, the key for the filter is eventName, so first we need to add the key to the collector in the record_field_mapping section. Then we apply the corresponding filters in the line_filter_rules section, which in this case would be as follows:

"record_field_mapping": {
  "eventName": {
    "keys": ["eventName"]
  }
},
"line_filter_rules": [
    [{"source": "record", "key": "eventName", "type": "match", "value": "HeadObject"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "ListObjects"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "HeadBucket"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "GetBucketLocation"}]
]

Elements in different lists are OR conditions. Elements in the same list are AND conditions.

Note that the logic for these filters is if they match the query, the collector won't send the event to Devo.

What if we want to filter out the events that match this pseudocode query that has mixed conditions?

if record.type != "ldap" OR (record.main-log_ornot == main-log AND record.type == "kube-api-server-audit"):
 do_not_send_record()

In this case, the keys for the filter are type and main-log_ornot, so first we need to add the keys to the collector in the record_field_mapping section. Once we’ve added the keys, we apply the corresponding filters. In this case, the filters would be as follows:

"record_field_mapping": {
  "type": {
    "keys": ["type"]
  },
  "main-log_ornot": {
    "keys": ["main-log_ornot"]
  }
},
"line_filter_rules": [
	[{"source": "record", "key": "type", "type": "doesnotmatch", "value": "ldap"}],
    [
        {"source": "record", "key": "main-log_ornot", "type": "match", "value": "main-log"}, 
        {"source": "record", "key": "type", "type": "match", "value": "kube-apiserver-audit"}
    ]
]

Elements in different lists are OR conditions. Elements in the same list are AND conditions.

Note that the logic for these filters is if they match the query, the collector won't send the event to Devo.

File-level filters

These are a list of rules to filter out entire files by the specified pattern applied over the file name.

"filename_filter_rules": [
    [{"type": "match", "pattern": "CloudTrail-Digest"}],
  	[{"type": "match", "pattern": "ConfigWritabilityCheckFile"}]
]

This will filter out files that contain CloudTrail-Digest or ConfigWritabilityCheckFile.

  • 2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz will be skipped.

  • 2024/01/01/ConfigWritabilityCheckFile-2024-01-01-00-00-00-123456789012.gz will be skipped.

"filename_filter_rules": [
    [{"type": "doesnotmatch", "pattern": "CloudTrail"}],
  	[{"type": "match", "pattern": "CloudTrail-Digest"}]
]

This will filter out files that do not contain CloudTrail or contain CloudTrail-Digest. For instance, files with a name like this:

  • 2024/01/01/CloudTrail-2024-01-01-00-00-00-123456789012.gz will be processed.

  • 2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz will be skipped.

Config can include "debug_mode": true to print out some useful information as logs come in.

For local testing, it is useful to set ack_messages to false to try processing without consuming from the queue. Be careful to remove this or set it to true when launching the collector. The default is to ack messages if it is not set.

If something seems wrong at launch, you can set the following in the collector parameters/job config:

"debug": true,
"do_not_send": true,
"ack_messages": false ← you will see duplicates if you turn this to false, just set to true when done.

This will print out data as it is being processed, stop messages from getting hacked, and at the last step, data won’t send the data. In this way, you can easily check if something is not working properly.