/
AWS (S3+SQS) collector

AWS (S3+SQS) collector

Overview

Logs generated by most AWS services (CloudTrail, VPC Flows, Elastic Load Balancer, etc.) are exportable to a blob object in S3. Many other 3rd party services have also adopted this paradigm so it has become a common pattern used by many different technologies. Devo Professional Services and Technical Acceleration teams have a base-collector code that will leverage this S3 paradigm to collect logs and can be customized for different customer's different technology logs that may be stored in S3.

This documentation will go through setting up your AWS infrastructure for our collector integration to work out of the box:

  • Sending data to S3 (this guide uses CloudTrail as a data source service)

  • Setting up S3 event notifications to SQS

  • Enabling SQS and S3 access using a cross-account IAM role

  • Gathering information to be provided to Devo for collector setup

General architecture diagram

2bd87f6b-a1f6-4d91-b079-7933757fb68b.png

Requirements

  • Access to S3, SQS, IAM, and CloudTrail services

  • Permissions to send data to S3

  • Knowledge of log format/technology type being stored in S3

Create S3 bucket and set up data feed (CloudTrail example) 

Check this article for a setup configuration example.

Devo collector features

Feature

Details

Feature

Details

Allow parallel downloading (multipod)

allowed

Running environments

  • collector server

  • on-premise

Populated Devo events

table

Flattening Preprocessing

no

Data sources

Data source

Description

Collector service name

Devo table

Available from release

Data source

Description

Collector service name

Devo table

Available from release

Any

Theoretically any source you send to an SQS can be collected

 

 

v1.0.0

CONFIG LOGS

 

aws_sqs_config

cloud.aws.configlogs.events

v1.0.0

AWS ELB

 

aws_sqs_elb

web.aws.elb.access

v1.0.0

AWS ALB

 

aws_sqs_alb

web.aws.alb.access

v1.0.0

CISCO UMBRELLA

 

aws_sqs_cisco_umbrella

sig.cisco.umbrella.dns

v1.0.0

CLOUDFLARE LOGPUSH

 

aws_sqs_cloudflare_logpush

cloud.cloudflare.logpush.http

v1.0.0

CLOUDFLARE AUDIT

 

aws_sqs_cloudflare_audit

cloud.aws.cloudflare.audit

v1.0.0

CLOUDTRAIL

 

aws_sqs_cloudtrail

cloud.aws.cloudtrail.*

v1.0.0

CLOUDTRAIL VIA KINESIS FIREHOSE

 

aws_sqs_cloudtrail_kinesis

cloud.aws.cloudtrail.*

v1.0.0

CLOUDWATCH

 

aws_sqs_cloudwatch

cloud.aws.cloudwatch.logs

v1.0.0

CLOUDWATCH VPC

 

aws_sqs_cloudwatch_vpc

cloud.aws.vpc.flow

v1.0.0

CONTROL TOWER

VPC Flow Logs, Cloudtrail, Cloudfront, and/or AWS config logs

aws_sqs_control_tower

 

v1.0.0

FDR

 

aws_sqs_fdr

edr.crowdstrike.cannon

v1.0.0

GUARD DUTY

 

aws_sqs_guard_duty

cloud.aws.guardduty.findings

v1.0.0

GUARD DUTY VIA KINESIS FIREHOUSE

 

aws_sqs_guard_duty_kinesis

cloud.aws.guardduty.findings

v1.0.0

IMPERVA INCAPSULA

 

aws_sqs_incapsula

cef0.imperva.incapsula

v1.0.0

LACEWORK

 

aws_sqs_lacework

monitor.lacework.

v1.0.0

PALO ALTO

 

aws_sqs_palo_alto

firewall.paloalto.[file-log_type]

v1.0.0

ROUTE 53

 

aws_sqs_route53

dns.aws.route53

v1.0.0

OS LOGS

 

aws_sqs_os

box.[file-log_type].[file-log_subtype].us

v1.0.0

SENTINEL ONE FUNNEL

 

aws_sqs_s1_funnel

edr.sentinelone.dv

v1.0.0

S3 ACCESS

 

aws_sqs_s3_access

web.aws.s3.access

v1.0.0

VPC LOGS

 

