# Forward response to Kafka
## Configuration

Using the `destination` object in a query request, you can have query responses forwarded to your organization's data storage services, including Apache Kafka systems.
Kafka is used with a transactional configuration, and events are committed based on two parameters:

- the size of the events sent
- the time elapsed since the last commit

If an error occurs, the task is restarted from the last committed point, and non-committed events are resent. Therefore, to avoid reading the same event twice, read from Kafka with a transactional consumer (isolation level `read_committed`).
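The two commit triggers described above can be sketched as follows. This is only an illustrative model of the policy, not Devo's implementation; the thresholds mirror the `maxBytesPerCommit` and `maxTimeWithoutCommit` parameters documented below, with their default values.

```python
# Illustrative sketch of the commit policy: a commit fires when either the
# accumulated bytes reach maxBytesPerCommit, or maxTimeWithoutCommit
# milliseconds have elapsed since the last commit. Not Devo's actual code.

MAX_BYTES_PER_COMMIT = 1 * 1024 * 1024   # default: 1 MB
MAX_TIME_WITHOUT_COMMIT_MS = 10_000      # default: 10 seconds


def should_commit(bytes_since_commit: int,
                  last_commit_ms: float,
                  now_ms: float,
                  max_bytes: int = MAX_BYTES_PER_COMMIT,
                  max_time_ms: int = MAX_TIME_WITHOUT_COMMIT_MS) -> bool:
    """Return True when either commit trigger fires."""
    return (bytes_since_commit >= max_bytes
            or now_ms - last_commit_ms >= max_time_ms)
```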
To forward a query's response to Kafka, include the `destination` object in the query request and set `type` to `kafka`, like this:

```json
"destination": { "type": "kafka" }
```
## Parameters

With `destination.type` set to `kafka`, you need to specify some additional parameters:
Parameter | Type | Required | Description
---|---|---|---
`topic` | string | Yes | Name of the Kafka topic to which you want to save the data contained in the query response.
`bootstrap.servers` | string | Yes | Comma-separated list of host/port pairs to be used as addresses of the Kafka brokers. The format is `host1:port1,host2:port2`.
`maxBytesPerCommit` | string | Optional | Maximum bytes per commit. The default value is 1 MB and the maximum value is 10 MB.
`maxTimeWithoutCommit` | string | Optional | Maximum time between commits, in milliseconds. The default value is 10 seconds and the minimum is 1 second.
`mode` | string | Optional | Writing mode for the events: `raw` or `json`.
## JSON mode

If you specify `json` in the `mode` parameter, one JSON object per event is sent to Kafka, with the selected field names as keys.
### Request example

```json
{
  "query": "from siem.logtrust.malote.query select user, level, clusterid",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "maxBytesPerCommit": 5242880,
      "maxTimeWithoutCommit": 5000,
      "mode": "json"
    }
  }
}
```
### Results in Kafka

```
{"user":"paul@devo.com", "level":"notice", "clusterid": 1}
{"user":"john@devo.com", "level":"notice", "clusterid": 3}
{"user":"jorge@devo.com", "level":"notice", "clusterid": 2}
```
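Because JSON mode writes one standalone JSON object per Kafka record, a consumer can decode each record value independently. A minimal sketch of the decoding step (the consumer wiring is omitted; the sample values are taken from the results shown above):

```python
import json

# Each record value in JSON mode is a self-contained JSON object whose keys
# are the fields selected in the query (user, level, clusterid here).
raw_values = [
    b'{"user":"paul@devo.com", "level":"notice", "clusterid": 1}',
    b'{"user":"john@devo.com", "level":"notice", "clusterid": 3}',
]

events = [json.loads(value) for value in raw_values]
users = [event["user"] for event in events]
```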
## Raw mode

If you specify `raw` in the `mode` parameter, the API accepts only a single field (or an expression with an alias) in the query, and its value is sent directly to Kafka.
### Request example 1

In this example, we send the events as JSON, using a `json()` expression with an alias in the select clause:

```
select json({"user": user, "level": level, "clusterid": clusterid}) as event
```

```json
{
  "query": "from siem.logtrust.malote.query select json({\"user\": user, \"level\": level, \"clusterid\": clusterid}) as event",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "mode": "raw"
    }
  }
}
```
### Results in Kafka

```
{"user":"paul@devo.com", "level":"notice", "clusterid": "1"}
{"user":"john@devo.com", "level":"notice", "clusterid": "3"}
{"user":"jorge@devo.com", "level":"notice", "clusterid": "2"}
```
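The `json({...})` expression contains literal double quotes, which must be escaped inside the JSON request body. Building the body with a JSON serializer handles this automatically; a sketch in Python (the query text mirrors the example above):

```python
import json

# The query string contains literal double quotes; serializing the request
# body with json.dumps escapes them (they appear as \" on the wire), so no
# manual escaping of the query text is needed.
query = ('from siem.logtrust.malote.query '
         'select json({"user": user, "level": level, "clusterid": clusterid}) as event')

body = json.dumps({
    "query": query,
    "from": "1d",
    "to": "now",
    "destination": {
        "type": "kafka",
        "params": {
            "topic": "some.topic",
            "bootstrap.servers": "localhost:9092",
            "mode": "raw",
        },
    },
})
```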
### Request example 2

In this example, we send a single field:

```json
{
  "query": "from siem.logtrust.malote.query select user",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "mode": "raw"
    }
  }
}
```
### Results in Kafka

```
paul@devo.com
john@devo.com
jorge@devo.com
```
## Errors

Error | Message
---|---
250 | Kafka params should include topic and bootstrap.servers
251 | Max size per commit cannot exceed 10MB
252 | Max time without commit cannot be less than 1 second
253 | Kafka mode should be raw or json
For more information about Kafka, visit the official Kafka website.