Installing the Elastic stack for IPFIX collection

This guide contains instructions for installing an Elastic stack instance for the purposes of IPFIX flow collection. It is assumed that the installation will take place on a standard Linux distribution that is dedicated to this purpose.

Installing the Elastic stack

The instructions here are provided as a quick example and are specific to the openSUSE Leap distribution; the steps for other distributions will differ slightly. See the official Elastic installation documentation for details on other distributions.

Import the package signing key:

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Add the repo:

zypper ar -f https://artifacts.elastic.co/packages/6.x/yum elasticsearch-6.x

Install:

zypper in java-1_8_0-openjdk elasticsearch logstash kibana

Enable and start the services:

systemctl enable elasticsearch.service kibana.service logstash.service
systemctl start elasticsearch.service kibana.service logstash.service

Optional: install httpie, which provides a simple interface for making manual test requests to the services:

zypper in python3-requests python3-Pygments
pip3 install httpie

Now with httpie we can check the Elasticsearch service on port 9200:

http get http://localhost:9200/

You should see output like this if the service is running properly:

HTTP/1.1 200 OK
content-encoding: gzip
content-length: 279
content-type: application/json; charset=UTF-8

{
    "cluster_name": "elasticsearch",
    "cluster_uuid": "IgDZkOv9QiO7qyuUd0MHdg",
    "name": "4Iv8FWC",
    "tagline": "You Know, for Search",
    "version": {
        "build_date": "2017-12-17T20:23:25.338Z",
        "build_hash": "bd92e7f",
        "build_snapshot": false,
        "lucene_version": "7.1.0",
        "minimum_index_compatibility_version": "5.0.0",
        "minimum_wire_compatibility_version": "5.6.0",
        "number": "6.1.1"
    }
}

Also check the Kibana service with httpie:

http get http://localhost:5601/

You should see output like this:

HTTP/1.1 200 OK
Connection: keep-alive
Date: Fri, 12 Jan 2018 17:25:07 GMT
Transfer-Encoding: chunked
cache-control: no-cache
content-encoding: gzip
content-type: text/html; charset=utf-8
kbn-name: kibana
kbn-version: 6.1.1
vary: accept-encoding

<script>var hashRoute = '/app/kibana';
var defaultRoute = '/app/kibana';

var hash = window.location.hash;
if (hash.length) {
window.location = hashRoute + hash;
} else {
window.location = defaultRoute;
}</script>

Configuring Logstash to accept IPFIX data

Configure an IPFIX input and output in Logstash:

cat <<'EOF' > /etc/logstash/conf.d/ipfix.conf
input {
    udp {
        port => 2055
        codec => netflow {
            versions => 10
        }
        type => ipfix
    }
}
output {
    if [type] == "ipfix" {
        elasticsearch {
            hosts => "127.0.0.1"
            index => "ipfix-%{+YYYY.MM.dd}"
        }
    }
}
EOF
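The %{+YYYY.MM.dd} pattern in the index option creates one index per day. For illustration only, the resulting index names have the same shape as the output of this date command:

```shell
# Print the name of the index that today's flows would be written to
date +ipfix-%Y.%m.%d
```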

See the documentation for the Logstash netflow codec and elasticsearch output plugin for more information on the configuration options.

Restart logstash:

systemctl restart logstash.service

Set up a flow collector to send data to the IP address of the server on port 2055, and enable it. Once flows are arriving, Logstash will create one ipfix- index per day in Elasticsearch, which can be confirmed with:

http get 'http://localhost:9200/_cat/indices/ipfix-*'

Optional: Translating IPFIX fields

Some of the raw fields don’t look very nice in Kibana, particularly protocols and transport ports, since they are expressed as simple numbers. Logstash allows for translation or creation of new fields as data comes in. Let’s add some translations for protocol names. We can also normalize IPv4 and IPv6 addresses into fields that can contain either to make searching and filtering easier.

First create a directory to store the protocol translations:

mkdir -p /etc/logstash/dictionaries

Create mappings for protocols and ports from the IANA database:

curl -s https://www.iana.org/assignments/protocol-numbers/protocol-numbers-1.csv \
    | grep -e '^[0-9]\+,' | cut -d ',' -f1,2 | grep -ve ',$' | head -n-1 \
    | sed -e 's/\([0-9]\+\),\(.*\)/"\1": \2/g' \
    > /etc/logstash/dictionaries/iana-protocol-numbers.yml
curl -s https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv \
    -o /tmp/service-names-port-numbers.csv
for protocol in tcp udp sctp dccp ; do
    cut -d ',' -f1,2,3 /tmp/service-names-port-numbers.csv \
        | grep -e "^[a-zA-Z0-9-]\+,[0-9]\+,$protocol" \
        | sed -e 's/\([a-zA-Z0-9-]\+\),\([0-9]\+\),.*/"\2": \1/g' \
        > /etc/logstash/dictionaries/iana-${protocol}-numbers.yml
done
rm /tmp/service-names-port-numbers.csv
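To sanity-check the dictionary generation, a sample CSV line can be run through the same sed expressions used above; each should produce a YAML key/value pair in the form the Logstash translate filter expects:

```shell
# Protocol dictionary format: a quoted protocol number mapped to its name
echo '6,TCP' | sed -e 's/\([0-9]\+\),\(.*\)/"\1": \2/g'
# "6": TCP

# Port dictionary format: a quoted port number mapped to its service name
echo 'ssh,22,tcp' | sed -e 's/\([a-zA-Z0-9-]\+\),\([0-9]\+\),.*/"\2": \1/g'
# "22": ssh
```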

Also create mappings for nDPI protocols:

curl -s https://raw.githubusercontent.com/ntop/nDPI/dev/src/include/ndpi_protocol_ids.h \
    | grep -v NDPI_PROTOCOL_SIZE \
    | grep -P '#define NDPI_(PROTOCOL|CONTENT).*[0-9]+' \
    | sed -e 's|\/\*.*\*\/||g' | awk '{print "\"" $3 "\""  ": "  $2 }' \
    | uniq \
    | sed -e 's/NDPI_PROTOCOL_//g' \
    | sed -e 's/NDPI_CONTENT_//g' \
    > /etc/logstash/dictionaries/ndpi-application-ids.yml
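The awk and sed stages for the nDPI list can be checked the same way with a sample header line (the HTTP define shown here is just an illustration):

```shell
# Run one sample #define through the same awk/sed stages used above
echo '#define NDPI_PROTOCOL_HTTP 7' \
    | awk '{print "\"" $3 "\"" ": " $2 }' \
    | sed -e 's/NDPI_PROTOCOL_//g'
# "7": HTTP
```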

Now add a filter section between the input and output sections in /etc/logstash/conf.d/ipfix.conf:

input {
    udp {
        port => 2055
        codec => netflow {
            versions => 10
        }
        type => ipfix
    }
}
filter {
    # nDPI stuff
    if [netflow][applicationId] {
        ruby {
            code => 'event.set("[netflow][application]", event.get("[netflow][applicationId]").rjust(4, "\x00").unpack("N")[0])'
        }
        translate {
            dictionary_path => "/etc/logstash/dictionaries/ndpi-application-ids.yml"
            field => "[netflow][application]"
            destination => "[netflow][applicationName]"
            fallback => "UNKNOWN"
        }
        ruby {
            code => 'event.set("[netflow][applicationId]", [event.get("[netflow][applicationId]")].pack("m").strip())'
        }
    }
    # Normalize addresses
    if [netflow][ipVersion] == 4 {
        mutate {
            rename => {
                "[netflow][sourceIPv4Address]" => "[netflow][sourceAddress]"
                "[netflow][destinationIPv4Address]" => "[netflow][destinationAddress]"
            }
        }
    }
    if [netflow][ipVersion] == 6 {
        mutate {
            rename => {
                "[netflow][sourceIPv6Address]" => "[netflow][sourceAddress]"
                "[netflow][destinationIPv6Address]" => "[netflow][destinationAddress]"
            }
        }
    }
    # Add fields with descriptive protocol strings
    if [netflow][protocolIdentifier] {
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-protocol-numbers.yml"
            field => "[netflow][protocolIdentifier]"
            destination => "[netflow][protocolName]"
            fallback => "Unknown"
        }
    }
    if [netflow][protocolIdentifier] == 6 {
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
            field => "[netflow][sourceTransportPort]"
            destination => "[netflow][sourceTransportPortName]"
            fallback => "Unknown"
        }
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
            field => "[netflow][destinationTransportPort]"
            destination => "[netflow][destinationTransportPortName]"
            fallback => "Unknown"
        }
    }
    if [netflow][protocolIdentifier] == 17 {
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
            field => "[netflow][sourceTransportPort]"
            destination => "[netflow][sourceTransportPortName]"
            fallback => "Unknown"
        }
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
            field => "[netflow][destinationTransportPort]"
            destination => "[netflow][destinationTransportPortName]"
            fallback => "Unknown"
        }
    }
    if [netflow][protocolIdentifier] == 33 {
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
            field => "[netflow][sourceTransportPort]"
            destination => "[netflow][sourceTransportPortName]"
            fallback => "Unknown"
        }
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
            field => "[netflow][destinationTransportPort]"
            destination => "[netflow][destinationTransportPortName]"
            fallback => "Unknown"
        }
    }
    if [netflow][protocolIdentifier] == 132 {
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
            field => "[netflow][sourceTransportPort]"
            destination => "[netflow][sourceTransportPortName]"
            fallback => "Unknown"
        }
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
            field => "[netflow][destinationTransportPort]"
            destination => "[netflow][destinationTransportPortName]"
            fallback => "Unknown"
        }
    }
    if [netflow][protocolIdentifier] in [6, 17, 33, 132] {
        if [netflow][sourceTransportPortName] != "Unknown" {
            mutate {
                add_field => {
                    "[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]} (%{[netflow][sourceTransportPortName]})"
                }
            }
        } else {
            mutate {
                add_field => {
                    "[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]}"
                }
            }
        }
        if [netflow][destinationTransportPortName] != "Unknown" {
            mutate {
                add_field => {
                    "[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]} (%{[netflow][destinationTransportPortName]})"
                }
            }
        } else {
            mutate {
                add_field => {
                    "[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]}"
                }
            }
        }
        # Assume that the lower port number is the service port
        if [netflow][sourceTransportPort] < [netflow][destinationTransportPort] {
            mutate {
                add_field => {
                    "[netflow][serviceDescription]" => "%{[netflow][sourceTransportDescription]}"
                }
            }
        } else {
            mutate {
                add_field => {
                    "[netflow][serviceDescription]" => "%{[netflow][destinationTransportDescription]}"
                }
            }
        }
    } else {
        if [netflow][protocolName] != "Unknown" {
            mutate {
                add_field => {
                    "[netflow][serviceDescription]" => "%{[netflow][protocolName]}"
                }
            }
        } else {
            mutate {
                add_field => {
                    "[netflow][serviceDescription]" => "Unknown (%{[netflow][protocolIdentifier]})"
                }
            }
        }
    }
}
output {
    if [type] == "ipfix" {
        elasticsearch {
            hosts => "127.0.0.1"
            index => "ipfix-%{+YYYY.MM.dd}"
        }
    }
}

Then restart Logstash:

systemctl restart logstash.service

Once new flow records arrive, retrieve a sample document to confirm that the translated fields are present:

http get 'http://localhost:9200/ipfix-*/_search?size=1'

Additional filters may be added according to the desired fields. For example, it’s possible to generate a list of host-to-bond ID mappings from the bondingadmin API to populate a bond ID field, depending on the flow collector’s source IP policy.

Installing a frontend webserver

This is required to allow access to the Kibana dashboard from other hosts.

Install Nginx:

zypper in nginx

Edit /etc/nginx/nginx.conf and change the default server entry to proxy to Kibana:

server {
    listen       80;

    location / {
        proxy_pass http://localhost:5601;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}

Check the configuration, then enable and start the service:

nginx -t
systemctl enable nginx.service
systemctl start nginx.service

Note that this allows access from anywhere without authentication. If a firewall is not in place, authentication should be set up in Nginx.
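As a minimal sketch, HTTP basic authentication can be enabled in Nginx with an htpasswd file; the username, example password, and file path here are placeholders:

```shell
# Generate an htpasswd entry for user 'admin' (replace the example password);
# the apr1 hash scheme produced by openssl is understood by nginx
printf 'admin:%s\n' "$(openssl passwd -apr1 'changeme')"
```

Redirect the output to a file such as /etc/nginx/kibana.htpasswd, then add auth_basic "Kibana"; and auth_basic_user_file /etc/nginx/kibana.htpasswd; inside the location block and reload Nginx.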

Configuring Kibana to index the data

Navigate to the IP address of the server in a web browser, which will show the Kibana welcome page. The first thing that needs to be set up is the index pattern for the data we are collecting.

Click the “Set up index patterns” button on the upper right. It will display the index pattern creation page. If you are collecting data already, you will see an index for the current date. We need to set up a pattern that will match the indices for all dates, so set the index pattern to ipfix-* and click the Next step button.

Now choose @timestamp for the Time Filter field name and click the Create index pattern button to finish.

Contents of the data

Now you can navigate to the Discover panel to see the raw flow data that was captured. Clicking the arrow to the left of an entry will expand it, showing all of the fields recorded for the flow.

Here are the most important fields. Not all of them will be present in all records, depending on the actual flow type:

@timestamp
This is the time when the flow was reported to Logstash.
host
This is the IP address of the device that sent the flow record. This can be used to determine which bonder handled the flow, according to the Source IP policy parameter of the flow collector defined in Bondingadmin.
netflow.ipVersion
This is the IP version of the flow. It will be 4 for IPv4 and 6 for IPv6.
netflow.protocolIdentifier
The protocol ID of the flow. This will be 1 for ICMP, 6 for TCP and 17 for UDP. The full list is available in the IANA protocol numbers registry.
netflow.protocolName
The name for the protocol ID of the flow (only present if logstash translation was implemented above)
netflow.sourceAddress
The source IPv4/IPv6 address of the flow (only present if logstash translation was implemented above)
netflow.sourceIPv4Address
The source IPv4 address of the flow (not present if logstash translation was implemented above)
netflow.sourceIPv6Address
The source IPv6 address of the flow (not present if logstash translation was implemented above)
netflow.destinationAddress
The destination IPv4/IPv6 address of the flow (only present if logstash translation was implemented above)
netflow.destinationIPv4Address
The destination IPv4 address of the flow (not present if logstash translation was implemented above)
netflow.destinationIPv6Address
The destination IPv6 address of the flow (not present if logstash translation was implemented above)
netflow.sourceTransportPort
The source port of the flow, if the flow protocol has port numbers. The TCP, UDP, DCCP, and SCTP protocols have port numbers.
netflow.sourceTransportPortName
The name for the value in netflow.sourceTransportPort (only present if logstash translation was implemented above)
netflow.sourceTransportDescription
A full description of the values in netflow.protocolName and netflow.sourceTransportPort that is useful for display (only present if logstash translation was implemented above)
netflow.destinationTransportPort
The destination port of the flow, if the flow protocol has port numbers. The TCP, UDP, DCCP, and SCTP protocols have port numbers.
netflow.destinationTransportPortName
The name for the value in netflow.destinationTransportPort (only present if logstash translation was implemented above)
netflow.destinationTransportDescription
A full description of the values in netflow.protocolName and netflow.destinationTransportPort that is useful for display (only present if logstash translation was implemented above)
netflow.serviceDescription
A full description of the service including the protocol and port information. For port-based protocols, this is derived from the relevant netflow.sourceTransportDescription or netflow.destinationTransportDescription value, assuming that the lower of the two port numbers is the service port (only present if logstash translation was implemented above)
netflow.tcpControlBits
If the flow protocol is TCP, this will contain the TCP flags (control bits) for the packet. See the tcpControlBits entry in the IANA IPFIX information elements registry for more information.
netflow.flowStartMilliseconds
The time the flow started.
netflow.flowEndMilliseconds
The time the flow ended.
netflow.octetDeltaCount
The number of bytes transferred in the flow since the last record.
netflow.packetDeltaCount
The number of packets transferred in the flow since the last record.

Some flows may contain other fields depending on configuration and protocol type. The official list of potential fields is maintained in the IANA IPFIX information elements registry, but most of them will not be present in practice. Some of those fields, as well as some custom ones not defined by the IANA, may be added in future releases of Bonding.

Creating visualizations

See the Kibana visualization documentation for a more complete guide on creating visualizations.

Top 10 hosts table

Let’s create a simple data table first. This will list the top 10 hosts in descending order of bytes transferred.

Create a Data Table visualization.

For Metric select the Sum aggregation of netflow.octetDeltaCount and set the label to Bytes.

For Buckets select the Terms aggregation of netflow.destinationIPv4Address.keyword (or netflow.destinationAddress.keyword if the address-normalization filter above is in use) and order by the sum of netflow.octetDeltaCount, descending, with a size of 10. Set the label to Host.

Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.

Top 10 services chart

If the Logstash translation is enabled, the netflow.serviceDescription field records the service regardless of flow direction. We can use this to track the most popular services on the network.

Create a Vertical Bar visualization.

For Y-Axis select the Sum of the netflow.octetDeltaCount field and set the label to Bytes.

For the X-Axis select the Terms aggregation of the netflow.serviceDescription.keyword field and order by the sum of netflow.octetDeltaCount, descending, with a size of 10. Set the label to Service.

Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.

Service traffic over time

It is also useful to see which services transfer the most traffic at specific times. A time-based line chart works best for this.

Create a Timelion visualization.

The default expression simply shows the number of flows, which is not too useful, so we need to create an expression that shows the services.

Change the Timelion Expression field to the following:

.es(index=ipfix-*, split=netflow.serviceDescription.keyword:10, metric=sum:netflow.octetDeltaCount, kibana=true).scale_interval(1s).fit(mode=scale).lines(width=1, fill=true, stack=true).if(operator="lt", if=0, then=0).label(regex='netflow.serviceDescription.keyword:(.+) >.*$', label="$1").yaxis(label="bytes/s", min=0)

See the Timelion tutorial in the Kibana documentation for a more thorough guide to creating Timelion charts.

Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.

Creating dashboards

Visualizations can be organised into dashboards for quick viewing of important data.

To create a dashboard, click the Dashboard menu item on the left and click the Create a dashboard button. On the resulting page, click the Add button at the top to add each of the visualizations created earlier.

Resize and rearrange the panels to your liking and click the Save button at the top to save the dashboard.