aws_sqs_vpc

cloud.aws.vpc.flow

v1.0.0

WAF LOGS

 

aws_sqs_waf

cloud.aws.waf.logs

v1.0.0

Run the collector

Setting Visibility Window properly

When setting the visibility window you need to know how long its taking to process each message. There is a query in devo where you can find that information out.

from devo.collectors.out where weakhas(msg, "Process time") and toktains(raw, "collector-<collector_id>")

From here you will see a list of results. This will give you an a good idea how long its taking to process.

IF you’re using the aws_sqs_fdr_large for crowdstrike you might see up to even passed 30 minutes. So the visibility window needs to be at least 1800. But with most collections you’ll see it at less than 5 seconds so the value should be set to the minimum of 120 seconds. This will cause duplicates if not set properly.

Verify data collection

Once the collector has been launched, it is important to check if the ingestion is performed in a proper way. To do so, go to the collector’s logs console.

This service has the following components:

Component

Description

Component

Description

Setup

The setup module is in charge of authenticating the service and managing the token expiration when needed.

Puller

The setup module is in charge of pulling the data in a organized way and delivering the events via SDK.

Setup output

A successful run has the following output messages for the setup module:

2024-01-16T12:47:04.044 INFO OutputProcess::MainThread -> Process started 2024-01-16T12:47:04.044 INFO InputProcess::MainThread -> Process Started 2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> InputThread(sqs_collector,12345) - Starting thread (execution_period=60s) 2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> ServiceThread(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread (execution_period=60s) 2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPullerSetup(unknown,sqs_collector#12345,aws_sqs_vpc#predefined) -> Starting thread 2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread 2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed 2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSender(standard_senders,console_sender_0) -> Starting thread 2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(standard_senders,console_1) -> Starting thread (every 300 seconds) 2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManager(standard_senders,manager,console_1) -> Starting thread 2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSender(lookup_senders,console_sender_0) -> Starting thread 2024-01-16T12:47:04.192 INFO OutputProcess::ConsoleSenderManager(standard_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(standard_senders,manager,console_1) -> Nothing retrieved from the persistence. 2024-01-16T12:47:04.192 INFO OutputProcess::OutputStandardConsumer(standard_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputStandardConsumer(standard_senders_consumer_0) -> Nothing retrieved from the persistence. 2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(lookup_senders,console_1) -> Starting thread (every 300 seconds) 2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManager(lookup_senders,manager,console_1) -> Starting thread 2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSender(internal_senders,console_sender_0) -> Starting thread 2024-01-16T12:47:04.193 INFO OutputProcess::ConsoleSenderManager(lookup_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(lookup_senders,manager,console_1) -> Nothing retrieved from the persistence. 2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(internal_senders,console_1) -> Starting thread (every 300 seconds) 2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManager(internal_senders,manager,console_1) -> Starting thread 2024-01-16T12:47:04.193 INFO OutputProcess::OutputLookupConsumer(lookup_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputLookupConsumer(lookup_senders_consumer_0) -> Nothing retrieved from the persistence. 2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds

Puller output

A successful initial run has the following output messages for the puller module:

Note that the PrePull action is executed only one time before the first run of the Pull action.

I2024-01-16T17:02:56.221036303Z 2024-01-16T17:02:56.220 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Acked message receiptHandle: /+qA+ymL2Vs8yb//++7YM2Ef8BCetrJ+/+////F1uwLOVfONfagI99vA= 2024-01-16T17:02:56.221386926Z 2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one

Restart the persistence

This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:

  1. Delete and Re-DO the collector with new ID number

The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.

Note that this action clears the persistence and cannot be recovered in any way. Resetting persistence could result in duplicate or lost events.

Collector operations

This section is intended to explain how to proceed with specific operations of this collector.

Verify collector operations

The initialization module is in charge of setup and running the input (pulling logic) and output (delivering logic) services and validating the given configuration.

Events delivery and Devo ingestion

The event delivery module is in charge of receiving the events from the internal queues where all events are injected by the pullers and delivering them using the selected compatible delivery method.

A successful run has the following output messages for the initializer module:

Sender services

The Integrations Factory Collector SDK has 3 different senders services depending on the event type to delivery (internal, standard, and lookup). This collector uses the following Sender Services:

Sender Services

Description

Sender Services

Description

internal_senders

In charge of delivering internal metrics to Devo such as logging traces or metrics.

standard_senders

In charge of delivering pulled events to Devo.

Sender statistics

Each service displays its own performance statistics that allow checking how many events have been delivered to Devo by type:

Logging trace

Description

Logging trace

Description

Number of available senders: 1

Displays the number of concurrent senders available for the given Sender Service.

sender manager internal queue size: 0

Displays the items available in the internal sender queue.

This value helps detect bottlenecks and needs to increase the performance of data delivery to Devo. This last can be made by increasing the concurrent senders.

Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 21 (elapsed 0.007 seconds)

Displayes the number of events from the last time and following the given example, the following conclusions can be obtained:

  • 44 events were sent to Devo since the collector started.

  • The last checkpoint timestamp was 2022-06-28 10:39:22.511671+00:00.

  • 21 events where sent to Devo between the last UTC checkpoint and now.

  • Those 21 events required 0.007 seconds to be delivered.

Check memory usage

To check the memory usage of this collector, look for the following log records in the collector which are displayed every 5 minutes by default, always after running the memory free process.

  • The used memory is displayed by running processes and the sum of both values will give the total used memory for the collector.

  • The global pressure of the available memory is displayed in the global value.

  • All metrics (Global, RSS, VMS) include the value before freeing and after: previous -> after freeing memory

Enable/disable the logging debug mode

Sometimes it is necessary to activate the debug mode of the collector's logging. This debug mode increases the verbosity of the log and allows you to print execution traces that are very helpful in resolving incidents or detecting bottlenecks in heavy download processes.

  • To enable this option you just need to edit the configuration file and change the debug_status parameter from false to true and restart the collector.

  • To disable this option, you just need to update the configuration file and change the debug_status parameter from true to false and restart the collector.

For more information, visit the configuration and parameterization section corresponding to the chosen deployment mode.

Change log

Release

Released on

Release type

Details

Recommendations

Release

Released on

Release type

Details

Recommendations

v1.7.1-1.7.3

Jan 21, 2025

Bug FixesImprovements

Bug Fixes

  • Fixed decorators

  • Fixed bug with sub pre operation

  • Reverted changes that cuased a bug with log operations

Improvements

  • Removed Visibility Window, Max Messages, And SQS timeout parameters from config

    • You can still add/edit these to new values. Look for the query in the docs to see how to se the visibility window.

 

v1.7.0

Oct 16, 2024

Bug Fixes

FEATURES

Bug Fixes

  • Fixed control tower issue

  • Fixed bug with Falcon Data Replicator Large where logs were taking over an hour to finish

Features

  • Created custom tagging off of record field mapping

  • Created NLB logging service

  • Added INFO/DEBUG logging around each method so users can see size and timing.

Recommended Version

v1.6.4

Oct 7, 2024

Bug Fixes
Improvements

Features

  • Created custom tagging off of record field mapping

  • Added INF0/DEBUG logging around most methods so users can see size and timing.

Bug Fixes

  • Fixed Dependency Issue.

  • Fixed control tower issue

  • Fixed Falcon Data Replicator Large where logs were taking over an hour to finish.

Upgrade

v1.6.3

Oct 7, 2024

Bug Fixes

Bug Fixes

  • Fixed Log Operations Bug

  • Added Backwards compatibility to control tower

  • Fixed Palo Alto Service for snappy decompression.

Upgrade

v1.6.2

Sep 24, 2024

Bug Fixes

Bug Fixes

  • None type causing message processing to fail fdr_large, fixed.

  • Added default region to initialization of sts client to prevent needing environment variables in the green cluster.

  • Fixed bug in control tower processor

Upgrade

v1.6.1

Sep 3, 2024

IMPROVEMENTS

Improvements

  • Created new processor for extracting a message from singular log

Upgrade

v1.6.0

Jul 15, 2024

BUG FIXES

IMPROVEMENTS

Improvements

  • Increased DCSDK to 1.12.2 to 1.12.4

  • Removed Multithreading

  • Added a setup method

  • Removed Deduplication

  • Added debugging logging for using dynamic filenames to help with creating dynamic tags

Bug fixes

  • Fixed a bug where the message body was a string and caused a type error.

  • Fixed a bug where client was not refreshed in time before acknowledging a message.

Upgrade

v1.5.1

Jul 11, 2024

BUG FIXES

Bug fixes

Fixed dependency issue

Upgrade

v1.5.0

Jul 10, 2024

BUG FIXES

IMPROVEMENTS

Feature

  • Removed debug_md5 and made it default for all dictionary logs

  • Created a new vpc flow processor

  • Added new sender for relay in house + TLS

  • Added persistence functionality for gzip sending buffer

  • Added Automatic activation of gzip sending

Improvements

  • Updated docker image to 1.3.0

  • Updated DCDSK from 1.11.1 to 1.12.2

  • Fixed high vulnerability in Docker Image

  • Upgrade DevoSDK dependency to version v5.4.0

  • Fixed error in persistence system

  • Applied changes to make DCSDK compatible with MacOS

  • Added new sender for relay in house + TLS

  • Added persistence functionality for gzip sending buffer

  • Added Automatic activation of gzip sending

  • Improved behaviour when persistence fails

  • Upgraded DevoSDK dependency

  • Fixed console log encoding

  • Restructured python classes

  • Improved behaviour with non-utf8 characters

  • Decreased defaut size value for internal queues (Redis limitation, from 1GiB to 256MiB)

  • New persistence format/structure (compression in some cases)

  • Removed dmesg execution (It was invalid for docker execution)

Upgrade

v1.4.0

Jun 26, 2024

BUG FIXES

IMPROVEMENTS

FEATURES

Features

  • Implemented use of pulling events sent by event bridge

  • Added more debugging information to be added to events such as: Time the message was sent to queue, times it has been sent to the queue, the bucket, and file name.

Bug fixes

  • Fixed an import dependency error

Improvements

  • Upped the visibility timeout to 1 hour by default

Upgrade

v1.3.2

Jun 17, 2024

BUG FIXES

Bug fixing

  • Fixed the initialization of the client credentials that was missing the token.

Upgrade

v1.3.1

Jun 14, 2024

BUG FIXES

Bug fixing

  • Fixed index out of range error in aws_sqs_fdr_large service

Upgrade

v1.3.0

Jun 10, 2024

FEATURES

Features

  • Fixed logging message saying the message wasn’t acked event though it was

  • Added use of 1-6 messages back in config

  • Added multithreading for downloading messages in parallel

  • Updated the aws_sqs_fdr_large service with a faster downloading method using ijson.

Upgrade

v1.2.3

May 29, 2024

FEATURES

Features

  • Updated to orjson for performance qualities.

Upgrade

v1.2.2

Apr 30, 2024

FEATURES

Features

  • Changed processors in handling of the log from str to json dumps

Upgrade

v1.2.1

Apr 9, 2024

FEATURES

Features

  • Added file filtering to the incapsula service

Upgrade

v1.2.0

Mar 22, 2024

IMPROVEMENTS

FEATURES

  • Updated to DCSDK 1.11.1

    • Added extra check for not valid message timestamps

    • Added extra check for improve the controlled stop

    •  Changed default number for connection retries (now 7)

    •  Fix for Devo connection retries

Upgrade

v1.1.3

Feb 23, 2024

IMPROVEMENTS

BUG FIXES

FEATURES

 

Bug fixes

  • Fixed bug in parquet log processing

  • Fixed the max number of messages and updated the message timeout in flight

  • Fixed the way access key and secret are used

Improvements

  • Updated to DCSDK 1.11.0

Features

  • Added feature to send md5 message to my.app table

  • Added RDS service to collector defs

Upgrade

v1.0.1

Jan 29, 2024

IMPROVEMENTS

BUG FIXES

Bug fixes

  • state file fixed

Improvements

  • using run method, instead of pull to enable long polling.

  • adding different types of encoding (latin-1)

  • update collector defs to be objects instead of arrays which was throwing off tagging, and record field mapping.

Upgrade

v1.0.0

Jan 19, 2024

INTIAL RELEASE

Released with DCSDK 1.10.2

Initial version