
Code Block
{
  "inputs": {
    "sqs_collector": {
      "id": "<FIVE_UNIQUE_DIGITS>",
      "services": {
        "custom_service": {
          <OPTIONS>,
          "routing_template": "<DESTINATION TAG>"
        }
      },
      "credentials": {
        "aws_cross_account_role": "arn:<PARTITION>:iam::<YOUR_AWS_ACCOUNT_NUMBER>:role/<YOUR_ROLE>",
        "aws_external_id": "<EXTERNAL_ID>"
      },
      "region": "<REGION>",
      "base_url": "https://sqs.<REGION>.amazonaws.com/<YOUR_AWS_ACCOUNT_NUMBER>/<QUEUE_NAME>"
    }
  }
}

...

Collector customization options:

Flexible Example

Code Block
{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "12351",
      "enabled": true,
      "credentials": {
        "aws_access_key_id": "",
        "aws_secret_access_key": "",
        "aws_base_account_role": "arn:aws:iam::837131528613476382791543:role/devo-xaccount-cs-rolecc",
        "aws_cross_account_role": "",
        "aws_external_id": ""
      },
      "ack_messages": true,
      "direct_mode": false,
      "do_not_send": false,
      "compressed_events": false,
      "base_url": "https://us-west-1.queue.amazonaws.com/id/name-of-queue",
      "region": "us-west-1",
      "sqs_visibility_timeout": 240,
      "sqs_wait_timeout": 20,
      "sqs_max_messages": 1,
      "services": {
        "custom_service": {
          "file_field_definitions": {},
          "record_field_mapping": {
            "event_simpleName": {
              "keys": [
                "event_simpleName"
              ]
            }
          },
          "routing_template": "destination tag",
          "line_filter_rules": [
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "EndOfProcess"
              }
            ],
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "DeliverLocalFXToCloud"
              }
            ]
          ],
          "filename_filter_rules": [
            [
              {
                "type": "match",
                "pattern": "CloudTrail-Digest"
              }
            ],
            [
              {
                "type": "match",
                "pattern": "ConfigWritabilityCheckFile"
              }
            ]
          ],
          "encoding": "gzip",
          "send_filtered_out_to_unknown": false,
          "file_format": {
            "type": "line_split_processor",
            "config": {
              "json": true
            }
          }
        }
      }
    }
  }
}

Parameters

Parameter | Data type | Type | Value range / Format | Details
debug | bool | Discouraged | false / true | Do not include it. Enabling debug will incur additional costs.
id | int | Mandatory | Minimum length: 5 / Maximum length: 5 | Use a unique five-digit number. Note: this parameter is used to build the persistence address; do not use the same value for multiple collectors, as it could cause a collision.
enabled | bool | Discouraged | false / true | If it is false, the collector will have no input.
base_url | str | Mandatory |  | The URL of the SQS queue created during the authorization process.
aws_access_key_id | str | Discouraged | Any | Use cross account roles instead of keys.
aws_secret_access_key | str | Discouraged | Any | Use cross account roles instead of keys.
aws_base_account_role | str | Optional | Any | This is Devo's role. Remove it to use the default set by Devo.
aws_cross_account_role | str | Encouraged | Any | This is the role created during the authorization process.
aws_external_id | str | Encouraged | Any | This is the external ID created during the authorization process. If using a cross account role, the external ID is necessary.
ack_messages | bool | Optional | false / true | This must be true in production. Disabling it will cause duplicate ingestion, incurring a cost.
direct_mode | bool | Optional | false / true | Set to false for most scenarios.
do_not_send | bool | Discouraged | false / true | Set to true to not send the log to Devo.
sqs_visibility_timeout | int | Optional | Min: 120 / Max: 43200 | Specifies how long a message in the queue will be invisible to the collector after it is requested. If it is not processed and deleted within the allotted time in seconds, the message will be put back and may be processed again. Set this parameter if the collector has to download and process large files. Should you reduce the timeout, you will need to wait for messages with the old timeout to become visible before the new timeout takes full effect.
sqs_wait_timeout | int | Discouraged | Min: 20 / Max: 20 | Time the collector waits to get a message. The default, 20 seconds, is recommended.
sqs_max_messages | int | Optional | Min: 1 / Max: 1 | Unused.
region | str | Optional | Example: us-east-1 | The AWS region, which must be the same as the region in the SQS URL.
compressed_events | bool | Optional | false / true | gzip decompression support. Select false unless the S3 objects are compressed with gzip. The error "'utf-8' codec can't decode byte 0xa9 in position 36561456: invalid start byte" may indicate the events need to be decompressed.
encoding | str | Optional |  | Options from most used to least used.

Processors

Processors are selected in the type section within file_format. The processor must match the format of the event in the queue.

...

unseparated_json_processor

...

Splits a JSON array into individual Devo events with Python raw_decode. The key parameter is permitted and may be a string or an array. The include parameter is permitted; it is a Python dict which renames JSON keys. The rename parameter can rename keys selected by the key parameter.
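The splitting behavior can be sketched in Python. The helper below is illustrative only (the function name and error handling are assumptions, not the collector's actual code), but it shows how raw_decode walks a blob of back-to-back JSON objects:

```python
import json

def split_unseparated_json(blob):
    """Split back-to-back JSON objects into individual events with raw_decode.

    Illustrative sketch only; the real processor's internals may differ."""
    decoder = json.JSONDecoder()
    events, idx = [], 0
    while idx < len(blob):
        # Skip whitespace between objects, if any
        while idx < len(blob) and blob[idx].isspace():
            idx += 1
        if idx >= len(blob):
            break
        obj, idx = decoder.raw_decode(blob, idx)
        events.append(obj)
    return events

print(split_unseparated_json('{"a": 1}{"b": 2}{"c": 3}'))
# → [{'a': 1}, {'b': 2}, {'c': 3}]
```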

...

split_or_unseparated_processor

...

Selects the processor by detecting \n.

...

aws_access_logs_processor

...

For AWS access logs and \n splits.

...

single_json_object_processor

...

For messages containing one JSON object. Uses Python orjson to process a single JSON object. Not for arrays. The key parameter is permitted.

...

separated_json_processor

...

Similar to the other separator-based processors. The default separator is \n. The separator parameter is permitted.

...

bluecoat_processor

...

For Bluecoat recipe.

...

json_object_to_linesplit_processor

...

Split by configured value. The key string parameter or keys array parameter are permitted.

...

json_array_processor

...

Split a JSON array into individual Devo events with Python orjson. The key string parameter or keys array parameter are permitted.

...

json_line_arrays_processor

...

Processes JSON separated by \n. Use separated_json_processor instead.

...

jamf_processor

...

Jamf log processing.

...

parquet_processor

...

Parquet processing using Python pandas.read_parquet. The data is converted to JSON.

...

guardduty_processor

...

For the GuardDuty service.

...

vpc_flow_processor

...

VPC service processor.

...

alt_vpc_flow_processor

...

Used for exception handling.

...

kolide_processor

...

For Kolide service.

...

json_array_vpc_processor

...

VPC service processor.

...

rds_processor

...

RDS processor for the RDS service, based on unseparated_json_processor. Use this if the events come in one massive JSON object.

...

unseparated_json_processor_extract_key

...

The unseparated_json_processor with an additional extraction_key parameter permitted. Use this when filtering on two levels of JSON keys.

If the log message has this format

Code Block
{
  "id": 1,
  "timestamp": 2,
  "logEvents": {
    "another_id": 3,
    "another_timestamp": 4,
    "message": "send to devo"
  }
} 

The configuration

Code Block
"file_format": {
  "type": "unseparated_json_processor_extract_key",
  "config": {
    "key": "logEvents",
    "extraction_key": "message"
  }
},

will send

send to devo

to Devo.
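In Python terms, the key/extraction_key pair behaves roughly like two dictionary lookups. This sketch is an assumption based on the example above, not the processor's source:

```python
def extract_inner_message(record, key, extraction_key):
    """Go down two levels of JSON: first `key`, then `extraction_key`.

    Sketch of the behavior described above (assumed, not the actual code)."""
    inner = record.get(key, {})
    return inner.get(extraction_key)

log = {"id": 1, "timestamp": 2,
       "logEvents": {"another_id": 3, "message": "send to devo"}}
print(extract_inner_message(log, "logEvents", "message"))  # → send to devo
```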

More on processors:

...

file_format

type - A string specifying which processor to use.

...

single_json_object - Logs are stored as/in a JSON object.

single_json_object_processor config options:

  • key - (string) The key of where the list of logs is stored.

Code Block
config: {"key": "log"}
fileobj:  {..."log": {...}}

...

unseparated_json_processor - Logs are stored as/in JSON objects, which are written in a text file with no separator.

unseparated_json config options:

  • key - (string) where the log is stored

  • include - (dict) maps names of keys outside of the inner part to be included, which can be renamed.

If there is no key, that is, the whole JSON object is the desired log, set "flat": true

See aws_config_collector for example:

Code Block
fileobj:  {...}{...}{...}

...

text_file_processor - Logs are stored as text files, potentially with lines and fields separated by e.g. commas and newlines.

text_file config options: includes options for how lines and records are separated (e.g. newline, tab, comma); good for CSV-style data.

...

line_split_processor – logs stored in a newline-separated file; works more quickly than separated_json_processor

config options: "json": true or false. Setting json to true assumes that logs are newline-separated JSON and allows them to be parsed by the collector, thereby enabling record_field_mapping.

...

separated_json_processor – logs stored as many JSON objects with some kind of separator

config options: specify the separator, e.g. "separator": "||". The default is newline if left unset.

Code Block
fileobj:  {...}||{...}||{...}
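A minimal Python sketch of separator-based splitting (illustrative only; the "||" separator matches the example above):

```python
import json

# Assumed separator "||", matching the example configuration above
blob = '{"a": 1}||{"b": 2}||{"c": 3}'
events = [json.loads(part) for part in blob.split("||")]
print(events)  # → [{'a': 1}, {'b': 2}, {'c': 3}]
```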

...

jamf_processor – special processor for JAMF logs

...

aws_access_logs_processor – special processor for AWS access logs

...

windows_security_processor – special processor for Windows Security logs

...

vpc_flow_processor – special processor for VPC Flow logs

...

json_line_arrays_processor – processor for unseparated json objects that are on multiple lines of a single file.

Code Block
fileobj:  {...}{...}
{...}{...}{...}
{...}

...

dict_processor – processor for logs that come as Python dictionary objects, i.e. in direct mode

...

config - a dictionary of information the specified file_format processor needs

...

record_field_mapping

A dictionary where each key defines a variable that can be parsed out from each record (which may be referenced later in filtering).

For example, we may want to parse something and call it type by getting type from a certain key in the record (which may be multiple layers deep).

Code Block
{"type": {"keys": ["file", "type"], "operations": []}}

The keys are a list of how to find a value and handle nesting (essentially, defining a path through the data).

Suppose we have logs that look like this:

Code Block
{"file": {"type": {"log_type": 100}}}

If we want to get the log_type, we should list all the keys needed to parse through the JSON in order:

Code Block
keys: ["file", "type", "log_type"]

In many cases, you will probably only need one key, for example, in a flat JSON that isn’t nested:

Code Block
{"log_type": 100, "other_info": "blah", ...}

Here you would just specify keys: ["log_type"]. There are some operations that can be used to further alter the parsed information (like split and replace).

This snippet would grab whatever is located at log["file"]["type"] and name it as type. record_field_mapping defines variables by taking them from logs, and these variables can then be used for filtering.
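The key-path lookup can be sketched as a simple nested-dict walk (the function name and the None fallback for a missing path are assumptions for illustration):

```python
def resolve_keys(record, keys):
    """Follow the `keys` path through a nested record, as record_field_mapping does.

    Returns None when any key along the path is missing (assumed behavior)."""
    value = record
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value

record = {"file": {"type": {"log_type": 100}}}
print(resolve_keys(record, ["file", "type", "log_type"]))  # → 100
```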

Let’s say you have a log in JSON format like this, which will be sent to Devo:

Code Block
{"file": {"value": 0, "type": "security_log"}}

...

Now let’s say you want to filter out (not send) any records which have the type security_log. You could write a line_filter_rule as follows:

Code Block
{"source": "record", "key": "type", "type": "match", "value": "security_log" } 
  • We specified the source as record because we want to use a variable from the record_field_mapping.

  • We specified the key as type because that is the name of the variable we defined.

  • We specified the type as match because we want to filter out any record matching this rule.

  • And we specified the value as security_log because we specifically do not want to send any records with the type equal to security_log.

The split operation is the same as if you ran the Python split function on a string.

Let’s say you have a filename logs/account_id/folder_name/filename and you want to save the account_id as a variable to use for tag routing or filtering.

You could write a file_field_definition like this:

Code Block
"account_id": [{"operator": "split", "on": "/", "element": 1}]

This would store a variable called account_id by taking the entire filename and splitting it into pieces wherever it finds forward slashes, then taking the element at position one. In Python, it would look like this:

Code Block
filename.split("/")[1]

Tagging

Tagging can be done in many different ways. One way tagging works is by using the file field definitions:

Code Block
  "file_field_definitions": {
    "log_type": [
      {
        "operator": "split",
        "on": "/",
        "element": 2
      }
    ]
  },

These are the elements of the filename object:

...

If you look at the filename object above, you can see that we are splitting on "/" and taking the value at index 2. Indexing starts at 0, as in arrays. So:

  • 0 = cequence-data

  • 1 = cequence-devo-6x-NAieMI

  • 2 = detector

"routing_template": "my.app.test_cequence.[file-log_type]"

Our final tag is my.app.test_cequence.detector

Here is another example:

...

file_field_definitions

...

Defined as a dictionary mapping variable names (you decide) to lists of parsing rules.

Each parsing rule has an operator with its own keys. Parsing rules are applied in the order they are listed in the configuration.

  • The split operator uses the on and element keys. The file name will split into pieces considering the character or character sequence specified in the on key, and will extract whatever it is at the specified element index, as in the example below.

  • The replace operator uses the to_replace and replace_with keys.

For example, if your filename is server_logs/12409834/ff.gz, this configuration would store the log_type as serverlogs:

Code Block
"file_field_definitions": 
{
	"log_type": [{"operator": "split", "on": "/", "element": 0}, {"operator": "replace", "to_replace": "_", "replace_with": ""}]
}
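Applied in order, the split and replace rules above behave like this Python chain (a sketch; the helper name is made up for illustration):

```python
def apply_field_definition(filename, rules):
    """Run split/replace parsing rules over a filename, in the order listed."""
    value = filename
    for rule in rules:
        if rule["operator"] == "split":
            value = value.split(rule["on"])[rule["element"]]
        elif rule["operator"] == "replace":
            value = value.replace(rule["to_replace"], rule["replace_with"])
    return value

rules = [
    {"operator": "split", "on": "/", "element": 0},
    {"operator": "replace", "to_replace": "_", "replace_with": ""},
]
print(apply_field_definition("server_logs/12409834/ff.gz", rules))  # → serverlogs
```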

...

routing_template

...

A string defining how to build the tag to send each message, for example, my.app.wow.[record-type].[file-log_type]

If the type extracted during record_field_mapping was null, the record would be sent to the tag my.app.wow.null
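The placeholder substitution can be sketched like this (the regex and the "null" fallback are assumptions inferred from the examples above, not the collector's actual implementation):

```python
import re

def build_tag(template, record_fields, file_fields):
    """Fill [record-x] and [file-y] placeholders in a routing template."""
    def substitute(match):
        scope, name = match.group(1), match.group(2)
        fields = record_fields if scope == "record" else file_fields
        value = fields.get(name)
        # A missing/None variable renders as the literal string "null"
        return "null" if value is None else str(value)
    return re.sub(r"\[(record|file)-([^\]]+)\]", substitute, template)

print(build_tag("my.app.wow.[record-type].[file-log_type]",
                {"type": None}, {"log_type": "detector"}))
# → my.app.wow.null.detector
```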

Options for filtering

Line-level filters

These are a list of rules for filtering out single events.

Example 1

We want to discard all the events that match these conditions:

Code Block
if record.eventName = "HeadObject" or record.eventName = "ListObjects" or record.eventName = "HeadBucket" or record.eventName = "GetBucketLocation" 
 do_not_send_record()

eventName is one of these values: HeadObject, ListObjects, HeadBucket, GetBucketLocation

In Devo, these criteria are specified with the next query. If everything is OK, after configuring the collector properly, there should not be any event if we run this query:

Code Block
from cloud.aws.cloudtrail.s3 where eventName = "HeadObject" or eventName = "ListObjects" or eventName = "HeadBucket" or eventName = "GetBucketLocation"

In this case, the key for the filter is the eventName, so first we need to add the key to the collector in the record_field_mapping section. After the record_field_mapping, we apply the corresponding filters in the line_filter_rules section. In this case, this would be as follows:

Code Block
"record_field_mapping": {
  "eventName": {
    "keys": ["eventName"]
  }
},
"line_filter_rules": [
    [{"source": "record", "key": "eventName", "type": "match", "value": "HeadObject"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "ListObjects"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "HeadBucket"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "GetBucketLocation"}]
]

Elements in different lists are OR conditions. Elements in the same list are AND conditions.
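The OR-between-lists / AND-within-list semantics can be sketched as follows (illustrative; only the match and doesnotmatch rule types are handled here):

```python
def filter_out(variables, line_filter_rules):
    """Return True when an event should be discarded.

    Rules inside one inner list are ANDed; the inner lists are ORed."""
    def rule_matches(rule):
        value = variables.get(rule["key"])
        if rule["type"] == "match":
            return value == rule["value"]
        if rule["type"] == "doesnotmatch":
            return value != rule["value"]
        return False
    return any(all(rule_matches(r) for r in group) for group in line_filter_rules)

rules = [
    [{"source": "record", "key": "eventName", "type": "match", "value": "HeadObject"}],
    [{"source": "record", "key": "eventName", "type": "match", "value": "ListObjects"}],
]
print(filter_out({"eventName": "HeadObject"}, rules))  # → True (discarded)
print(filter_out({"eventName": "PutObject"}, rules))   # → False (sent to Devo)
```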

Note

Note that the logic for these filters is if they match the query, the collector won't send the event to Devo.

...

Example 2

What if we want to filter out the events that match this pseudocode query that has mixed conditions?

Code Block
if record.type != "ldap" OR (record.main-log_ornot == "main-log" AND record.type == "kube-apiserver-audit"):
 do_not_send_record()

...

Code Block
"record_field_mapping": {
  "type": {
    "keys": ["type"]
  },
  "main-log_ornot": {
    "keys": ["main-log_ornot"]
  }
},
"line_filter_rules": [
	[{"source": "record", "key": "type", "type": "doesnotmatch", "value": "ldap"}],
    [
        {"source": "record", "key": "main-log_ornot", "type": "match", "value": "main-log"}, 
        {"source": "record", "key": "type", "type": "match", "value": "kube-apiserver-audit"}
    ]
]

Elements in different lists are OR conditions. Elements in the same list are AND conditions.

Note

Note that the logic for these filters is if they match the query, the collector won't send the event to Devo.

File-level filters

These are a list of rules to filter out entire files by the specified pattern applied over the file name.

Example 1
Code Block
"filename_filter_rules": [
    [{"type": "match", "pattern": "CloudTrail-Digest"}],
    [{"type": "match", "pattern": "ConfigWritabilityCheckFile"}]
]

This will filter out files that contain CloudTrail-Digest or ConfigWritabilityCheckFile.

  • 2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz will be skipped.

  • 2024/01/01/ConfigWritabilityCheckFile-2024-01-01-00-00-00-123456789012.gz will be skipped.
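File-level rules follow the same list-of-lists logic, applied to the file name. This sketch treats patterns as regular-expression searches (an assumption; plain substring matching would behave identically for these literal patterns):

```python
import re

def skip_file(filename, filename_filter_rules):
    """Return True when the whole file should be skipped.

    Same OR/AND semantics as line filters, but applied to the file name."""
    def rule_matches(rule):
        found = re.search(rule["pattern"], filename) is not None
        return found if rule["type"] == "match" else not found
    return any(all(rule_matches(r) for r in group)
               for group in filename_filter_rules)

rules = [
    [{"type": "match", "pattern": "CloudTrail-Digest"}],
    [{"type": "match", "pattern": "ConfigWritabilityCheckFile"}],
]
print(skip_file("2024/01/01/CloudTrail-Digest-x.gz", rules))  # → True (skipped)
```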

Example 2
Code Block
"filename_filter_rules": [
    [{"type": "doesnotmatch", "pattern": "CloudTrail"}],
    [{"type": "match", "pattern": "CloudTrail-Digest"}]
]

This will filter out files that do not contain CloudTrail or contain CloudTrail-Digest. For instance, files with a name like this:

  • 2024/01/01/CloudTrail-2024-01-01-00-00-00-123456789012.gz will be processed.

  • 2024/01/01/CloudTrail-Digest-2024-01-01-00-00-00-123456789012.gz will be skipped.

Config can include "debug_mode": true to print out some useful information as logs come in.

For local testing, it is useful to set ack_messages to false to try processing without consuming messages from the queue. Be careful to remove this or set it to true when launching the collector. The default is to ack messages if it is not set.

If something seems wrong at launch, you can set the following in the collector parameters/job config:

"debug": true,
"do_not_send": true,
"ack_messages": false (you will see duplicates while this is false; set it back to true when done)

This will print out data as it is being processed, stop messages from being acked, and prevent the data from being sent to Devo. In this way, you can easily check whether something is not working properly.