Time Window Collector
Description
The Time Window Collector is a Processor type unit.
The Time Window Collector unit keeps events in memory until a certain time condition is met. It can be configured to either use a global window for all events or to create a new window for each key.
Events come in through the in port and are stored in the window associated with the event key. When the time condition of the window is met, all the events in the window are released.
Windowed events are sent to the out output port.
Events that do not match the time condition are sent to the discarded port.
If an error occurs, the input events are enriched with standard error fields and sent to the error port.
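The behavior can be pictured with the minimal sketch below. It is not the unit's actual implementation: events are simply buffered per key and released once an incoming event's timestamp passes the current window's upper limit (the window size, the field names, and the fact that the window only closes when a later event arrives are assumptions made for brevity).

```java
import java.util.*;

// Minimal sketch of keyed time-window collection (illustrative only, not the
// unit's actual code): events are buffered per key and released once an
// incoming event's timestamp crosses the window's upper limit. A real
// implementation would also close windows on a timer, not only on arrival.
public class KeyedWindowSketch {

    record Event(String key, long timestampSec, String payload) {}

    static final long WINDOW_SIZE_SEC = 60; // hypothetical "Size" value
    static final Map<String, List<Event>> buffers = new HashMap<>();
    static final Map<String, Long> windowStart = new HashMap<>();

    // Returns the events released for this key, or an empty list if the
    // window's time condition has not been met yet.
    static List<Event> collect(Event e) {
        windowStart.putIfAbsent(e.key(), e.timestampSec());
        long upperLimit = windowStart.get(e.key()) + WINDOW_SIZE_SEC;

        if (e.timestampSec() >= upperLimit) {
            // Time condition met: release the buffered window, start a new one.
            List<Event> released = buffers.remove(e.key());
            windowStart.put(e.key(), e.timestampSec());
            buffers.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
            return released == null ? List.of() : released;
        }
        buffers.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
        return List.of();
    }

    public static void main(String[] args) {
        List<Event> demo = List.of(
                new Event("newUser", 0, "login-1"),
                new Event("newUser", 30, "login-2"),
                new Event("newUser", 65, "login-3")); // crosses the 60 s limit
        for (Event e : demo) {
            List<Event> out = collect(e);
            if (!out.isEmpty()) System.out.println("released: " + out);
        }
    }
}
```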
Configuration
After dragging this unit onto the Flow canvas, double-click it to access its configuration options, which are described in the following table:
Tab | Field | Description |
---|---|---|
General | Name | Enter a name for the unit. It must start with a letter and cannot contain spaces. Only letters, numbers, and underscores are allowed. |
 | Description | Enter a description detailing the scope of the unit. |
 | Key field(s) | Enter the name of the input event field(s) containing the key(s). |
 | Size | The size of the window, in seconds. Must be greater than 0. If this number is smaller than the Purge size, a sliding window is used; if it is equal to the Purge size, a tumbling window is used. For example, with a size of 300 seconds (5 minutes) and a timestamp at 11:00, the sliding window covers 11:00 - 11:05, and the value set in the Purge size field determines when the next window starts. |
 | Purge size | Number of seconds after which the window is purged once it is considered complete. All events received within the purged seconds are removed from the window. If this number is larger than the Size, a sliding window is used; if it is equal to the Size, a tumbling window is used. |
 | Timestamp field | The name of the event field that contains the timestamp. If empty, the unit uses the system time to calculate the event timestamp. Changing this field in a running flow resets the unit. You can also enter a fully qualified Java class name. |
 | Output key(s) field | Enter a name for the output event field containing the key(s). |
 | Window field | Enter a name for the output event field containing the window list. |
 | Size field | Enter a name for the output event field containing the window size. |
 | Lower limit field | Enter a name for the output event field containing the lower limit of the current window. |
 | Upper limit field | Enter a name for the output event field containing the upper limit of the current window (see the sketch after this table). |
 | Allow empty collections | Determines whether empty collections emit a result at the output ports (true) or not (false). Set to false by default. |
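Assuming a tumbling window (Size equal to Purge size), the sketch below shows one plausible way the lower and upper limits could be derived from an event timestamp, and how an output event carrying the configured output fields might look. The field names (outputKey, window, windowSize, lowerLimit, upperLimit), the epoch-aligned boundary calculation, and the use of the event count as the window size are assumptions for illustration, not the unit's documented behavior.

```java
import java.util.*;

// Illustrative sketch (assumed field names and window alignment) of how a
// tumbling window maps an event timestamp to lower/upper limits and how an
// output event with the configured output fields could be assembled.
public class TumblingWindowOutputSketch {

    static Map<String, Object> buildOutputEvent(String key,
                                                List<String> windowedEvents,
                                                long eventTimestampSec,
                                                long windowSizeSec) {
        // Align the window to multiples of the window size (assumption).
        long lowerLimit = (eventTimestampSec / windowSizeSec) * windowSizeSec;
        long upperLimit = lowerLimit + windowSizeSec;

        // The keys below stand in for whatever names you enter in
        // "Output key(s) field", "Window field", "Size field",
        // "Lower limit field" and "Upper limit field".
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("outputKey", key);
        out.put("window", windowedEvents);
        out.put("windowSize", windowedEvents.size()); // assumed: event count
        out.put("lowerLimit", lowerLimit);
        out.put("upperLimit", upperLimit);
        return out;
    }

    public static void main(String[] args) {
        // Three logins that fall inside the same 60-second window.
        List<String> logins = List.of("login-1", "login-2", "login-3");
        System.out.println(buildOutputEvent("newUser", logins, 125, 60));
        // Prints lowerLimit=120 and upperLimit=180 for a timestamp of 125 s.
    }
}
```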
Input ports
Port | Description |
---|---|
in | All events enter through this port. |
Output ports
Port | Description |
---|---|
out | Outputs events that have been properly stored in a window. |
discarded | Outputs events that do not match the time condition. |
error | Signals when an error occurs. Outputs input events enriched with standard error fields. |
Example
In this example, we wish to receive by email the number of new user logins in a one-minute time window.
For this, we first add a Scheduler unit that sends events every 10 seconds.
Then, we add a Map unit and link the Scheduler to its in port. In the Map unit's properties, we add the new user key as a string variable.
Next, we add the Time Window Collector unit to the canvas so that it receives the new user logins through its in port.
In its properties, we specify the key field so that it matches the key configured in the Map unit, in this case the Key field configured earlier. We set the Purge size equal to the Size, in this case 60, to use a tumbling window. This produces a 60-second window, so the collected results are sent through the out port every minute.
Finally, we add an Email Sink unit and specify the subject and message of the emails, as well as the list of addresses to notify.
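As a rough illustration of what a downstream step could do with the collector's output in this example, the snippet below reads a windowed event (using the hypothetical field names window, lowerLimit, and upperLimit) and builds the body of the notification email with the number of new user logins in that minute.

```java
import java.util.List;
import java.util.Map;

// Rough sketch (hypothetical field names) of turning a Time Window Collector
// output event into the email body used in this example.
public class LoginCountEmailSketch {

    static String buildEmailBody(Map<String, Object> windowedEvent) {
        // "window", "lowerLimit" and "upperLimit" stand in for whatever names
        // were configured in the unit's output fields; adjust to your own.
        List<?> window = (List<?>) windowedEvent.get("window");
        long lower = (Long) windowedEvent.get("lowerLimit");
        long upper = (Long) windowedEvent.get("upperLimit");
        return "New user logins between " + lower + " and " + upper
                + ": " + window.size();
    }

    public static void main(String[] args) {
        Map<String, Object> sample = Map.of(
                "window", List.of("login-1", "login-2", "login-3"),
                "lowerLimit", 0L,
                "upperLimit", 60L);
        // Prints: New user logins between 0 and 60: 3
        System.out.println(buildEmailBody(sample));
    }
}
```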
Download this example
You can try this flow by downloading the following JSON file and uploading it to your domain using the Import option: