Forward response to Kafka

Configuration

Using the destination object in a query request, you can have query responses forwarded to your organization's data storage services, including Apache Kafka systems. 

Kafka is used with a transactional configuration, and events are committed based on two parameters:

  • the amount of data sent since the last commit
  • the time elapsed since the last commit

If an error occurs, the task is restarted from the last committed point, and any non-committed events are resent. To avoid reading the same event twice in that case, consume the topic with a transactional (read-committed) consumer, as in the sketch below.
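
For reference, a minimal sketch of such a consumer, assuming the kafka-python client and using the topic, broker, and consumer group names from the examples on this page as placeholders:

# Minimal sketch of a read-committed ("transactional") consumer, assuming the
# kafka-python client. Topic, broker, and group names are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "some.topic",
    bootstrap_servers="localhost:9092",
    group_id="my-consumer-group",        # assumed consumer group name
    isolation_level="read_committed",    # ignore events from uncommitted/aborted transactions
    enable_auto_commit=False,
    auto_offset_reset="earliest",
)

for message in consumer:
    print(message.value.decode("utf-8"))
    consumer.commit()                    # commit the offset once the event has been processed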

To forward a query's response to Kafka, include the destination object in the query request and set type to kafka like this:

"destination": {
 "type":"kafka"
}

Parameters

With destination.type set to kafka, you need to specify some additional parameters inside destination.params:

  • topic (string, required): Name of the Kafka topic to which you want to save the data contained in the query response.
  • bootstrap.servers (string, required): Comma-separated list of host/port pairs used as addresses of the Kafka brokers, in the format hostname:port.
  • maxBytesPerCommit (string, optional): Maximum bytes per commit. The default value is 1MB and the maximum value is 10MB.
  • maxTimeWithoutCommit (string, optional): Maximum time between commits, in milliseconds. The default value is 10 seconds and the minimum value is 1 second.
  • mode (string, optional): Indicates json or raw. See the examples below.
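
If you build the request programmatically, the limits above can be checked on the client side before sending the query. The following Python sketch is illustrative only (the helper name is an assumption); the checks mirror the errors listed at the end of this page:

# Illustrative helper (not part of the API) that applies the defaults and
# limits from the parameter list above. The function name is an assumption.
def build_kafka_params(topic, bootstrap_servers,
                       max_bytes_per_commit=1 * 1024 * 1024,   # default: 1MB
                       max_time_without_commit=10_000,         # default: 10 seconds
                       mode="json"):
    if max_bytes_per_commit > 10 * 1024 * 1024:
        raise ValueError("Max size per commit cannot exceed 10MB")                 # error 251
    if max_time_without_commit < 1_000:
        raise ValueError("Max time without commit cannot be less than 1 second")   # error 252
    if mode not in ("json", "raw"):
        raise ValueError("Kafka mode should be raw or json")                       # error 253
    return {
        "topic": topic,
        "bootstrap.servers": bootstrap_servers,
        "maxBytesPerCommit": max_bytes_per_commit,
        "maxTimeWithoutCommit": max_time_without_commit,
        "mode": mode,
    }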

JSON mode

If you specify json in the mode parameter, one JSON object per event is sent to Kafka, with the selected fields as keys.

Request example

{
  "query": "from siem.logtrust.malote.query select user, level, clusterid",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "maxBytesPerCommit": 5242880,
      "maxTimeWithoutCommit": 5000,
      "mode": "json"
    }
  }
}

Results in Kafka

{"user":"paul@devo.com", "level":"notice", "clusterid": 1}
{"user":"john@devo.com", "level":"notice", "clusterid": 3}
{"user":"jorge@devo.com", "level":"notice", "clusterid": 2}

Raw mode

If you specify raw in the mode parameter, the API only accepts a single selected field (or an expression with an alias), whose value is sent directly to Kafka.

Request example 1

In this example, we send each event as a JSON object built in the query with the following expression:

select json({"user": user, "level": level, "clusterid": clusterid}) as event

{
  "query": "from siem.logtrust.malote.query select json({\"user\": user, \"level\": level, \"culsterid\": clusterid}) as event",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "mode": "raw"
    }
  }
}

Results in Kafka

{"user":"paul@devo.com", "level":"notice", "clusterid": "1"}
{"user":"john@devo.com", "level":"notice", "clusterid": "3"}
{"user":"jorge@devo.com", "level":"notice", "clusterid": "2"}

Request example 2

In this example, we are sending a single field.

{
  "query": "from siem.logtrust.malote.query select user",
  "from": "1d",
  "to": "now",
  "destination": {
    "type": "kafka",
    "params": {
      "topic": "some.topic",
      "bootstrap.servers": "localhost:9092",
      "mode": "raw"
    }
  }
}

Results in Kafka

paul@devo.com
john@devo.com
jorge@devo.com
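
On the consumer side, the two modes only differ in whether the record value needs JSON parsing. A minimal illustration, with sample values taken from the results above:

import json

# In raw mode the record value is the selected field verbatim:
raw_value = b"paul@devo.com"
user = raw_value.decode("utf-8")

# In json mode (or when raw mode forwards a json() expression, as in example 1),
# the value is a JSON object with the selected fields as keys:
json_value = b'{"user":"paul@devo.com", "level":"notice", "clusterid": 1}'
event = json.loads(json_value)
print(user, event["clusterid"])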

Errors

Error  Message
250    Kafka params should include topic and bootstrap.servers
251    Max size per commit cannot exceed 10MB
252    Max time without commit cannot be less than 1 second
253    Kafka mode should be raw or json

For more information about Kafka, visit the official Apache Kafka website (https://kafka.apache.org).