Document toolboxDocument toolbox

Customized alert reports

Description

A Flow that pulls an alert count from 2 different time periods, combines and transforms it into a specific format, and sends a report with the results via email to the required recipients.

This Flow checks the number of alerts triggered in the last hour and compares them with the alerts received over the same day and hour of the previous week. The info comparing both alert counts is sent via email every x minutes. 

To get this, the Flow is made up of 2 different branches:

  • The top branch counts the alerts triggered over the last hour.

  • The bottom branch counts the alerts triggered over the same hour, seven days previously.

In Flows with different branches like this one, note that branches are always executed in a specific sequential order. You can check the order by hovering over the links that go out from a specific port and checking the number that appears. In this specific Flow, we need the top branch to be executed before the other one.

To change the execution order, click a link, select the arrows icon that appears and click the top or bottom arrow button to set the order of that link higher or lower. Learn more about this in Working with links.

Flow configuration

The following table describes the units needed to create this Flow, as well as how to configure and link them.

Unit type and description

Configuration

Unit type and description

Configuration

Generator

This unit will fire an event every x minutes (in this example, every minute).

We will enrich these events using the following units and will use them to activate the required queries.

Drag a Generator unit to the canvas and open its configuration options by double-clicking it.

On the General tab, add a Name to the unit (in this example, we called it Clock), and specify the Time field name that will contain the output event times (in this example, eventdate).

Then, on the Period tab, enter 60000 in the Millis field. Also, switch on the Exact, Aligned, and Drop past periods toggles.

Click Apply to save the configuration.

Map

We will use a couple of Map units to enrich the events with time information.

As said above, we will be comparing alerts triggered the last hour with alerts triggered over the same day and hour of the previous week. To do it, we will use these Map units to specify the start and end date of each period.

Drag 2 Map units to the canvas and link their in ports to the out port of the Generator unit.

As mentioned in the introduction of this article, Flows follow a sequential order. In this case, we want the top branch of the Flow to activate before the bottom want. To do this, you must first link the Generator unit to the top Map (prepareQueryTimeInterval), and then to the bottom one (preparePastQuertTimeInterval).

As said above, you can always change the execution order by clicking a link, selecting the arrows icon that appears, and clicking the top or bottom arrow button to set the order of that link higher or lower. Learn more about this in Working with links.

Now follow the steps below to configure both units:

  1. Map 1
    1. Open the configuration options of the first Map unit by double-clicking it.
    2. On the General tab, add a Name to the unit (in this example, we called it prepareQueryTimeInterval), and leave the Language field as default.

  1. Then, on the Fields to add tab, you must add 3 different fields by clicking the + icon:

  • Field 1
    This column will contain the end date of the period to be analyzed. In this case, we need the current time as end date.

    • Field name: endDate

    • Type: Long

    • Expression: eventdate.getTime()

  • Field 2
    This column will contain the start date of the period to be analyzed. In this case, we need to get 1 hour before the current date.

    • Field name: startDate

    • Type: Long

    • Expression: endDate - java.time.Duration.ofHours(1).toMillis()

  • Field 3
    We will use this column to identify the time period of the events.

    • Field name: header

    • Type: String

    • Expression: “Today"

  • Map 2

  1. Open the configuration options of the second Map unit by double-clicking it.

  2. On the General tab, add a Name to the unit (in this example, we called it preparePastQueryTimeInterval), and leave the Language field as default.

Then, on the Fields to add tab, you must add 3 different fields by clicking the + icon:

  • Field 1
    This column will contain the end date of the period to be analyzed. In this case, we need the current time of the previous week as the end date.

    • Field name: endDate

    • Type: Long

    • Expression: eventdate.getTime()-
      java.time.Duration.ofDays(7).toMillis()

  • Field 2
    This column will contain the start date of the period to be analyzed. In this case, we need to get 1 hour before the current time of the previous week.

    • Field name: startDate

    • Type: Long

    • Expression: endDate - java.time.Duration.ofHours(1).toMillis()

  • Field 3
    We will use this column to identify the time period of the events.

    • Field name: header

    • Type: String

    • Expression: “Last week"

  • Click Apply to save the configuration.

Devo Full Query

We will use a couple of Devo Full Query units to specify the query that defines the alerts to be compared.

We will be querying the siem.logtrust.alert.info table, which receives all the alerts triggered in your domain.

Drag 2 Devo Full Query units to the canvas and link their in ports to the out ports of the Map units, as follows:

Now follow the steps below to configure both units:

  • Devo Full Query 1

  1. Open the configuration options of the first Devo Full Query unit by double-clicking it.

  2. On the General tab, give the unit a Name (in this example, we called it alertTriggeredQuery) and enter the following in the Query field:
    from siem.logtrust.alert.info
    select eventdate, context
    group by context
    select first(eventdate) as eventdate, count() as count

Then, on the From event tab, choose the startDate and endDate columns in the Start time and End time fields.

  1. Click Apply to save the configuration.

  • Devo Full Query 2

  1. Open the configuration options of the second Devo Full Query unit by double-clicking it.

  2. On the General tab, give the unit a Name (in this example, we called it alertTriggeredPastQuery) and enter the following in the Query field:
    from siem.logtrust.alert.info
    select eventdate, context
    group by context select first(eventdate) as eventdate, count() as count

Then, on the From event tab, choose the startDate and endDate columns in the Start time and End time fields.

  1. Click Apply to save the configuration.

Reducer

We will use a couple of Reducer units to store all the events received during the query periods set, combine and format them and finally emit an only event with all the alerts triggered during those periods.

 

Drag 2 Reducer units to the canvas and link them to the Devo Full Query units as follows:

  • Link the init port of the Devo Full Query units to the reset port of the Reducer units. This way, each time a query is started, the Reducer units will be reset with the value specified in the Init Value of the unit. We will set this initial value as empty.

  • Link the data port of the Devo Full Query units to the in port of the Reducer units. The expression set in the Reducer units will be evaluated and stored with the corresponding query values. We will use an HTML expression to format the resulting reports.

  • Link the end port of the Devo Full Query units to the get port of the Reducer units. When the query ends, a signal event will be sent to the get port of the Reducer units. This will make the Reducer units emit an only event with all the stored information through the output current ports.

Now follow the steps below to configure both units:

  • Reducer 1

  1. Open the configuration options of the first Reducer unit by double-clicking it.

  2. Give the unit a Name (in this example, we called it prepareMessage).

  3. Leave the Language as default (Groovy) and enter "" in the Init Value field.

  4. Set the Field name as message and choose String as Accumulator Type.

  5. Then, enter the following in the Expression field:
    __acc__ +
    "<tr> <td>"+new Date(startDate)+"</td>"+
    "<td>"+context+"</td>"+
    "<td>"+count+"</td>"+
    "</tr>"

Click Apply to save the configuration.

  • Reducer 2

  1. Open the configuration options of the second Reducer unit by double-clicking it.

  2. Give the unit a Name (in this example, we called it preparePastMessage).

  3. Leave the Language as default (Groovy) and enter "" in the Init Value field.

  4. Set the Field name as message and choose String as Accumulator Type.

  5. Then, enter the following in the Expression field:
    __acc__ +
    "<tr> <td>"+new Date(startDate)+"</td>"+
    "<td>"+context+"</td>"+
    "<td>"+count+"</td>"+
    "</tr>"

Click Apply to save the configuration.

Reducer

We will use an additional Reducer to combine the information stored by the previously added Reducer units and send the final report with the alert info from both time periods.

Drag a Reducer unit to the canvas and link it to the previously added Reducers in the following order:

  1. First, link the current port of the first Reducer unit (prepareMessage) to the reset port of the new Reducer unit. This will reset this Reducer unit each time information goes out from the first Reducer, which is the first one in our sequence.

  2. Then, link the current port of the first Reducer unit (prepareMessage) to the in port of the new Reducer unit. The Reducer will store the information that gets through the in port.

  3. Now, link the current port of the second Reducer unit (preparePastMessage) to the in port of the new Reducer unit. This will add the information of the unit to the new Reducer

    • Finally, link the current port of the second Reducer unit (preparePastMessage) to the get port of the new Reducer. This will make the new Reducer emit the report with all the information stored. The final report will go out through the current port of this Reducer.
      This is the sequence order of this part of the Flow:
      (1) The combineMessages Reducer is reset. 

      (2) The combineMessages Reducer gets the information stored by the prepareMessage Reducer.

      (3) The combineMessages Reducer gets the information stored by the preparePastMessage Reducer.

      (4) The combineMessages Reducer is notified to emit the report with all the information stored.

As said in the introduction of this article, remember that the link order matters. You must link the ports in the order described above.

Then, open the configuration options of the unit by double-clicking it. Add a Name to the unit (in this example, we called it combineMessages).

Leave the Language as default (Groovy) and enter ““ in the Init Value field.

Set the Field Name as combinedMessage and choose String as Accumulator Type.

Then, enter the following in the Expression field:
__acc__ +
"<tr> <td></td>"+
"<td>"+header+"</td>"+
"<td></td>"+ "</tr>" + message

Click Apply to save the configuration.

Email Sink

We will use this unit to send the reports generated to the required user emails.

Drag an Email Sink unit to the canvas and link the current port of the Reducer unit to the in port of this unit, as follows:

  1. Then, open the configuration options of the unit by double-clicking it. In the General tab, add a Name to the unit (in this example, we called it notify), add a Subject for the emails to be sent (in this example, Alert Report).

    Then, in the Message field, you must enter the text to be sent in the mails. For this example, we added the following (where combinedMessage is a variable field that will be automatically replaced by the corresponding values)
    <table>
    <tr>
    <th>Date</th>
    <th>Alert</th>
    <th>Count</th>
    %%{combinedMessage}
    </tr>
    </table>

  2. Then, click the To recipients tab and add as many email addresses as required by clicking the + icon.

  3. Click Apply to save the configuration.

Once you're done, remember to save your Flow so you can start running it. To do it, click the Save button on the Flow toolbar.

Result

Once you have defined the whole Flow and saved it, click the Start button to activate it. If everything is correctly configured, the Flow will send an email to the given addresses with a report comparing the alert count over the last hour with the alert count over the same hour and day of the previous week.

Import this Flow

Download this Flow in JSON format and import it to your domain clicking the Import from JSON option in the File menu.

Remember to add the required target email addresses as explained above before running the Flow.