Join

Description

This unit is a Processor type.

This unit joins (merges) events from two input streams when they coincide within a configured timeframe and meet a specified condition. The condition is a predicate that can only evaluate to "true" or "false".

The Join unit determines its event time from the Timestamp values of the input events, not from system time. If the unit stops receiving events, and therefore cannot determine the time from the Timestamp values, it receives time signals (called stall signals) through the in1Stall/in2Stall ports. These signals come from Devo type units that provide the system time.
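As a rough illustration of the event-time behavior described above, the following Python sketch models a clock that advances from event timestamps and falls back to stall signals when no events arrive. The names `EventTimeClock`, `on_event`, and `on_stall` are hypothetical and are not part of the Flow API. The rule that time never moves backwards is an assumption of this sketch.

```python
class EventTimeClock:
    """Toy model of an event-time clock (hypothetical, not the Flow API)."""

    def __init__(self):
        self.now = None  # current event time in epoch seconds; unknown at start

    def _advance(self, ts):
        # Assumption: the clock never moves backwards.
        if self.now is None or ts > self.now:
            self.now = ts
        return self.now

    def on_event(self, event, timestamp_field="eventdate"):
        # Normal case: time is taken from the event's timestamp field.
        return self._advance(event[timestamp_field])

    def on_stall(self, system_time):
        # Fallback: a stall signal carries system time when no events arrive.
        return self._advance(system_time)


clock = EventTimeClock()
clock.on_event({"eventdate": 1000})
clock.on_stall(1060)                 # no events for a while; the stall signal moves time forward
clock.on_event({"eventdate": 1030})  # an older timestamp does not move time back
print(clock.now)                     # 1060
```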

An event enters through one of the input ports. The event is held for the duration of the timeframe (window) configured in the size field. This timeframe is used as a sliding window to determine how often to capture and analyze data. See the table below for examples.

The event is joined with events stored in the window of the other in port. If the join condition is "true", the joined event is sent to the out output port. If the event is late, i.e. does not fall within the timeframe, it is sent to the discarded output port. If the evaluation produces an error, the event is sent to the error port.
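The routing described above can be sketched in Python as a small simulation. This is not the unit's implementation: `make_join` and `process` are hypothetical names, key fields are left out for brevity, and the values written to the error and exception fields are assumptions, since this page only states that the fields are added.

```python
def make_join(size, predicate, ts_field="eventdate"):
    """Return a function that processes one event at a time (toy model)."""
    windows = {"in1": [], "in2": []}   # events currently held per input port
    state = {"now": None}              # latest event time seen so far

    def process(port, event):
        ts = event[ts_field]
        state["now"] = ts if state["now"] is None else max(state["now"], ts)
        oldest = state["now"] - size
        # Late event: it falls outside the timeframe, so it goes to "discarded".
        if ts < oldest:
            return [("discarded", event)]
        # Drop expired events from both windows, then hold the new one.
        for name in windows:
            windows[name] = [e for e in windows[name] if e[ts_field] >= oldest]
        windows[port].append(event)
        other = "in2" if port == "in1" else "in1"
        joined = []
        for candidate in windows[other]:
            in1, in2 = (event, candidate) if port == "in1" else (candidate, event)
            try:
                if predicate(in1, in2):
                    joined.append(("out", {"in1": in1, "in2": in2}))
            except Exception as exc:
                # Assumed shape of the standard error fields.
                return [("error", {**event, "error": True, "exception": repr(exc)})]
        return joined

    return process


join = make_join(size=300, predicate=lambda in1, in2: in1["city"] == in2["city"])
print(join("in1", {"eventdate": 1000, "city": "Madrid"}))  # nothing to join with yet
print(join("in2", {"eventdate": 1100, "city": "Madrid"}))  # routed to "out"
print(join("in2", {"eventdate": 500, "city": "Madrid"}))   # routed to "discarded"
print(join("in1", {"eventdate": 1200}))                    # missing field: routed to "error"
```

In this sketch a failing predicate sends only the incoming event to the error port and stops evaluating the remaining candidates. That is a simplification chosen for the example.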

Configuration

After dragging this unit into the Flow canvas, double-click it to access its configuration options. The following table describes the configuration options of this unit:

Tab

Field

Description

General

Name

Enter a name for the unit. It must start with a letter, and cannot contain spaces. Only letters, numbers, and underscores are allowed.

Description

Enter a description detailing the scope of the unit.

Language

Specify the language you will use to write the expression in the Predicate field, e.g. JavaScript, Groovy, etc.

Predicate

The condition you wish to evaluate. Open the expression editor to type an expression, stating the input fields and the condition you wish to apply to each.

Use the 'in1.' and 'in2.' prefixes to access the fields of each input event.

In1 Timestamp field

The name of an event field that contains the timestamp used for the sliding windows, e.g. eventdate, for the first input stream.

In1 Key field(s)

The name of the first set of input event field(s) used for the join.

In2 Timestamp field

The name of an event field that contains the timestamp used for the sliding windows, e.g. eventdate, for the second input stream.

In2 Key field(s)

The name of the second set of input event field(s) used for the join.

Size

Size of the sliding window in seconds. For example, with a size of 300 seconds (5 minutes) and a timestamp at 11:00, the window is 11:00 - 11:05. The start of the next window is determined by the value set in the Purge size field.

Purge size

Interval, in seconds, after which the next sliding window starts. For example, if the size is 300 seconds, the first window is 11:00-11:05, and the purge size is 60 seconds, the next time window for the input events will be 11:01-11:06, and so on.
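The window arithmetic from the Size and Purge size examples can be reproduced with a short Python sketch. The function name `sliding_windows` and the calendar date are arbitrary choices made for illustration.

```python
from datetime import datetime, timedelta


def sliding_windows(first_start, size, purge_size, count):
    """Yield (start, end) pairs: each window lasts `size` seconds and
    starts `purge_size` seconds after the previous one."""
    for i in range(count):
        start = first_start + timedelta(seconds=i * purge_size)
        yield start, start + timedelta(seconds=size)


first = datetime(2024, 1, 1, 11, 0)  # arbitrary date; only the time of day matters here
for start, end in sliding_windows(first, size=300, purge_size=60, count=3):
    print(f"{start:%H:%M} - {end:%H:%M}")
# 11:00 - 11:05
# 11:01 - 11:06
# 11:02 - 11:07
```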

Input ports

Port

Description

in1

Input port for the first input stream.

in1Stall

Input port for the first stall signal (time) stream.

in2

Input port for the second input stream.

in2Stall

Input port for the second stall signal (time) stream.

Output ports

Port

Description

out

This port outputs events for which the condition is evaluated as "true".

discarded

This port outputs events that are late, i.e. do not fall within the timeframe.

error

This port outputs events that generate an error when evaluated against the condition. Standard error fields (error, exception) are added to the output events.

Example

In this example, we want to join events coming from different input streams when a specified value coincides, in this case, when users from both streams log in from the same city. We wish to send the results by email.

First, we must create two input streams to be joined together when the city predicate is met in the Join unit.

We use two Devo Source units with the following query:

from siem.logtrust.web.activity select *

with eventdate as the time column value.

Next, we filter out events with null user values using two Filter units. In the Predicate field of each unit's properties, enter the expression:

username != null

Link the data output port of each Devo Source unit to the in port of the corresponding Filter unit.
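The effect of the filter step above can be pictured with a few lines of Python. The sample events are invented for illustration, and treating a missing username field the same as a null one is an assumption of this sketch, not documented Filter behavior.

```python
# Invented sample events; field names follow the example above.
events = [
    {"username": "alice", "city": "Madrid"},
    {"username": None, "city": "Boston"},
    {"city": "Paris"},  # no username field at all
]

# Equivalent of the predicate `username != null`:
# keep only events whose username is present and not null.
kept = [e for e in events if e.get("username") is not None]

print(kept)  # [{'username': 'alice', 'city': 'Madrid'}]
```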

Now that we have two data streams, we can add the Join unit to combine them when the cities coincide.

In the properties, enter the predicate:

in1.city == in2.city

It is important to specify the timestamp field for both input streams, because the sliding window uses it to determine which events from each stream fall within the timeframe.

Link the out port of each Filter unit to the in1 and in2 ports of the Join unit, respectively.

Finally, add an Email Sink unit, connected to the out port of the Join unit, and add To recipients to receive the results by email.

Download this example

You can try this flow by downloading the following JSON file and uploading it to your domain using the Import option: