Join
Description
This unit is a Processor type.
This unit joins or merges two input streams that coincide within a configured timeframe, and that meet a specified condition. These conditions are predicates that can only evaluate to "true" or "false".
The Join unit derives its event time from the timestamp values of the input events, not from system time. If the unit stops receiving events, and therefore cannot determine the time from the timestamp values, it receives time signals (called stall signals) through the in1Stall/in2Stall ports. These signals come from Devo type units that provide the system time.
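As an illustration of the paragraph above, the following Python sketch models an event-time clock that advances from event timestamps and falls back to stall signals. It is not the unit's actual implementation: the class name `EventTimeClock`, its method names, and the rule that time never moves backwards are assumptions made for this example.

```python
class EventTimeClock:
    """Hypothetical model of event time driven by events, with stall signals as a fallback."""

    def __init__(self):
        self.now = None  # no time is known until the first event or stall signal arrives

    def on_event(self, event, timestamp_field="eventdate"):
        # Event time is read from the configured timestamp field, not from the system clock.
        return self._advance(event[timestamp_field])

    def on_stall(self, system_time):
        # A stall signal carries system time and keeps the clock moving when no events arrive.
        return self._advance(system_time)

    def _advance(self, t):
        # Assumption for this sketch: the clock never moves backwards.
        if self.now is None or t > self.now:
            self.now = t
        return self.now


clock = EventTimeClock()
clock.on_event({"eventdate": 100})  # time taken from the event
clock.on_stall(160)                 # no events arriving, time taken from the stall signal
print(clock.now)  # 160
```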
An event enters through one of the input ports. The event is held for the duration of the timeframe (window) configured in the Size field. This timeframe is used as a sliding window to determine how often to capture and analyze data. See the Size and Purge size fields in the Configuration table for examples.
The event is joined with the events stored in the window of the other input port. If the join condition is "true", the joined event is sent to the out port. If the event is late, i.e. does not fall within the timeframe, it is sent to the discarded port. If the evaluation produces an error, the event is sent to the error port.
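The routing described above can be sketched in Python as follows. This is a simplified model under stated assumptions, not the unit's implementation: the function name `route_event`, the lateness test (`timestamp < now - size`), the shape of the joined event, and the values stored in the error fields are all hypothetical.

```python
def route_event(event, other_window, predicate, now, size, ts_field="eventdate"):
    """Return a list of (port, payload) tuples for one event arriving on in1."""
    # Assumed lateness rule: the timestamp is older than the window currently held.
    if event[ts_field] < now - size:
        return [("discarded", event)]

    results = []
    for other in other_window:  # events held in the window of the other input port
        try:
            if predicate(event, other):
                # Assumed shape of a joined event: both sides kept under in1/in2.
                results.append(("out", {"in1": event, "in2": other}))
        except Exception as exc:
            # The error and exception fields are added; their values here are assumptions.
            return [("error", {**event, "error": True, "exception": repr(exc)})]
    return results


window = [{"city": "Madrid", "eventdate": 100}, {"city": "Paris", "eventdate": 110}]
same_city = lambda in1, in2: in1["city"] == in2["city"]

on_time = route_event({"city": "Madrid", "eventdate": 120}, window, same_city, now=120, size=300)
late = route_event({"city": "Madrid", "eventdate": 0}, window, same_city, now=400, size=300)
print(on_time[0][0], late[0][0])  # out discarded
```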
Configuration
After dragging this unit into the Flow canvas, double-click it to access its configuration options. The following table describes the configuration options of this unit:
Tab | Field | Description |
---|---|---|
General | Name | Enter a name for the unit. It must start with a letter, and cannot contain spaces. Only letters, numbers, and underscores are allowed. |
General | Description | Enter a description detailing the scope of the unit. |
General | Language | Specify the language you will use to write the expression in the Predicate field, e.g. JavaScript, Groovy, etc. |
General | Predicate | The condition you wish to evaluate. Open the expression editor to type an expression, stating the input fields and the condition you wish to apply to each. Use 'in1.' and 'in2.' to access the fields of a specific input event. |
General | In1 Timestamp field | The name of the event field that contains the timestamp used for the sliding windows of the first input stream, e.g. eventdate. |
General | In1 Key field(s) | The name of the event field(s) of the first input stream used for the join. |
General | In2 Timestamp field | The name of the event field that contains the timestamp used for the sliding windows of the second input stream, e.g. eventdate. |
General | In2 Key field(s) | The name of the event field(s) of the second input stream used for the join. |
General | Size | Size of the sliding window in seconds. For example, with a size of 300 seconds (5 minutes) and a timestamp at 11:00, the window will be 11:00 - 11:05. The start of the next window is determined by the value set in the Purge size field. |
General | Purge size | Offset, in seconds, between the start of one sliding window and the start of the next. For example, if the purge size is 60 seconds, the next time window for the input events will be 11:01 - 11:06, and so on. |
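The window arithmetic from the Size and Purge size examples can be reproduced with a short Python sketch. The function name `sliding_windows` and the calendar date are arbitrary choices for illustration; only the 300-second size, the 60-second purge size, and the 11:00 start come from the table.

```python
from datetime import datetime, timedelta


def sliding_windows(start, size_s, purge_s, count):
    """Return the first `count` (window_start, window_end) pairs."""
    return [
        (start + timedelta(seconds=i * purge_s),
         start + timedelta(seconds=i * purge_s + size_s))
        for i in range(count)
    ]


first = datetime(2024, 1, 1, 11, 0)  # arbitrary date, only the time of day matters
for w_start, w_end in sliding_windows(first, size_s=300, purge_s=60, count=3):
    print(f"{w_start:%H:%M} - {w_end:%H:%M}")
# prints:
# 11:00 - 11:05
# 11:01 - 11:06
# 11:02 - 11:07
```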
Input ports
Port | Description |
---|---|
in1 | Input port for the first input stream. |
in1Stall | Input port for the first stall signal (time) stream. |
in2 | Input port for the second input stream. |
in2Stall | Input port for the second stall signal (time) stream. |
Output ports
Port | Description |
---|---|
out | This port outputs events for which the condition is evaluated as "true". |
discarded | This port outputs events that are late, i.e. do not fall within the timeframe. |
error | This port outputs events that generate an error when evaluated against the condition. Standard error fields (error, exception) are added to the output events. |
Example
In this example, we want to join events coming from different input streams when a specified value coincides, in this case, when users from both streams log in from the same city. We wish to send the results by email.
First, we must create two input streams to be joined together when the city predicate is met in the Join unit.
We use two Devo Source units with the following query:
from siem.logtrust.web.activity
select *
with eventdate as the time column value.
Next, we use two Filter units to drop events whose username is null. In the Predicate field of the properties, enter the expression:
username != null
Link the data output port of each Devo Source unit to the in port of the corresponding Filter unit.
Now that we have two data streams, we can add the Join unit to combine them when the cities coincide.
In the properties, enter the predicate:
in1.city == in2.city
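To make the effect of the two predicates in this example concrete, the following Python sketch applies them to a handful of events. The sample events and the function names `has_username` and `same_city` are made up for illustration, and windowing is left out so that only the predicate logic is shown.

```python
def has_username(event):
    # Python rendering of the Filter predicate: username != null
    return event.get("username") is not None


def same_city(in1, in2):
    # Python rendering of the Join predicate: in1.city == in2.city
    return in1["city"] == in2["city"]


# Hypothetical events standing in for the two Devo Source outputs.
raw1 = [{"username": "alice", "city": "Madrid"}, {"username": None, "city": "Madrid"}]
raw2 = [{"username": "carol", "city": "Madrid"}, {"username": "dave", "city": "Lisbon"}]

stream1 = [e for e in raw1 if has_username(e)]  # first Filter unit
stream2 = [e for e in raw2 if has_username(e)]  # second Filter unit

matches = [(a["username"], b["username"])
           for a in stream1 for b in stream2 if same_city(a, b)]
print(matches)  # [('alice', 'carol')]
```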
Specify the timestamp field for both input streams (eventdate in this example), because the sliding window of each stream is computed from it.
Link the out port of each Filter unit to the in1 and in2 ports of the Join unit, respectively.
Finally, add an Email Sink unit, connected to the out port of the Join unit, and add To recipients to receive the results by email.
Download this example
You can try this flow by downloading the following JSON file and uploading it to your domain using the Import option: