
Forward response to Kafka

[ 1 Configuration ] [ 2 JSON mode ] [ 3 Raw mode ] [ 4 Errors ]

Configuration

Using the destination object in a query request, you can have query responses forwarded to your organization's data storage services, including Apache Kafka systems. 

Kafka is used with a transactional configuration, and events are committed based on two parameters:

  • size of events sent

  • time since the last commit
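The two commit triggers can be pictured with a short sketch. This is only an illustration, not the service's actual implementation: the class name `CommitPolicy` is hypothetical, it assumes a commit happens as soon as either threshold is reached, and it assumes the documented 1MB default means 1 MiB.

```python
from dataclasses import dataclass

# Defaults taken from the parameter descriptions on this page.
DEFAULT_MAX_BYTES = 1 * 1024 * 1024  # assumption: "1MB" is read as 1 MiB
DEFAULT_MAX_TIME_MS = 10_000         # 10 seconds, in milliseconds


@dataclass
class CommitPolicy:
    """Hypothetical model of the size-or-time commit rule."""

    max_bytes_per_commit: int = DEFAULT_MAX_BYTES
    max_time_without_commit_ms: int = DEFAULT_MAX_TIME_MS

    def should_commit(self, pending_bytes: int, ms_since_last_commit: int) -> bool:
        # Assumption: reaching either threshold is enough to trigger a commit.
        return (
            pending_bytes >= self.max_bytes_per_commit
            or ms_since_last_commit >= self.max_time_without_commit_ms
        )


policy = CommitPolicy(max_bytes_per_commit=5242880, max_time_without_commit_ms=5000)
print(policy.should_commit(pending_bytes=100, ms_since_last_commit=4999))
print(policy.should_commit(pending_bytes=100, ms_since_last_commit=5000))
```

With a lower time threshold, fewer events are left uncommitted (and therefore resent) if the task restarts, at the cost of more frequent commits.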

In case of error, the task is restarted from the last committed point, and any events that were sent but not yet committed are resent. To avoid reading the same event twice, read from Kafka with a transactional consumer, that is, one that only reads committed messages.
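In Kafka, a consumer only sees committed transactional messages when its `isolation.level` property is set to `read_committed`. The sketch below builds such a configuration as a plain dictionary; the helper name and the `group_id` value are hypothetical, and the dictionary is only assumed to be handed to a Kafka client that accepts standard consumer property names.

```python
def read_committed_consumer_config(bootstrap_servers: str, group_id: str) -> dict:
    """Build consumer properties that skip uncommitted (aborted or in-flight) events."""
    return {
        # Same host:port list used in the destination parameters.
        "bootstrap.servers": bootstrap_servers,
        # Hypothetical consumer group name chosen by the caller.
        "group.id": group_id,
        # Standard Kafka consumer property: only return committed messages.
        "isolation.level": "read_committed",
    }


config = read_committed_consumer_config("localhost:9092", "query-results-reader")
print(config["isolation.level"])
```

A client such as `confluent_kafka.Consumer` takes a dictionary of this shape as its configuration argument.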

To forward a query's response to Kafka, include the destination object in the query request and set type to kafka like this:

"destination": { "type":"kafka" }

Parameters

With destination.type set to kafka, specify the following additional parameters in destination.params:

| Parameter | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Name of the Kafka topic to which the data contained in the query response is saved. |
| bootstrap.servers | string | Yes | Comma-separated list of host/port pairs to be used as addresses of the Kafka brokers. The format is hostname:port. |
| maxBytesPerCommit | string | Optional | Maximum bytes per commit. The default value is 1MB and the maximum value is 10MB. |
| maxTimeWithoutCommit | string | Optional | Maximum time between commits, in milliseconds. The default value is 10 seconds (10000 ms) and the minimum is 1 second (1000 ms). |
| mode | string | Optional | Output mode: json or raw. See the JSON mode and Raw mode sections. |
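As a minimal sketch of how these parameters fit together, the helper below assembles a destination object, nesting the parameters under `params` as the request example on this page does. The function name and its keyword arguments are hypothetical; optional parameters are left out when not given so that the service defaults apply.

```python
import json
from typing import Optional


def build_kafka_destination(
    topic: str,
    bootstrap_servers: str,
    max_bytes_per_commit: Optional[int] = None,
    max_time_without_commit: Optional[int] = None,
    mode: Optional[str] = None,
) -> dict:
    """Assemble the destination object for a Kafka-forwarded query."""
    # The two required parameters are always present.
    params = {"topic": topic, "bootstrap.servers": bootstrap_servers}
    # Optional parameters are added only when the caller sets them.
    if max_bytes_per_commit is not None:
        params["maxBytesPerCommit"] = max_bytes_per_commit
    if max_time_without_commit is not None:
        params["maxTimeWithoutCommit"] = max_time_without_commit
    if mode is not None:
        params["mode"] = mode
    return {"type": "kafka", "params": params}


destination = build_kafka_destination(
    "some.topic", "localhost:9092", max_time_without_commit=5000, mode="json"
)
print(json.dumps(destination, indent=2))
```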

JSON mode

If you specify json in the mode parameter, one JSON object per event is sent to Kafka, with the field names as keys.

Request example

{
  "query": "from siem.logtrust.malote.query select user, level, clusterid",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "maxBytesPerCommit": 5242880,
      "maxTimeWithoutCommit": 5000,
      "mode": "json"
    }
  }
}

Results in Kafka

{"user":"paul@devo.com", "level":"notice", "clusterid": 1}
{"user":"john@devo.com", "level":"notice", "clusterid": 3}
{"user":"jorge@devo.com", "level":"notice", "clusterid": 2}
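On the consuming side, each Kafka message in JSON mode can be decoded into a dictionary. The sketch below assumes the message value arrives as UTF-8 encoded bytes, which is not stated on this page; the helper name `decode_event` is hypothetical, and the sample values are the results shown above.

```python
import json


def decode_event(value: bytes) -> dict:
    """Decode one Kafka message value produced in JSON mode."""
    # Assumption: the message value is UTF-8 encoded JSON text.
    return json.loads(value.decode("utf-8"))


# Sample message values copied from the results above.
messages = [
    b'{"user":"paul@devo.com", "level":"notice", "clusterid": 1}',
    b'{"user":"john@devo.com", "level":"notice", "clusterid": 3}',
]

for raw in messages:
    event = decode_event(raw)
    print(event["user"], event["level"], event["clusterid"])
```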

Raw mode

If you specify raw in the mode parameter, the query must select a single field (or a single expression with an alias), and its value is sent directly to Kafka.

Request example 1

In this example, we are sending the events as JSON:

select json({\"user\": user, \"level\": level, \"clusterid\": clusterid}) as myalias
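The backslashes in the select clause above come from embedding the query in a JSON request body, where inner double quotes must be escaped. The sketch below lets the JSON serializer do that escaping. The table name, time range, topic and broker address are assumptions borrowed from the JSON mode example, and the helper name is hypothetical.

```python
import json

# Query written with plain quotes; the table name is reused from the JSON mode example.
QUERY = (
    "from siem.logtrust.malote.query "
    'select json({"user": user, "level": level, "clusterid": clusterid}) as myalias'
)


def build_raw_request(query: str, topic: str, bootstrap_servers: str) -> dict:
    """Assemble a request body that forwards a single aliased expression in raw mode."""
    return {
        "query": query,
        "from": "1d",
        "to": "now",
        "destination": {
            "type": "kafka",
            "params": {
                "topic": topic,
                "bootstrap.servers": bootstrap_servers,
                "mode": "raw",
            },
        },
    }


# json.dumps escapes the inner double quotes as \" automatically.
body = json.dumps(build_raw_request(QUERY, "some.topic", "localhost:9092"))
print(body)
```

Writing the query with plain quotes and serializing the whole body avoids hand-escaping mistakes in the request.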

Results in Kafka

Request example 2

In this example, we are sending a single field.

Results in Kafka

Errors

| Error | Message |
|---|---|
| 250 | Kafka params should include topic and bootstrap.servers |
| 251 | Max size per commit cannot exceed 10MB |
| 252 | Max time without commit cannot be less than 1 second |
| 253 | Kafka mode should be raw or json |
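These checks can also be run on the client before a request is sent. The following is a hedged sketch rather than the service's validation logic: the function name is hypothetical, the order in which the checks run is an assumption, and 10MB is assumed to mean 10 MiB.

```python
from typing import Optional

MAX_BYTES_LIMIT = 10 * 1024 * 1024  # assumption: "10MB" is read as 10 MiB
MIN_TIME_MS = 1000                  # 1 second, in milliseconds
VALID_MODES = ("json", "raw")


def validate_kafka_params(params: dict) -> Optional[int]:
    """Return the documented error code the params would trigger, or None if they pass."""
    # 250: both required parameters must be present and non-empty.
    if not params.get("topic") or not params.get("bootstrap.servers"):
        return 250
    # 251: commit size must not exceed the documented maximum.
    if "maxBytesPerCommit" in params and int(params["maxBytesPerCommit"]) > MAX_BYTES_LIMIT:
        return 251
    # 252: commit interval must not be below the documented minimum.
    if "maxTimeWithoutCommit" in params and int(params["maxTimeWithoutCommit"]) < MIN_TIME_MS:
        return 252
    # 253: mode, when given, must be one of the two supported values.
    if "mode" in params and params["mode"] not in VALID_MODES:
        return 253
    return None


print(validate_kafka_params({"topic": "some.topic"}))
print(validate_kafka_params({"topic": "some.topic", "bootstrap.servers": "localhost:9092"}))
```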

More about Kafka

For more information about Kafka, visit the official Kafka website.