...
Logs generated by most AWS services (CloudTrail, VPC Flows, Elastic Load Balancer, etc.) can be exported as blob objects to S3. Many third-party services have adopted the same paradigm, so it has become a common pattern across many technologies. The Devo Professional Services and Technical Acceleration teams maintain a base collector that leverages this S3 paradigm to collect logs and can be customized for each customer's technology logs stored in S3.
...
Sending data to S3 (this guide uses CloudTrail as the data source service)
Setting up S3 event notifications to SQS
Enabling SQS and S3 access using a cross-account IAM role
Gathering information to be provided to Devo for collector setup
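For context on the S3 event notification step: each SQS message produced by an S3 event notification carries a JSON body listing the objects that were created. The following is a minimal sketch, not the collector's own code, of how such a body can be turned into bucket and key pairs. The function name `objects_from_s3_event` and the sample bucket and key values are hypothetical.

```python
import json
from typing import List, Tuple
from urllib.parse import unquote_plus


def objects_from_s3_event(message_body: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs found in an S3 event notification body."""
    event = json.loads(message_body)
    pairs = []
    # The test event S3 sends when a notification is configured has no "Records" key.
    for record in event.get("Records", []):
        s3_info = record.get("s3", {})
        bucket = s3_info.get("bucket", {}).get("name")
        key = s3_info.get("object", {}).get("key")
        if bucket and key:
            # Object keys are URL-encoded inside the notification.
            pairs.append((bucket, unquote_plus(key)))
    return pairs


# Hypothetical sample body, shaped like an S3 "ObjectCreated" notification.
sample_body = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-cloudtrail-bucket"},
            "object": {"key": "AWSLogs/123456789012/CloudTrail/file%3D1.json.gz"},
        },
    }]
})
print(objects_from_s3_event(sample_body))
```

A body without a `Records` list yields an empty result, so a caller can skip it without downloading anything.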
...
Feature | Details |
---|---|
Allow parallel downloading ( |
|
Running environments |
|
Populated Devo events |
|
Flattening Preprocessing |
|
Data sources
Data source | Description | Collector service name | Devo table | Available from release |
---|---|---|---|---|
Any | Theoretically any source you send to an SQS can be collected |
|
|
|
CONFIG LOGS |
|
|
|
|
AWS ELB |
|
|
|
|
AWS ALB |
|
|
|
|
CISCO UMBRELLA |
|
|
|
|
CLOUDFLARE LOGPUSH |
|
|
|
|
CLOUDFLARE AUDIT |
|
|
|
|
CLOUDTRAIL |
|
|
|
|
CLOUDTRAIL VIA KINESIS FIREHOSE |
|
|
|
|
CLOUDWATCH |
|
|
|
|
CLOUDWATCH VPC |
|
|
|
|
CONTROL TOWER | VPC Flow Logs, CloudTrail, CloudFront, and/or AWS Config logs |
|
|
|
FDR |
|
|
|
|
GUARD DUTY |
|
|
|
|
GUARD DUTY VIA KINESIS FIREHOSE |
|
|
|
|
IMPERVA INCAPSULA |
|
|
|
|
LACEWORK |
|
|
|
|
PALO ALTO |
|
|
|
|
ROUTE 53 |
|
|
|
|
OS LOGS |
|
|
|
|
SENTINEL ONE FUNNEL |
|
|
|
|
S3 ACCESS |
|
|
|
|
VPC LOGS |
|
|
|
|
WAF LOGS |
|
|
|
|
Run the collector
Check memory usage
To check the memory usage of this collector, look for the following log records, which are displayed every 5 minutes by default, always after the memory-freeing process has run.
The used memory is displayed per running process; the sum of both values gives the total memory used by the collector.
The global pressure on the available memory is displayed in the global value.
All metrics (Global, RSS, VMS) include the value before and after freeing memory: previous -> after freeing memory
Code Block |
---|
INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)
INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB) |
Differences between RSS and VMS memory usage:
RSS is the Resident Set Size, which is the actual physical memory the process is using.
VMS is the Virtual Memory Size, which is the virtual memory the process is using.
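The memory records shown above can also be read programmatically. The following is a minimal sketch that parses the two sample `[GC]` lines and sums the RSS values of both processes. The regular expression is written against the sample lines only and assumes values are always reported in MiB; the names `GC_PATTERN` and `parse_gc_line` are hypothetical.

```python
import re

# Pattern derived from the sample log lines; other collector versions may differ.
GC_PATTERN = re.compile(
    r"(?P<process>\w+)::MainThread -> \[GC\] "
    r"global: (?P<global_before>[\d.]+)% -> (?P<global_after>[\d.]+)%, "
    r"process: RSS\((?P<rss_before>[\d.]+)MiB -> (?P<rss_after>[\d.]+)MiB\), "
    r"VMS\((?P<vms_before>[\d.]+)MiB -> (?P<vms_after>[\d.]+)MiB\)"
)


def parse_gc_line(line):
    """Return the metrics of one [GC] record as a dict, or None if it does not match."""
    match = GC_PATTERN.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    process = fields.pop("process")
    parsed = {name: float(value) for name, value in fields.items()}
    parsed["process"] = process
    return parsed


LOG_LINES = [
    "INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, "
    "process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)",
    "INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, "
    "process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB)",
]

records = [parse_gc_line(line) for line in LOG_LINES]
# Total used memory for the collector is the sum over both processes.
total_rss_after = sum(record["rss_after"] for record in records)
print(f"Total RSS after freeing memory: {total_rss_after:.2f} MiB")
```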
Enable/disable the logging debug mode
Sometimes it is necessary to activate the debug mode of the collector's logging. Debug mode increases the verbosity of the log and prints execution traces that are very helpful for resolving incidents or detecting bottlenecks in heavy download processes.
To enable this option, edit the configuration file, change the debug_status parameter from false to true, and restart the collector.
To disable this option, edit the configuration file, change the debug_status parameter from true to false, and restart the collector.
For more information, visit the configuration and parameterization section corresponding to the chosen deployment mode.
Change log
...
Release
...
Released on
...
Release type
...
Details
...
Replace the placeholders with your required values following the description table below:
Parameter | Data type | Type | Value range | Details |
---|---|---|---|---|
debug_status | bool | Mandatory | false / true | If the value is true, the debug logging traces will be enabled when running the collector. If the value is false, only the info, warning and error logging levels will be printed. |
collector_id | | | Minimum length: 1 | Use this param to give a unique id to this collector. |
collector_name | | | Minimum length: 1 | Use this param to give a valid name to this collector. |
devo_address | | | | Use this param to identify the Devo Cloud where the events will be sent. |
chain_filename | | | Minimum length: 4 | Use this param to identify the chain.cert file downloaded from your Devo domain. Usually this file's name is: |
cert_filename | | | Minimum length: 4 | Use this param to identify the |
key_filename | | | Minimum length: 4 | Use this param to identify the |
| | | Minimum length: 1 | |
Use the following command to add the Docker image to the system:
Code Block |
---|
gunzip -c <image_file>-<version>.tgz | docker load |
Note |
---|
Once the Docker image is imported, it will show the real name of the Docker image (including version info). Replace |
The Docker image can be deployed on the following services:
Docker
Execute the following command on the root directory <any_directory>/devo-collectors/<product_name>/
Code Block |
---|
docker run \
--name collector-<product_name> \
--volume $PWD/certs:/devo-collector/certs \
--volume $PWD/config:/devo-collector/config \
--volume $PWD/state:/devo-collector/state \
--env CONFIG_FILE=config.yaml \
--rm \
--interactive \
--tty \
<image_name>:<version> |
Note |
---|
Replace |
Docker Compose
The following Docker Compose file can be used to execute the Docker container. It must be created in the <any_directory>/devo-collectors/<product_name>/
directory.
Parameter | Data type | Type | Value range / Format | Details |
---|---|---|---|---|
debug_status | bool | Mandatory | false / true | If the value is true, the debug logging traces will be enabled when running the collector. If the value is false, only the info, warning and error logging levels will be printed. |
short_unique_id | int | Mandatory | Minimum length: 1 | Use this param to give a unique id to this input service. |
input_status | bool | Mandatory | false / true | Use this param to enable or disable the given input logic when running the collector. If the value is true, the input will be run. If the value is false, it will be ignored. |
base_url | str | Mandatory | | By default, the base url is https://sqs.region.amazonaws.com/account-number/queue-name. This needs to be set to the URL of the SQS queue. |
aws_access_key_id | str | Mandatory/Optional | Any | Only needed if not using cross account. |
aws_secret_access_key | str | Mandatory/Optional | Any | Only needed if not using cross account. |
aws_base_account_role | str | Mandatory/Optional | Any | Only needed if using cross account. This is Devo's cross-account role. |
aws_cross_account_role | str | Mandatory/Optional | Any | Only needed if using cross account. This is your cross-account role. |
aws_external_id | str | Optional | Any | Extra security you can set up. |
ack_messages | bool | Mandatory | false / true | Needs to be set to true to delete messages from the queue. Leave it as false until testing is complete. |
direct_mode | bool | Optional | false / true | Set to false for almost all scenarios. This parameter should be removed if it is not used. |
do_not_send | bool | Optional | false / true | Set to true to not send the log to Devo. This parameter should be removed if it is not used. |
debug_md5 | bool | Optional | false / true | Set to true to send the message MD5 to my.app.sqs.message_body. Only needed for further debugging of duplicates. This parameter should be removed if it is not used. |
sqs_visibility_timeout | int | Mandatory | Min: 120 Max: 43200 (haven't needed to test higher) | Specifies, in seconds, how long a message is held by the collector. If the message is not processed and deleted within this time, it is put back in the queue and can be processed again. Size this timeout with the download in mind, because the collector has to download large files and process them. Otherwise defaults to 120. For CrowdStrike FDR, some messages can take 10-15 minutes to process, so raise the timeout to help reduce duplicates. |
sqs_wait_timeout | int | Mandatory | Min: 20 Max: 20 | Controls long polling: each poll waits up to this number of seconds for a message. If no message is found, it returns: Long poll did not find any messages in queue. All data in the SQS queue has been successfully collected. |
sqs_max_messages | int | Mandatory | Min: 1 Max: 6 | This should now always be set to 1. |
region | str | Mandatory | Example: us-east-1 | This is the region that appears in the base url. |
compressed_events | bool | Mandatory | false / true | Only works with GZIP compression. Should be false unless you see an error such as ‘utf-8' codec can't decode byte 0xa9 in position 36561456: invalid start byte, in which case the events might need to be decompressed. |
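To make the queue-related parameters above more concrete, the following minimal sketch shows how they could map onto a single SQS `ReceiveMessage` call made with boto3. The mapping is an assumption for illustration and is not taken from the collector's source; the helper names `region_from_base_url`, `build_receive_kwargs` and `poll_once` are hypothetical. boto3 is a third-party package and is only imported when `poll_once` is actually called.

```python
from typing import Optional
from urllib.parse import urlparse


def region_from_base_url(base_url: str) -> Optional[str]:
    """Extract the AWS region from an SQS queue URL, or None if it is not recognizable."""
    host = urlparse(base_url).hostname or ""
    parts = host.split(".")
    # Current style: sqs.<region>.amazonaws.com
    if len(parts) >= 4 and parts[0] == "sqs":
        return parts[1]
    # Legacy style: <region>.queue.amazonaws.com
    if len(parts) >= 4 and parts[1] == "queue":
        return parts[0]
    return None


def build_receive_kwargs(cfg: dict) -> dict:
    """Translate collector-style settings into ReceiveMessage arguments (assumed mapping)."""
    return {
        "QueueUrl": cfg["base_url"],
        "MaxNumberOfMessages": cfg.get("sqs_max_messages", 1),
        "WaitTimeSeconds": cfg.get("sqs_wait_timeout", 20),          # long polling
        "VisibilityTimeout": cfg.get("sqs_visibility_timeout", 120),  # hold time in seconds
    }


def poll_once(cfg: dict) -> list:
    """Perform one long poll; requires boto3 and valid AWS credentials."""
    import boto3  # third-party, imported lazily so the helpers above work without it

    sqs = boto3.client("sqs", region_name=cfg["region"])
    response = sqs.receive_message(**build_receive_kwargs(cfg))
    # "Messages" is absent from the response when the long poll finds nothing.
    return response.get("Messages", [])


example_cfg = {
    "base_url": "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue",
    "region": "us-east-1",
    "sqs_visibility_timeout": 240,
    "sqs_wait_timeout": 20,
    "sqs_max_messages": 1,
}
print(build_receive_kwargs(example_cfg))
print(region_from_base_url(example_cfg["base_url"]))
```

Comparing `region_from_base_url(base_url)` with the configured `region` is a cheap way to catch a mismatch between the two settings before the first poll.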
Download the Docker image
The collector should be deployed as a Docker container. Download the Docker image of the collector as a .tgz file by clicking the link in the following table:
Collector Docker image
SHA-256 hash
c653b5e9390abd4bdac59a1f3ccbf3b057ffddf7f8bdbe2ca5eea3bd7d727598
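Before loading the image, the downloaded .tgz file can be checked against the SHA-256 hash listed above. The following is a minimal sketch using only the Python standard library; the helper names and the file path passed to them are hypothetical.

```python
import hashlib

# Hash published in the table above.
EXPECTED_SHA256 = "c653b5e9390abd4bdac59a1f3ccbf3b057ffddf7f8bdbe2ca5eea3bd7d727598"


def sha256_of_file(path, chunk_size=1024 * 1024):
    """Compute the SHA-256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def image_matches(path, expected=EXPECTED_SHA256):
    """Return True when the file's digest equals the expected hash."""
    return sha256_of_file(path) == expected.lower()
```

For example, `image_matches("<image_file>-<version>.tgz")` returns `False` if the download was truncated or altered.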
|
|
|
| This parameter means the way the log files are encoded inside the s3 bucket. Options from most used to least used.
|
Rw tab | ||
---|---|---|
|
This data collector can be run in any machine that has the Docker service available because it should be executed as a docker container. The following sections explain how to prepare all the required setup for having the data collector running.
Structure
The following directory structure should be created for being used when running the collector:
Code Block |
---|
<any_directory>
└── devo-collectors/
    └── <product_name>/
        ├── certs/
        │   ├── chain.crt
        │   ├── <your_domain>.key
        │   └── <your_domain>.crt
        ├── state/
        └── config/
            └── config.yaml |
Code Block |
---|
...
      - ./certs:/devo-collector/certs
      - ./config:/devo-collector/config
      - ./credentials:/devo-collector/credentials
      - ./state:/devo-collector/state
    environment:
      - CONFIG_FILE=${CONFIG_FILE:-config.yaml} |
To run the container using docker-compose, execute the following command from the <any_directory>/devo-collectors/<product_name>/
directory:
Code Block |
---|
IMAGE_VERSION=<version> docker-compose up -d |
Note |
---|
Replace |
Rw tab | ||
---|---|---|
|
We use a piece of software called Collector Server to host and manage all our available collectors.
To enable the collector for a customer:
In the Collector Server GUI, access the domain in which you want this instance to be created
Click Add Collector and find the one you wish to add.
In the Version field, select the latest value.
In the Collector Name field, set the value you prefer (this name must be unique inside the same Collector Server domain).
In the sending method, select Direct Send. Direct Send configuration is optional for collectors that create Table events, but mandatory for those that create Lookups.
In the Parameters section, establish the Collector Parameters as follows:
Editing the JSON configuration
Code Block |
---|
{
  "global_overrides": {
    "debug": false
  },
  "inputs": {
    "sqs_collector": {
      "id": "12351",
      "enabled": true,
      "credentials": {
        "aws_access_key_id": "",
        "aws_secret_access_key": "",
        "aws_base_account_role": "arn:aws:iam::837131528613:role/devo-xaccount-cs-role",
        "aws_cross_account_role": "",
        "aws_external_id": ""
      },
      "ack_messages": false,
      "direct_mode": false,
      "do_not_send": false,
      "compressed_events": false,
      "debug_md5": true,
      "base_url": "https://us-west-1.queue.amazonaws.com/id/name-of-queue",
      "region": "us-west-1",
      "sqs_visibility_timeout": 240,
      "sqs_wait_timeout": 20,
      "sqs_max_messages": 1,
      "services": {
        "custom_service": {
          "file_field_definitions": {},
          "filename_filter_rules": [],
          "encoding": "gzip",
          "send_filtered_out_to_unknown": false,
          "file_format": {
            "type": "line_split_processor",
            "config": {
              "json": true
            }
          },
          "record_field_mapping": {
            "event_simpleName": {
              "keys": [
                "event_simpleName"
              ]
            }
          },
          "routing_template": "edr.crowdstrike.cannon",
          "line_filter_rules": [
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "EndOfProcess"
              }
            ],
            [
              {
                "source": "record",
                "key": "event_simpleName",
                "type": "match",
                "value": "DeliverLocalFXToCloud"
              }
            ]
          ]
        }
      }
    }
  }
} |
Note |
---|
Replace |
Devo credentials
In Devo, go to Administration → Credentials → X.509 Certificates, download the Certificate, Private key and Chain CA and save them in <product_name>/certs/. Learn more about security credentials in Devo here.
Note |
---|
Replace |
Editing the config.yaml file
Code Block |
---|
globals:
  debug: <debug_status>
  id: <collector_id>
  name: <collector_name>
  persistence:
    type: filesystem
    config:
      directory_name: state
  multiprocessing: false
  queue_max_size_in_mb: 1024
  queue_max_size_in_messages: 1000
  queue_max_elapsed_time_in_sec: 60
  queue_wrap_max_size_in_messages: 100
outputs:
  devo_1:
    type: devo_platform
    config:
      address: <devo_address>
      port: 443
      type: SSL
      chain: <chain_filename>
      cert: <cert_filename>
      key: <key_filename>
inputs:
  sqs:
    id: 12345
    enabled: true
    credentials:
      aws_access_key_id: password
      aws_secret_access_key: secret-access-key
      aws_base_account_role: arn:aws:iam::837131528613:role/devo-xaccount-cs-role
      aws_cross_account_role: arn:aws:iam::{account-id}:role/{role-name}
      aws_external_id: extra_security_optional
    region: region
    base_url: https://sqs.{region}.amazonaws.com/{account-number}/{queue-name}
    sqs_visibility_timeout: 120
    sqs_wait_timeout: 20
    sqs_max_messages: 4
    ack_messages: false
    direct_mode: false
    do_not_send: false
    compressed_events: false
    services:
      custom_service:
        file_field_definitions: {}
        filename_filter_rules: []
        encoding: gzip
        ack_messages: false
        file_format:
          type: single_json_object_processor
          config:
            key: Records
        record_field_mapping: {}
        routing_template: my.app.source1.type1 |
Info |
---|
All defined service entities will be executed by the collector. If you do not want to run any of them, just remove the entity from the |
Note |
---|
Please replace the placeholders with real world values following the description table below |
Parameter | Data type | Type | Value range / Format | Details |
---|---|---|---|---|
debug_status | bool | Mandatory | false / true | If the value is true, the debug logging traces will be enabled when running the collector. If the value is false, only the info, warning and error logging levels will be printed. |
short_unique_id | int | Mandatory | Minimum length: 1 Maximum length: 5 | Use this param to give a unique id to this input service. Note: This parameter is used to build the persistence address. Do not use the same value for multiple collectors, as it could cause a collision. |
enabled | bool | Mandatory | false / true | Use this param to enable or disable the given input logic when running the collector. If the value is true, the input will be run. If the value is false, it will be ignored. |
base_url | str | Mandatory | | By default, the base url is https://sqs.region.amazonaws.com/account-number/queue-name. This needs to be set to the URL of the SQS queue. |
aws_access_key_id | str | Mandatory/Optional | Any | Only needed if not using cross account. |
aws_secret_access_key | str | Mandatory/Optional | Any | Only needed if not using cross account. |
aws_base_account_role | str | Mandatory/Optional | Any | Only needed if using cross account. This is Devo's cross-account role. |
aws_cross_account_role | str | Mandatory/Optional | Any | Only needed if using cross account. This is your cross-account role. |
aws_external_id | str | Optional | Any | Extra security you can set up. |
ack_messages | bool | Mandatory | false / true | Needs to be set to true to delete messages from the queue. Leave it as false until testing is complete. |
direct_mode | bool | Optional | false / true | Set to false for almost all scenarios. This parameter should be removed if it is not used. |
do_not_send | bool | Optional | false / true | Set to true to not send the log to Devo. This parameter should be removed if it is not used. |
debug_md5 | bool | Optional | false / true | Set to true to send the message MD5 to my.app.sqs.message_body. Only needed for further debugging of duplicates. This parameter should be removed if it is not used. |
sqs_visibility_timeout | int | Mandatory | Min: 120 Max: 43200 (haven't needed to test higher) | Sets the timeout between the queue and the collector. Size it with the download in mind, because the collector has to download large files and process them. If a message is not processed and deleted within this time, it is put back in the queue and can be processed again. Otherwise defaults to 120. |
sqs_wait_timeout | int | Mandatory | Min: 20 Max: 20 | The minimum value has handled most customer scenarios so far. |
sqs_max_messages | int | Mandatory | Min: 1 Max: 6 | This should now always be set to 1. |
region | str | Mandatory | Example: us-east-1 | This is the region that appears in the base url. |
compressed_events | bool | Mandatory | false / true | Only works with GZIP compression. Should be false unless you see an error such as ‘utf-8' codec can't decode byte 0xa9 in position 36561456: invalid start byte, in which case the events might need to be decompressed. |
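The decode error mentioned for compressed_events is what typically appears when gzip-compressed bytes are decoded as UTF-8 text. The following minimal sketch, which is an illustration and not the collector's implementation, checks for the gzip magic bytes and decompresses before decoding. The function name `decode_event_bytes` is hypothetical.

```python
import gzip

# Every gzip stream starts with these two bytes.
GZIP_MAGIC = b"\x1f\x8b"


def decode_event_bytes(raw: bytes) -> str:
    """Decode event bytes as UTF-8, decompressing first when they are gzip data."""
    if raw[:2] == GZIP_MAGIC:
        raw = gzip.decompress(raw)
    return raw.decode("utf-8")


plain = b'{"event": "example"}'
compressed = gzip.compress(plain)

print(decode_event_bytes(plain))
print(decode_event_bytes(compressed))
```

Decoding `compressed` directly with `compressed.decode("utf-8")` raises `UnicodeDecodeError`, which mirrors the symptom described above.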
Verify data collection
Once the collector has been launched, it is important to check that ingestion is working properly. To do so, go to the collector's logs console.
This service has the following components:
Component | Description |
---|---|
Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
Puller | The puller module is in charge of pulling the data in an organized way and delivering the events via SDK. |
Setup output
A successful run has the following output messages for the setup module:
Code Block |
---|
2024-01-16T12:47:04.044 INFO OutputProcess::MainThread -> Process started
2024-01-16T12:47:04.044 INFO InputProcess::MainThread -> Process Started
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> InputThread(sqs_collector,12345) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> ServiceThread(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPullerSetup(unknown,sqs_collector#12345,aws_sqs_vpc#predefined) -> Starting thread
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread
2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSender(standard_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(standard_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManager(standard_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSender(lookup_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::ConsoleSenderManager(standard_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(standard_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::OutputStandardConsumer(standard_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputStandardConsumer(standard_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(lookup_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManager(lookup_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSender(internal_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::ConsoleSenderManager(lookup_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(lookup_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(internal_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManager(internal_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::OutputLookupConsumer(lookup_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputLookupConsumer(lookup_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds |
Puller output
A successful initial run has the following output messages for the puller module:
Note that the PrePull action is executed only one time before the first run of the Pull action.
Code Block |
---|
2024-01-16T17:02:56.221036303Z 2024-01-16T17:02:56.220 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Acked message receiptHandle: /+qA+ymL2Vs8yb//++7YM2Ef8BCetrJ+/+////F1uwLOVfONfagI99vA=
2024-01-16T17:02:56.221386926Z 2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one |
Restart the persistence
This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:
Delete the collector and recreate it with a new ID number.
The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.
Note |
---|
Note that this action clears the persistence and cannot be recovered in any way. Resetting persistence could result in duplicate or lost events. |
Collector operations
This section is intended to explain how to proceed with specific operations of this collector.
Verify collector operations
The initialization module is in charge of setting up and running the input (pulling logic) and output (delivering logic) services and validating the given configuration.
Events delivery and Devo ingestion
The event delivery module is in charge of receiving the events from the internal queues where all events are injected by the pullers and delivering them using the selected compatible delivery method.
A successful run has the following output messages for the initializer module:
Code Block |
---|
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Sender: SyslogSender(standard_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Standard - Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds)
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Sender: SyslogSender(internal_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Internal - Total number of messages sent: 1, messages sent since "2022-06-28 10:39:22.516313+00:00": 1 (elapsed 0.019 seconds) |
Sender services
The Integrations Factory Collector SDK has 3 different sender services depending on the event type to deliver (internal, standard, and lookup). This collector uses the following Sender Services:
Sender Services | Description |
---|---|
internal_senders | In charge of delivering internal metrics to Devo such as logging traces or metrics. |
standard_senders | In charge of delivering pulled events to Devo. |
Sender statistics
Each service displays its own performance statistics that allow checking how many events have been delivered to Devo by type:
Logging trace | Description |
---|---|
Number of available senders: 1 | Displays the number of concurrent senders available for the given Sender Service. |
sender manager internal queue size: 0 | Displays the items available in the internal sender queue. This value helps detect bottlenecks and the need to increase the performance of data delivery to Devo, which can be done by increasing the number of concurrent senders. |
Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 21 (elapsed 0.007 seconds) | Displays the number of events sent since the last checkpoint. Following the given example, the following conclusions can be obtained: 44 events were sent to Devo since the collector started. The last checkpoint timestamp was 2022-06-28 10:39:22.511671+00:00. 21 events were sent to Devo between the last UTC checkpoint and now. Those 21 events required 0.007 seconds to be delivered. |
By default these traces will be shown every 10 minutes.
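The sender statistics trace can be broken into its parts with a regular expression. The following is a minimal sketch written against the example trace above; the pattern and the names `STATS_PATTERN` and `parse_sender_stats` are hypothetical and may need adjusting for other collector versions.

```python
import re
from datetime import datetime

# Pattern derived from the example trace only.
STATS_PATTERN = re.compile(
    r'Total number of messages sent: (?P<total>\d+), '
    r'messages sent since "(?P<since>[^"]+)": (?P<recent>\d+) '
    r'\(elapsed (?P<elapsed>[\d.]+) seconds\)'
)


def parse_sender_stats(line):
    """Return the counters of a sender statistics trace, or None if it does not match."""
    match = STATS_PATTERN.search(line)
    if match is None:
        return None
    return {
        "total": int(match["total"]),                      # events since the collector started
        "since": datetime.fromisoformat(match["since"]),   # last checkpoint timestamp
        "recent": int(match["recent"]),                    # events since the last checkpoint
        "elapsed_seconds": float(match["elapsed"]),        # delivery time for those events
    }


trace = (
    'Total number of messages sent: 44, messages sent since '
    '"2022-06-28 10:39:22.511671+00:00": 21 (elapsed 0.007 seconds)'
)
stats = parse_sender_stats(trace)
print(stats["total"], stats["recent"], stats["elapsed_seconds"])
```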
Replace the placeholders with your required values following the description table below:
Download the Docker imageThe collector should be deployed as a Docker container. Download the Docker image of the collector as a .tgz file by clicking the link in the following table:
Use the following command to add the Docker image to the system:
The Docker image can be deployed on the following services: DockerExecute the following command on the root directory
Docker ComposeThe following Docker Compose file can be used to execute the Docker container. It must be created in the
To run the container using docker-compose, execute the following command from the
|
Verify data collection
Once the collector has been launched, it is important to check if the ingestion is performed in a proper way. To do so, go to the collector’s logs console.
This service has the following components:
Component | Description |
---|---|
Setup | The setup module is in charge of authenticating the service and managing the token expiration when needed. |
Puller | The setup module is in charge of pulling the data in a organized way and delivering the events via SDK. |
Setup output
A successful run has the following output messages for the setup module:
Code Block |
---|
2024-01-16T12:47:04.044 INFO OutputProcess::MainThread -> Process started
2024-01-16T12:47:04.044 INFO InputProcess::MainThread -> Process Started
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> InputThread(sqs_collector,12345) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> ServiceThread(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread (execution_period=60s)
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPullerSetup(unknown,sqs_collector#12345,aws_sqs_vpc#predefined) -> Starting thread
2024-01-16T12:47:04.177 INFO InputProcess::MainThread -> AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) - Starting thread
2024-01-16T12:47:04.178 WARNING InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Waiting until setup will be executed
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSender(standard_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(standard_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.191 INFO OutputProcess::MainThread -> ConsoleSenderManager(standard_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSender(lookup_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.192 INFO OutputProcess::ConsoleSenderManager(standard_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(standard_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::OutputStandardConsumer(standard_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputStandardConsumer(standard_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(lookup_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.192 INFO OutputProcess::MainThread -> ConsoleSenderManager(lookup_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSender(internal_senders,console_sender_0) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::ConsoleSenderManager(lookup_senders,manager,console_1) -> [EMERGENCY PERSISTENCE SYSTEM] ConsoleSenderManager(lookup_senders,manager,console_1) -> Nothing retrieved from the persistence.
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManagerMonitor(internal_senders,console_1) -> Starting thread (every 300 seconds)
2024-01-16T12:47:04.193 INFO OutputProcess::MainThread -> ConsoleSenderManager(internal_senders,manager,console_1) -> Starting thread
2024-01-16T12:47:04.193 INFO OutputProcess::OutputLookupConsumer(lookup_senders_consumer_0) -> [EMERGENCY PERSISTENCE SYSTEM] OutputLookupConsumer(lookup_senders_consumer_0) -> Nothing retrieved from the persistence.
2024-01-16T12:47:05.795 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_vpc,predefined) -> Starting data collection every 5 seconds |
Puller output
A successful initial run has the following output messages for the puller module:
Note that the PrePull
action is executed only one time before the first run of the Pull
action.
Code Block |
---|
I2024-01-16T17:02:56.221036303Z 2024-01-16T17:02:56.220 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Acked message receiptHandle: /+qA+ymL2Vs8yb//++7YM2Ef8BCetrJ+/+////F1uwLOVfONfagI99vA=
2024-01-16T17:02:56.221386926Z 2024-01-16T17:02:56.221 INFO InputProcess::AWSsqsPuller(sqs_collector,12345,aws_sqs_cloudwatch_vpc,predefined) -> Data collection completed. Elapsed time: 2.413 seconds. Waiting for 2.587 second(s) until the next one |
Restart the persistence
This collector uses persistent storage to download events in an orderly fashion and avoid duplicates. In case you want to re-ingest historical data or recreate the persistence, you can restart the persistence of this collector by following these steps:
Delete and Re-DO the collector with new ID number
The collector will detect this change and will restart the persistence using the parameters of the configuration file or the default configuration in case it has not been provided.
Note |
---|
Note that this action clears the persistence and cannot be recovered in any way. Resetting persistence could result in duplicate or lost events. |
Collector operations
This section is intended to explain how to proceed with specific operations of this collector.
Verify collector operations
The initialization module is in charge of setup and running the input (pulling logic) and output (delivering logic) services and validating the given configuration.
Events delivery and Devo ingestion
The event delivery module is in charge of receiving the events from the internal queues where all events are injected by the pullers and delivering them using the selected compatible delivery method.
A successful run has the following output messages for the initializer module:
Code Block |
---|
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Sender: SyslogSender(standard_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> Standard - Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds)
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Number of available senders: 1, sender manager internal queue size: 0
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> enqueued_elapsed_times_in_seconds_stats: {}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Sender: SyslogSender(internal_senders,syslog_sender_0), status: {"internal_queue_size": 0, "is_connection_open": True}
INFO OutputProcess::SyslogSenderManagerMonitor(internal_senders,sidecar_0) -> Internal - Total number of messages sent: 1, messages sent since "2022-06-28 10:39:22.516313+00:00": 1 (elapsed 0.019 seconds) |
Sender services
The Integrations Factory Collector SDK has 3 different sender services, depending on the type of event to deliver (internal, standard, and lookup). This collector uses the following Sender Services:
Sender Services | Description |
---|---|
internal_senders | In charge of delivering internal metrics to Devo such as logging traces or metrics. |
standard_senders | In charge of delivering pulled events to Devo. |
Sender statistics
Each service displays its own performance statistics that allow checking how many events have been delivered to Devo by type:
Logging trace | Description |
---|---|
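Number of available senders: 1 | Displays the number of concurrent senders available for the given Sender Service. |
sender manager internal queue size: 0 | Displays the items available in the internal sender queue. This value helps detect bottlenecks and the need to increase the performance of data delivery to Devo, which can be done by increasing the number of concurrent senders. |
Total number of messages sent: 44, messages sent since "2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds) | Displays the number of events sent in total and since the given timestamp. In the given example, 44 messages have been sent in total, 44 of them since "2022-06-28 10:39:22.511671+00:00", with an elapsed time of 0.007 seconds.
By default these traces will be shown every 10 minutes. |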
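The sender traces can also be read programmatically, for example to watch the internal queue size over time. The following is a minimal sketch, assuming the trace formats shown in the example output above. The function name `parse_sender_stats` and the keys of the returned dictionary are illustrative and not part of the collector.

```python
import re

# Patterns written against the example traces in this section.
# Assumption: other collector versions may format these lines differently.
SENDERS_RE = re.compile(
    r"Number of available senders: (?P<senders>\d+), "
    r"sender manager internal queue size: (?P<queue>\d+)"
)
TOTAL_RE = re.compile(
    r"Total number of messages sent: (?P<total>\d+), "
    r'messages sent since "(?P<since>[^"]+)": (?P<recent>\d+) '
    r"\(elapsed (?P<elapsed>[\d.]+) seconds\)"
)


def parse_sender_stats(line):
    """Return the statistics found in one trace line, or None if nothing matches."""
    m = SENDERS_RE.search(line)
    if m:
        return {"senders": int(m["senders"]), "queue_size": int(m["queue"])}
    m = TOTAL_RE.search(line)
    if m:
        return {
            "total_sent": int(m["total"]),
            "since": m["since"],
            "sent_since": int(m["recent"]),
            "elapsed_seconds": float(m["elapsed"]),
        }
    return None


# Example trace copied from the output shown earlier in this section.
line = (
    "INFO OutputProcess::SyslogSenderManagerMonitor(standard_senders,sidecar_0) -> "
    "Standard - Total number of messages sent: 44, messages sent since "
    '"2022-06-28 10:39:22.511671+00:00": 44 (elapsed 0.007 seconds)'
)
stats = parse_sender_stats(line)
print(stats)
```

A `queue_size` that keeps growing between traces would point to the delivery bottleneck described in the table.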
Check memory usage
To check the memory usage of this collector, look for the following log records, which the collector displays every 5 minutes by default, always after running the memory free process.
The used memory is displayed per running process, and the sum of both values gives the total used memory for the collector.
The global pressure of the available memory is displayed in the global value.
All metrics (Global, RSS, VMS) include the value before freeing and after: previous -> after freeing memory
Code Block |
---|
INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)
INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB) |
Differences between RSS and VMS memory usage:
RSS is the Resident Set Size, which is the actual physical memory the process is using.
VMS is the Virtual Memory Size, which is the virtual memory that the process is using.
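The total used memory described above can be computed from the [GC] traces. The following is a minimal sketch, assuming the trace format shown in the example and that sizes are always reported in MiB. The function name `total_rss_after_mib` is illustrative and not part of the collector.

```python
import re

# Pattern written against the example [GC] traces in this section.
# Assumption: sizes are always reported in MiB, as in the example.
GC_RE = re.compile(
    r"(?P<process>\w+)::MainThread -> \[GC\] "
    r"global: (?P<g_before>[\d.]+)% -> (?P<g_after>[\d.]+)%, "
    r"process: RSS\((?P<rss_before>[\d.]+)MiB -> (?P<rss_after>[\d.]+)MiB\), "
    r"VMS\((?P<vms_before>[\d.]+)MiB -> (?P<vms_after>[\d.]+)MiB\)"
)


def total_rss_after_mib(lines):
    """Sum the RSS after freeing, using the latest trace seen for each process."""
    latest = {}
    for line in lines:
        m = GC_RE.search(line)
        if m:
            # Later traces for the same process overwrite earlier ones.
            latest[m["process"]] = float(m["rss_after"])
    return sum(latest.values())


# Example traces copied from the output shown earlier in this section.
lines = [
    "INFO InputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, "
    "process: RSS(34.50MiB -> 34.08MiB), VMS(410.52MiB -> 410.02MiB)",
    "INFO OutputProcess::MainThread -> [GC] global: 20.4% -> 20.4%, "
    "process: RSS(28.41MiB -> 28.41MiB), VMS(705.28MiB -> 705.28MiB)",
]
total = total_rss_after_mib(lines)
print(f"{total:.2f} MiB")
```

For the two example traces, the RSS values after freeing are 34.08 MiB (InputProcess) and 28.41 MiB (OutputProcess), so the collector is using about 62.49 MiB of physical memory.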
Enable/disable the logging debug mode
Sometimes it is necessary to activate the debug mode of the collector's logging. This debug mode increases the verbosity of the log and allows you to print execution traces that are very helpful in resolving incidents or detecting bottlenecks in heavy download processes.
To enable this option you just need to edit the configuration file and change the debug_status parameter from false to true and restart the collector.
To disable this option, you just need to update the configuration file and change the debug_status parameter from true to false and restart the collector.
For more information, visit the configuration and parameterization section corresponding to the chosen deployment mode.
Change log
Release | Released on | Release type | Details | Recommendations |
---|---|---|---|---|
| | Bug Fixes, Features | | |
| | Features, Bug Fixes | | |
| | Bug Fixes | | |
| | Bug Fixes | | |
| | Improvements | | |
| | Improvements, Bug fixes | | |
| | Bug fixes | Fixed dependency issue | |
| | Feature, Improvements | | |
| | Features, Bug fixes, Improvements | | |
| | Bug fixing | | |
| | Bug fixing | | |
| | Features | | |
| | Features | | |
| | Features | | |
| | Features | | recommended |
| | | | upgrade |
| | Bug fixes, Improvements, Features | | upgrade |
| | Bug fixes, Improvements | | upgrade |
| | | Released with DCSDK 1.10.2 | initial |