Overview
Logs generated by most AWS services (CloudTrail, VPC Flow Logs, Elastic Load Balancer, etc.) can be exported as objects to S3. Many third-party services have adopted the same approach, so it has become a common pattern across many technologies. The Devo Professional Services and Technical Acceleration teams maintain a base collector that uses this S3 pattern to collect logs and can be customized for the different log technologies a customer stores in S3.
This documentation will go through setting up your AWS infrastructure for our collector integration to work out of the box:
Sending data to S3 (this guide uses CloudTrail as a data source service)
Setting up S3 event notifications to SQS
Enabling SQS and S3 access using a cross-account IAM role
Gathering information to be provided to Devo for collector setup
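To make the data flow concrete: once S3 event notifications are enabled, each SQS message body carries a JSON S3 event that names the bucket and object key of a newly written file. The sketch below is only an illustration of that message shape, not the collector's actual code; the function name, bucket, and key are made up for the example.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sqs_body(body):
    """Return (bucket, key) pairs found in one S3 event notification body."""
    event = json.loads(body)
    pairs = []
    # The test event S3 sends when a notification is configured has no "Records".
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


# Hypothetical message body, trimmed to the fields used above.
sample_body = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-bucket"},
            "object": {"key": "AWSLogs/111122223333/CloudTrail/part%3D1.json.gz"},
        },
    }]
})

print(s3_objects_from_sqs_body(sample_body))
```

Decoding the key matters because characters such as "=" or spaces in object names arrive percent-encoded in the notification.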
General architecture diagram
Requirements
Access to S3, SQS, IAM, and CloudTrail services
Permissions to send data to S3
Knowledge of log format/technology type being stored in S3
Devo collector features
Feature | Details |
---|---|
Allow parallel downloading ( | |
Running environments | |
Populated Devo events | |
Flattening Preprocessing | |
Data sources
Data source | Description | Collector service name | Devo table | Available from release |
---|---|---|---|---|
Any | Theoretically any source you send to an SQS queue can be collected | | | |
CONFIG LOGS | | | | |
AWS ELB | | | | |
AWS ALB | | | | |
CISCO UMBRELLA | | | | |
CLOUDFLARE LOGPUSH | | | | |
CLOUDFLARE AUDIT | | | | |
CLOUDTRAIL | | | | |
CLOUDTRAIL VIA KINESIS FIREHOSE | | | | |
CLOUDWATCH | | | | |
CLOUDWATCH VPC | | | | |
CONTROL TOWER | VPC Flow Logs, CloudTrail, CloudFront, and/or AWS Config logs | | | |
FDR | | | | |
GUARD DUTY | | | | |
GUARD DUTY VIA KINESIS FIREHOSE | | | | |
IMPERVA INCAPSULA | | | | |
LACEWORK | | | | |
PALO ALTO | | | | |
ROUTE 53 | | | | |
OS LOGS | | | | |
SENTINEL ONE FUNNEL | | | | |
S3 ACCESS | | | | |
VPC LOGS | | | | |
WAF LOGS | | | | |
Options
See examples of common configurations here: General S3 Collector Configuration Examples and Recipes
There are many configurable options outlined in the README on the GitLab link, reproduced here. See GitLab repository for specific examples in each subdirectory.
direct_mode -- true or false (default is false). Set to true if the logs are sent directly to the queue without using S3.
file_field_definitions -- a dictionary mapping variable names (which you choose) to lists of parsing rules.
Each parsing rule has an operator with its own accompanying keys. Parsing rules are applied in the order they are listed in the configuration.
The "split" operator takes an "on" and an "element": the file name is split into pieces on the character or character sequence specified by "on", and whatever is at the specified "element" index is extracted, as in the example.
The "replace" operator takes a "to_replace" and a "replace_with".
For example, if your filename were "server_logs/12409834/ff.gz", this configuration would store the log_type as "serverlogs":
"file_field_definitions": { "log_type": [{"operator": "split", "on": "/", "element": 0}, {"operator": "replace", "to_replace": "_", "replace_with": ""}] }
filename_filter_rules -- a list of rules for filtering out entire files.
encoding -- takes one of the following strings: "gzip", "none", "parquet".
ack_messages -- whether or not to delete messages from the queue after processing; takes boolean values. If not specified, the default is true. We recommend leaving this out of the config. If it is present, pay close attention to whether it is set to true or false.
file_format -- takes a dictionary with the following keys
type -- a string specifying which processor to use
single_json_object -- logs are stored as/in a JSON object
single_json_object_processor config options: "key" (string: the key where the list of logs is stored). See cloudtrail_collector for an example.
config: {"key": "log"} fileobj: {..."log": {...}}
unseparated_json_processor -- logs are stored as/in JSON objects written to a text file with no separator
unseparated_json config options: "key" (string: where the log is stored), "include" (dict: maps names of keys outside the inner part to be included, which can be renamed). If there is no key, that is, the whole JSON object is the desired log, set "flat": true. See aws_config_collector for an example.
fileobj: {...}{...}{...}
text_file_processor -- logs are stored as text files, potentially with lines and fields separated by, e.g., newlines and commas
text_file config options: options for how lines and records are separated (e.g. newline, tab, comma); good for CSV-style data.
line_split_processor -- logs are stored in a newline-separated file; works more quickly than separated_json_processor
config options: "json": true or false. If json is set to true, the collector assumes the logs are newline-separated JSON and parses them, which enables record-field mapping.
separated_json_processor -- logs are stored as many JSON objects with some kind of separator
config options: specify the separator, e.g. "separator": "||". The default is newline if left unset.
fileobj: {...}||{...}||{...}
jamf_processor -- special processor for JAMF logs
aws_access_logs_processor -- special processor for AWS access logs
windows_security_processor -- special processor for Windows Security logs
vpc_flow_processor -- special processor for VPC Flow logs
json_line_arrays_processor -- processor for unseparated JSON objects that are on multiple lines of a single file
fileobj: {...}{...} {...}{...}{...} {...}
dict_processor -- processor for logs that come as Python dictionary objects, i.e. in direct mode
config -- a dictionary of information the specified file_format processor needs
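The difference between the separated and unseparated JSON layouts listed above can be sketched in a few lines of Python. This is an illustration of the file layouts only, not the collector's processors; the function names are hypothetical, and the naive split assumes the separator never appears inside a string value.

```python
import json


def parse_separated(text, separator="\n"):
    """Parse JSON objects joined by a separator (newline by default)."""
    return [json.loads(chunk) for chunk in text.split(separator) if chunk.strip()]


def parse_unseparated(text):
    """Parse JSON objects written back to back with no separator."""
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # raw_decode does not accept leading whitespace, so skip it first.
        if text[pos].isspace():
            pos += 1
            continue
        # raw_decode returns the object and the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
    return records


print(parse_separated('{"a": 1}||{"a": 2}||{"a": 3}', separator="||"))
print(parse_unseparated('{"a": 1}{"a": 2} {"a": 3}'))
```

Both calls yield the same three records; only the way record boundaries are found differs, which is why the processor type has to match how the file was written.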
record_field_mapping -- a dictionary in which each key defines a variable that can be parsed out of each record (and may be referenced later in filtering).
For example, we may want to parse something and call it "type" by getting "type" from a certain key in the record (which may be multiple layers deep):
{"type": {"keys": ["file", "type"]}, "operations": [] }
keys is a list of the keys to follow in the record to find the value. It handles nesting by essentially defining a path through the data. Suppose we have logs that look like this:
{"file": {"type": {"log_type": 100}}}
To get the log_type, list all the keys needed to walk through the JSON, in order:
keys: ["file", "type", "log_type"]
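In Python terms, following a keys list is just repeated dictionary indexing. The helper below is a hypothetical sketch of that idea, not the collector's implementation.

```python
def get_by_keys(record, keys):
    """Follow `keys` through nested dictionaries and return the value found."""
    value = record
    for key in keys:
        value = value[key]
    return value


record = {"file": {"type": {"log_type": 100}}}
print(get_by_keys(record, ["file", "type", "log_type"]))  # 100
```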
In many cases you will probably only need one key.
For example, in flat JSON that isn't nested:
{"log_type": 100, "other_info": "blah" ...}
Here you would just specify keys: ["log_type"]. A few operations are supported that can further alter the parsed information (such as split and replace). The record_field_mapping example above would grab whatever is located at log["file"]["type"] and name it "type". record_field_mapping defines variables by taking them from logs, and these variables can then be used for filtering. Let's say you have a log in JSON format like this, which will be sent to Devo:
{"file": {"value": 0, "type": "security_log"}}
Specifying "type" in the record_field_mapping allows the collector to extract that value, "security_log", and save it as type. Now let's say you want to change the tag dynamically based on that value. You could change the routing_template to something like my.app.datasource.[record-type]. In the case of the log above, it would be sent to my.app.datasource.security_log. Now let's say you want to filter out (not send) any records that have the type security_log. You could write a line_filter_rule as follows:
{"source": "record", "key": "type", "type": "match", "value": "security_log" }
We specified the source as record because we want to use a variable from the record_field_mapping. We specified the key as "type" because that is the name of the variable we defined. We specified the type as "match" because we want to filter out any record matching this rule. And we specified the value as security_log because we specifically do not want to send any records whose type equals "security_log".
The split operation behaves the same as running the Python split function on a string. Let's say you have a filename "logs/account_id/folder_name/filename" and you want to save the account_id as a variable to use for tag routing or filtering.
You could write a file_field_definition like this:
"account_id": [{"operator": "split", "on": "/", "element": 1}]
This would store a variable called account_id by taking the entire filename, splitting it into pieces wherever it finds a forward slash, and taking the element at position one. In Python it would look like:
filename.split("/")[1]
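Putting the two file_field_definitions examples from this section together, the rules can be read as a small pipeline applied to the filename in order. The sketch below mirrors the documented "split" and "replace" operators; the function names are hypothetical and this is not the collector's own code.

```python
def apply_parsing_rules(filename, rules):
    """Apply split/replace parsing rules to a filename, in the order listed."""
    value = filename
    for rule in rules:
        if rule["operator"] == "split":
            value = value.split(rule["on"])[rule["element"]]
        elif rule["operator"] == "replace":
            value = value.replace(rule["to_replace"], rule["replace_with"])
        else:
            raise ValueError(f"unsupported operator: {rule['operator']}")
    return value


def extract_file_fields(filename, file_field_definitions):
    """Build the dictionary of file variables for one filename."""
    return {
        name: apply_parsing_rules(filename, rules)
        for name, rules in file_field_definitions.items()
    }


log_type_def = {
    "log_type": [
        {"operator": "split", "on": "/", "element": 0},
        {"operator": "replace", "to_replace": "_", "replace_with": ""},
    ]
}
account_id_def = {"account_id": [{"operator": "split", "on": "/", "element": 1}]}

print(extract_file_fields("server_logs/12409834/ff.gz", log_type_def))
# {'log_type': 'serverlogs'}
print(extract_file_fields("logs/account_id/folder_name/filename", account_id_def))
# {'account_id': 'account_id'}
```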
routing_template -- a string defining how to build the tag each message is sent to, e.g.
"my.app.wow.[record-type].[file-log_type]" -- if the "type" extracted during record_field_mapping were "null", the record would be sent to the tag "my.app.wow.null"
line_filter_rules -- a list of lists of rules for filtering out individual records so they do not get sent to Devo.
For example:
"line_filter_rules": [ [{ "source": "record", "key": "type", "type": "doesnotmatch", "value": "ldap" }], [ {"source": "file", "key": "main-log_ornot", "type": "match", "value": "main-log"}, {"source": "record", "key": "type", "type": "match", "value": "kube-apiserver-audit"} ] ]
This set of rules could be expressed in pseudocode as follows:
if record.type != "ldap" OR (file.main-log_ornot == "main-log" AND record.type == "kube-apiserver-audit"):
    do_not_send_record()
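The pseudocode above reads the outer list as OR and each inner list as AND. A runnable sketch of that evaluation follows. It assumes "match" means plain equality, as the pseudocode suggests; the real collector may compare differently, and the function names are hypothetical.

```python
def rule_matches(rule, record_vars, file_vars):
    """Evaluate one rule against the variables from its source."""
    variables = record_vars if rule["source"] == "record" else file_vars
    actual = variables.get(rule["key"])
    # Assumption: "match" is equality and "doesnotmatch" is its negation.
    if rule["type"] == "match":
        return actual == rule["value"]
    if rule["type"] == "doesnotmatch":
        return actual != rule["value"]
    raise ValueError(f"unsupported rule type: {rule['type']}")


def should_drop(line_filter_rules, record_vars, file_vars):
    """Drop the record if every rule in at least one group matches."""
    return any(
        all(rule_matches(rule, record_vars, file_vars) for rule in group)
        for group in line_filter_rules
    )


line_filter_rules = [
    [{"source": "record", "key": "type", "type": "doesnotmatch", "value": "ldap"}],
    [
        {"source": "file", "key": "main-log_ornot", "type": "match", "value": "main-log"},
        {"source": "record", "key": "type", "type": "match", "value": "kube-apiserver-audit"},
    ],
]

print(should_drop(line_filter_rules, {"type": "ldap"}, {"main-log_ornot": "main-log"}))
print(should_drop(line_filter_rules, {"type": "syslog"}, {"main-log_ornot": "other"}))
```

With these example rules, the first group alone already drops every record whose type is not "ldap", so only "ldap" records are sent.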
(Internal) Notes + Debugging
Config can include "debug_mode": true to print out some useful information as logs come in.
For local testing, it is useful to set "ack_messages" to false so you can try processing without consuming messages from the queue. Be careful to remove this setting or set it to true when launching the collector. If it is not set, the default is to ack messages.
If something seems wrong at launch, you can set the following in the collector parameters/job config:
"debug_mode": true,
"do_not_send": true,
"ack_messages": false
This will print out data as it is being processed, stop messages from being acked, and, at the last step, not actually send the data. This lets you see whether something is breaking without the customer receiving wrongly formatted or repeated data, and without consuming from the queue and losing data.
Verify data collection
Once the collector has been launched, it is important to check that ingestion is working properly. To do so, go to the collector's logs console.
This service has the following components:
Component | Description |
---|---|
Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via the SDK. |
Setup output
A successful run has the following output messages for the setup module:
2024-01-16T12:47:04.044 INFO OutputProcess::MainThread -> Process started
2024-01-16T12:47:04.044 INFO InputProcess::MainThread -> Process Started
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> InputThread(sqs_collector,12345) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> ServiceThread(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPullerSetup(unknown,sqs_collector#12345,aws_sqs_vpc#predefined) -> Starting thread
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread
2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSender(standard_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(standard_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManager(standard_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSender(lookup_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::ConsoleSenderManager(standard_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(standard_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::OutputStandardConsumer(standard_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputStandardConsumer(standard_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(lookup_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManager(lookup_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSender(internal_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::ConsoleSenderManager(lookup_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(lookup_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(internal_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManager(internal_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::OutputLookupConsumer(lookup_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputLookupConsumer(lookup_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds
Puller output
A successful initial run has the following output messages for the puller module:
Note that the PrePull action is executed only one time before the first run of the Pull action.
2024-01-16T17:02:56.221036303Z 2024-01-16T17:02:56.220 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Acked message receiptHandle: /+qA+ymL2Vs8yb//++7YM2Ef8BCetrJ+/+////F1uwLOVfONfagI99vA=
2024-01-16T17:02:56.221386926Z 2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one
Restart the persistence
This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. If you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:
Delete the collector and recreate it with a new ID number.
The collector will detect this change and restart the persistence using the parameters in the configuration file, or the default configuration if none has been provided.
Note that this action clears the persistence, which cannot be recovered in any way. Resetting the persistence could result in duplicate or lost events.
Collector operations
This section is intended to explain how to proceed with specific operations of this collector.
Verify collector operations
The initialization module is in charge of setting up and running the input (pulling logic) and output (delivering logic) services and validating the given configuration.
Events delivery and Devo ingestion
The event delivery module is in charge of receiving the events from the internal queues where all events are injected by the pullers and delivering them using the selected compatible delivery method.
A successful run has the following output messages for the initializer module:
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Sender: SyslogSender(standard_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Standard - Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds)
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Sender: SyslogSender(internal_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Internal - Total number of messages sent: 1, messages sent since "2022-06-28 10:39:22.516313+00:00": 1 (elapsed 0.019 seconds)
Sender services
The Integrations Factory Collector SDK has 3 different sender services depending on the type of event to deliver (internal, standard, and lookup). This collector uses the following sender services:
Sender Services | Description |
---|---|
| In charge of delivering internal metrics to Devo such as logging traces or metrics. |
| In charge of delivering pulled events to Devo. |
Sender statistics
Each service displays its own performance statistics that allow checking how many events have been delivered to Devo by type:
Logging trace | Description |
---|---|
| Displays the number of concurrent senders available for the given Sender Service. |
| Displays the number of items in the internal sender queue. This value helps detect bottlenecks and the need to increase the performance of data delivery to Devo, which can be done by increasing the number of concurrent senders. |
| Displays the number of events sent since the last time. Following the given example, the following conclusions can be obtained:
By default, these traces are shown every 10 minutes. |
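If you want to track these counters outside the console, the "Total number of messages sent" traces can be extracted with a regular expression. The sketch below is based only on the example traces shown under Events delivery and Devo ingestion; the pattern and function name are assumptions and may need adjusting if the trace format differs in your release.

```python
import re

# Pattern derived from the example traces in this document (an assumption).
STATS = re.compile(
    r'SenderManagerMonitor\((?P<service>\w+),[^)]*\) -> \w+ - '
    r'Total number of messages sent: (?P<total>\d+), '
    r'messages sent since "(?P<since>[^"]+)": (?P<recent>\d+)'
)


def parse_sender_stats(lines):
    """Map each sender service to its message counters."""
    stats = {}
    for line in lines:
        match = STATS.search(line)
        if match:
            stats[match.group("service")] = {
                "total": int(match.group("total")),
                "since": match.group("since"),
                "recent": int(match.group("recent")),
            }
    return stats


log_lines = [
    'INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Standard - Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds)',
    'INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Internal - Total number of messages sent: 1, messages sent since "2022-06-28 10:39:22.516313+00:00": 1 (elapsed 0.019 seconds)',
]
stats = parse_sender_stats(log_lines)
print(stats["standard_senders"]["total"], stats["internal_senders"]["total"])  # 44 1
```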
Check memory usage
To check the memory usage of this collector, look for the following log records in the collector, which are displayed every 5 minutes by default, always after running the memory-free process.
Used memory is displayed per running process; the sum of both values gives the total memory used by the collector.
The global pressure of the available memory is displayed in the global value.
All metrics (Global, RSS, VMS) include the value before and after freeing memory:
previous -> after freeing memory
INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)
INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB)
Differences between RSS and VMS memory usage:
RSS is the Resident Set Size, which is the actual physical memory the process is using.
VMS is the Virtual Memory Size, which is the virtual memory the process is using.
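As a worked example of summing the per-process values, the sketch below parses the two [GC] records shown above and adds up the RSS after freeing memory. The regular expression is derived from those example lines and assumes values are always reported in MiB; the function name is hypothetical.

```python
import re

# Pattern derived from the example [GC] records in this document (an assumption).
GC_LINE = re.compile(
    r"(?P<process>\w+)::MainThread -> \[GC\] global: "
    r"(?P<global_before>[\d.]+)% -> (?P<global_after>[\d.]+)%, "
    r"process: RSS\((?P<rss_before>[\d.]+)MiB -> (?P<rss_after>[\d.]+)MiB\), "
    r"VMS\((?P<vms_before>[\d.]+)MiB -> (?P<vms_after>[\d.]+)MiB\)"
)


def rss_after_by_process(lines):
    """Map each process name to its RSS (in MiB) after memory was freed."""
    usage = {}
    for line in lines:
        match = GC_LINE.search(line)
        if match:
            usage[match.group("process")] = float(match.group("rss_after"))
    return usage


log_lines = [
    "INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)",
    "INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB)",
]
usage = rss_after_by_process(log_lines)
total_rss = sum(usage.values())
print(f"total RSS: {total_rss:.2f} MiB")  # total RSS: 62.49 MiB
```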
Enable/disable the logging debug mode
Sometimes it is necessary to activate the debug mode of the collector's logging. This debug mode increases the verbosity of the log and allows you to print execution traces that are very helpful in resolving incidents or detecting bottlenecks in heavy download processes.
To enable this option you just need to edit the configuration file and change the debug_status parameter from false to true and restart the collector.
To disable this option, you just need to update the configuration file and change the debug_status parameter from true to false and restart the collector.
For more information, visit the configuration and parameterization section corresponding to the chosen deployment mode.
Change log
Release | Released on | Release type | Details | Recommendations |
---|---|---|---|---|
| IMPROVEMENTS, FEATURES |
|
| |
| IMPROVEMENTS, BUG FIXES, FEATURES
| Bug fixes
Improvements
Features
|
| |
| IMPROVEMENTS, BUG FIXES | Bug fixes
Improvements
|
| |
| INITIAL RELEASE | Released with DCSDK 1.10.2 |
|