AWS (S3+SQS) collector
Overview
Logs generated by most AWS services (CloudTrail, VPC Flow Logs, Elastic Load Balancer, etc.) can be exported as blob objects to S3. Many third-party services have adopted the same approach, so it has become a common pattern across many technologies. Devo Professional Services and Technical Acceleration teams maintain a base collector that uses this S3 pattern to collect logs and can be customized for the different log technologies a customer stores in S3.
This documentation will go through setting up your AWS infrastructure for our collector integration to work out of the box:
Sending data to S3 (this guide uses CloudTrail as a data source service)
Setting up S3 event notifications to SQS
Enabling SQS and S3 access using a cross-account IAM role
Gathering information to be provided to Devo for collector setup
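As a rough illustration of the second step above (S3 event notifications to SQS), the sketch below builds the notification configuration that S3 expects when object-creation events are routed to a queue. The queue ARN, bucket name, and prefix are hypothetical placeholders, and the helper name build_queue_notification is introduced here only for illustration.

```python
import json


def build_queue_notification(queue_arn, prefix="", suffix=""):
    """Return an S3 NotificationConfiguration that sends ObjectCreated events to SQS."""
    queue_config = {
        "QueueArn": queue_arn,
        "Events": ["s3:ObjectCreated:*"],
    }
    # Optional key filters so only the relevant log objects trigger a message.
    rules = []
    if prefix:
        rules.append({"Name": "prefix", "Value": prefix})
    if suffix:
        rules.append({"Name": "suffix", "Value": suffix})
    if rules:
        queue_config["Filter"] = {"Key": {"FilterRules": rules}}
    return {"QueueConfigurations": [queue_config]}


# Hypothetical ARN and prefix, for illustration only.
config = build_queue_notification(
    "arn:aws:sqs:us-east-1:111122223333:example-devo-queue",
    prefix="AWSLogs/",
)
print(json.dumps(config, indent=2))

# With boto3 installed and credentials configured, the dict could be applied with:
#   boto3.client("s3").put_bucket_notification_configuration(
#       Bucket="example-bucket", NotificationConfiguration=config)
```

The SQS queue's access policy must also allow the S3 service to send messages to the queue; otherwise S3 rejects the notification configuration.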
General architecture diagram
Requirements
Access to S3, SQS, IAM, and CloudTrail services
Permissions to send data to S3
Knowledge of log format/technology type being stored in S3
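The access listed above is typically granted to the collector through the cross-account IAM role mentioned in the overview. The sketch below shows what the two policy documents for such a role might look like. The action list is an assumption (this section does not state the exact permissions Devo requires), and the account ID, external ID, queue ARN, and bucket name are hypothetical placeholders.

```python
import json


def build_collector_policy(queue_arn, bucket_name):
    """Permissions policy: read and delete queue messages, read bucket objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Assumed minimal set of SQS actions for a polling consumer.
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                ],
                "Resource": queue_arn,
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
        ],
    }


def build_trust_policy(trusted_account_id, external_id):
    """Trust policy: let another AWS account assume the role with an external ID."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{trusted_account_id}:root"},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


# Hypothetical values, for illustration only.
policy = build_collector_policy(
    "arn:aws:sqs:us-east-1:111122223333:example-devo-queue", "example-bucket"
)
trust = build_trust_policy("999988887777", "example-external-id")
print(json.dumps(policy, indent=2))
print(json.dumps(trust, indent=2))
```

Scoping the resources to a single queue and a single bucket keeps the role limited to the data the collector actually needs.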
Create S3 bucket and set up data feed (CloudTrail example)
Check this article for a setup configuration example.
Devo collector features
Feature | Details |
---|---|
Allow parallel downloading ( | |
Running environments | |
Populated Devo events | |
Flattening Preprocessing | |
Data sources
Data source | Description | Collector service name | Devo table | Available from release |
---|---|---|---|---|
Any | Theoretically any source you send to an SQS can be collected | | | |
CONFIG LOGS | | | | |
AWS ELB | | | | |
AWS ALB | | | | |
CISCO UMBRELLA | | | | |
CLOUDFLARE LOGPUSH | | | | |
CLOUDFLARE AUDIT | | | | |
CLOUDTRAIL | | | | |
CLOUDTRAIL VIA KINESIS FIREHOSE | | | | |
CLOUDWATCH | | | | |
CLOUDWATCH VPC | | | | |
CONTROL TOWER | VPC Flow Logs, CloudTrail, CloudFront, and/or AWS Config logs | | | |
FDR | | | | |
GUARD DUTY | | | | |
GUARD DUTY VIA KINESIS FIREHOSE | | | | |
IMPERVA INCAPSULA | | | | |
LACEWORK | | | | |
PALO ALTO | | | | |
ROUTE 53 | | | | |
OS LOGS | | | | |
SENTINEL ONE FUNNEL | | | | |
S3 ACCESS | | | | |
VPC LOGS | | | | |
WAF LOGS | | | | |
Run the collector
Setting Visibility Window properly
When setting the visibility window, you need to know how long it takes to process each message. The following Devo query returns that information:
from devo.collectors.out where weakhas(msg, "Process time") and toktains(raw, "collector-<collector_id>")
The results give you a good idea of how long processing takes.
If you are using aws_sqs_fdr_large for CrowdStrike, processing a single message can take 30 minutes or even longer, so the visibility window needs to be at least 1800 seconds. Most collections process a message in less than 5 seconds, in which case the value should be set to the minimum of 120 seconds. If the visibility window is shorter than the processing time, the message becomes visible in the queue again before it has been acknowledged and is collected a second time, which causes duplicates.
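The sizing rule above can be expressed as a small calculation. This is a minimal sketch, assuming you have already extracted the observed process times (in seconds) from the query results; the function name and the safety_factor parameter are introduced here for illustration and are not collector settings. The 43200-second cap is the maximum visibility timeout that SQS accepts.

```python
import math

SQS_MAX_VISIBILITY_SECONDS = 43200  # SQS upper limit (12 hours)


def recommend_visibility_timeout(process_times, minimum=120, safety_factor=1.0):
    """Suggest a visibility timeout (seconds) from observed per-message process times."""
    if not process_times:
        return minimum
    slowest = max(process_times)
    # Round up so the window is never shorter than the slowest observed message.
    needed = math.ceil(slowest * safety_factor)
    # Never go below the documented minimum or above the SQS limit.
    return min(max(minimum, needed), SQS_MAX_VISIBILITY_SECONDS)


print(recommend_visibility_timeout([1.8, 2.4, 4.9]))   # prints 120
print(recommend_visibility_timeout([1500.0, 1800.0]))  # prints 1800
```

Raising safety_factor above 1.0 leaves headroom for messages slower than anything observed so far, at the cost of a longer delay before a failed message is retried.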
Verify data collection
Once the collector has been launched, check that ingestion is working properly. To do so, go to the collector’s logs console.
This service has the following components:
Component | Description |
---|---|
Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via SDK. |
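To picture what the puller has to do with each queue message, the sketch below extracts the bucket and object key from an S3 event notification body. It is not the collector's actual code; the function name and the sample message are illustrative, and the message layout follows the standard S3 event notification structure. Object keys in these notifications are URL-encoded, so they are decoded before use.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(message_body):
    """Return (bucket, key) pairs referenced by an S3 event notification body."""
    payload = json.loads(message_body)
    objects = []
    # Test events sent by S3 when the notification is created have no "Records".
    for record in payload.get("Records", []):
        s3_info = record.get("s3")
        if not s3_info:
            continue
        bucket = s3_info["bucket"]["name"]
        # Keys arrive URL-encoded ("+" for spaces, "%3A" for ":").
        key = unquote_plus(s3_info["object"]["key"])
        objects.append((bucket, key))
    return objects


# Hypothetical message body, for illustration only.
sample_body = json.dumps({
    "Records": [{
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-bucket"},
            "object": {"key": "AWSLogs/111122223333/CloudTrail/file+name%3A1.json.gz"},
        },
    }]
})
print(extract_s3_objects(sample_body))
# prints [('example-bucket', 'AWSLogs/111122223333/CloudTrail/file name:1.json.gz')]
```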
Setup output
A successful run has the following output messages for the setup module:
2024-01-16T12:47:04.044 INFO OutputProcess::MainThread -> Process started
2024-01-16T12:47:04.044 INFO InputProcess::MainThread -> Process Started
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> InputThread(sqs_collector,12345) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> ServiceThread(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPullerSetup(unknown,sqs_collector#12345,aws_sqs_vpc#predefined) -> Starting thread
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread
2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSender(standard_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(standard_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManager(standard_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSender(lookup_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::ConsoleSenderManager(standard_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(standard_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::OutputStandardConsumer(standard_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputStandardConsumer(standard_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(lookup_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManager(lookup_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSender(internal_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::ConsoleSenderManager(lookup_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(lookup_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(internal_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManager(internal_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::OutputLookupConsumer(lookup_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputLookupConsumer(lookup_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds
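When checking a long log by script rather than by eye, the final line of the output above is the one to look for. The following sketch, with a hypothetical helper name, scans log lines for that message and returns the collection period it reports.

```python
import re

START_PATTERN = re.compile(r"Starting data collection every (\d+) seconds")


def find_collection_period(log_lines):
    """Return the reported collection period in seconds, or None if setup never finished."""
    for line in log_lines:
        match = START_PATTERN.search(line)
        if match:
            return int(match.group(1))
    return None


sample = [
    "2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed",
    "2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds",
]
print(find_collection_period(sample))  # prints 5
```

A return value of None on a log that only contains the "Waiting until setup will be executed" warning suggests the setup module has not completed.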
Puller output
A successful initial run has the following output messages for the puller module:
Note that the PrePull action is executed only one time before the first run of the Pull action.
2024-01-16T17:02:56.221036303Z 2024-01-16T17:02:56.220 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Acked message receiptHandle: /+qA+ymL2Vs8yb//++7YM2Ef8BCetrJ+/+////F1uwLOVfONfagI99vA=
2024-01-16T17:02:56.221386926Z 2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one
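The "Data collection completed" message carries the timing of each pull cycle. As a sketch (the helper name is hypothetical), the elapsed and waiting times can be extracted like this; a cycle whose elapsed time approaches the collection period is a hint that the puller is falling behind.

```python
import re

COMPLETED_PATTERN = re.compile(
    r"Data collection completed\. Elapsed time: ([\d.]+) seconds\. "
    r"Waiting for ([\d.]+) second\(s\)"
)


def parse_cycle(line):
    """Return (elapsed_seconds, wait_seconds) for a completion line, else None."""
    match = COMPLETED_PATTERN.search(line)
    if match is None:
        return None
    return float(match.group(1)), float(match.group(2))


line = (
    "2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,"
    "aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. "
    "Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one"
)
print(parse_cycle(line))  # prints (2.413, 2.587)
```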
Restart the persistence
This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:
Delete the collector and recreate it with a new ID number.
The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.
Note that this action clears the persistence and cannot be recovered in any way. Resetting persistence could result in duplicate or lost events.
Collector operations
This section is intended to explain how to proceed with specific operations of this collector.
Verify collector operations
The initialization module is in charge of setting up and running the input (pulling logic) and output (delivering logic) services and validating the given configuration.
Events delivery and Devo ingestion
The event delivery module is in charge of receiving the events from the internal queues where all events are injected by the pullers and delivering them using the selected compatible delivery method.
A successful run has the following output messages for the initializer module:
Sender services
The Integrations Factory Collector SDK has 3 different sender services depending on the type of event to deliver (internal, standard, and lookup). This collector uses the following Sender Services:
Sender Services | Description |
---|---|
| In charge of delivering internal metrics to Devo such as logging traces or metrics. |
| In charge of delivering pulled events to Devo. |
Sender statistics
Each service displays its own performance statistics that allow checking how many events have been delivered to Devo by type:
Logging trace | Description |
---|---|
| Displays the number of concurrent senders available for the given Sender Service. |
| Displays the items available in the internal sender queue. This value helps detect bottlenecks and the need to increase the performance of data delivery to Devo, which can be done by increasing the number of concurrent senders. |
| Displays the number of events from the last time and following the given example, the following conclusions can be obtained:
|
Check memory usage
To check the memory usage of this collector, look for the following log records in the collector which are displayed every 5 minutes by default, always after running the memory free process.
The used memory is displayed by running processes and the sum of both values will give the total used memory for the collector.
The global pressure of the available memory is displayed in the global value.
All metrics (Global, RSS, VMS) include the value before freeing and after: previous -> after freeing memory
Enable/disable the logging debug mode
Sometimes it is necessary to activate the debug mode of the collector's logging. This debug mode increases the verbosity of the log and allows you to print execution traces that are very helpful in resolving incidents or detecting bottlenecks in heavy download processes.
To enable this option you just need to edit the configuration file and change the debug_status parameter from false to true and restart the collector.
To disable this option, you just need to update the configuration file and change the debug_status parameter from true to false and restart the collector.
For more information, visit the configuration and parameterization section corresponding to the chosen deployment mode.
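For scripted deployments, the edit can be automated. The sketch below is only an illustration: it assumes a YAML-style configuration file in which debug_status appears as a "debug_status: true|false" line, which may not match the layout of your deployment mode, and the sample text and helper name are hypothetical. The collector still needs to be restarted afterwards.

```python
import re

# Matches a line such as "  debug_status: false" (assumed YAML-style layout).
DEBUG_LINE = re.compile(r"^([ \t]*debug_status[ \t]*:[ \t]*)(true|false)[ \t]*$", re.MULTILINE)


def set_debug_status(config_text, enabled):
    """Return config_text with debug_status set to true or false."""
    value = "true" if enabled else "false"
    new_text, count = DEBUG_LINE.subn(lambda m: m.group(1) + value, config_text)
    if count == 0:
        raise ValueError("debug_status key not found in configuration")
    return new_text


# Hypothetical configuration fragment, for illustration only.
sample_config = "example_section:\n  debug_status: false\n  id: 12345\n"
print(set_debug_status(sample_config, True))
```

Raising an error when the key is missing avoids silently writing back an unchanged file and assuming debug mode is on.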
Change log
Release | Released on | Release type | Details | Recommendations |
---|---|---|---|---|
| Jan 21, 2025 | Bug Fixes, Improvements | Bug Fixes; Improvements | |
| Oct 16, 2024 | Bug Fixes, FEATURES | Bug Fixes; Features | |
| Oct 7, 2024 | Bug Fixes | Features; Bug Fixes | |
| Oct 7, 2024 | Bug Fixes | Bug Fixes | |
| Sep 24, 2024 | Bug Fixes | Bug Fixes | |
| Sep 3, 2024 | IMPROVEMENTS | Improvements | |
| Jul 15, 2024 | BUG FIXES, IMPROVEMENTS | Improvements; Bug fixes | |
| Jul 11, 2024 | BUG FIXES | Bug fixes: Fixed dependency issue | |
| Jul 10, 2024 | BUG FIXES, IMPROVEMENTS | Feature; Improvements | |
| Jun 26, 2024 | BUG FIXES, IMPROVEMENTS, FEATURES | Features; Bug fixes; Improvements | |
| Jun 17, 2024 | BUG FIXES | Bug fixing | |
| Jun 14, 2024 | BUG FIXES | Bug fixing | |
| Jun 10, 2024 | FEATURES | Features | |
| May 29, 2024 | FEATURES | Features | |
| Apr 30, 2024 | FEATURES | Features | |
| Apr 9, 2024 | FEATURES | Features | |
| Mar 22, 2024 | IMPROVEMENTS, FEATURES | | |
| Feb 23, 2024 | IMPROVEMENTS, BUG FIXES, FEATURES | Bug fixes; Improvements; Features | |
| Jan 29, 2024 | IMPROVEMENTS, BUG FIXES | Bug fixes; Improvements | |
| Jan 19, 2024 | INITIAL RELEASE | Released with DCSDK 1.10.2 | |