=============================================
Installing Elastic stack for IPFIX collection
=============================================

This guide contains instructions for installing an Elastic stack instance for
the purposes of IPFIX flow collection. It is assumed that the installation
will take place on a standard Linux distribution installation that is
dedicated for this purpose.


----------------------------
Installing the Elastic stack
----------------------------

The instructions here are provided as a quick example. They are specific to
the openSUSE Leap distribution; the steps for other distributions differ
slightly. See `here
<https://www.elastic.co/guide/en/elastic-stack/current/installing-elastic-stack.html>`__
for detailed instructions covering other distributions.

Import the package signing key::

    rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Add the repo::

    zypper ar -f https://artifacts.elastic.co/packages/6.x/yum elasticsearch-6.x

Install::

    zypper in java-1_8_0-openjdk elasticsearch logstash kibana

Enable and start the services::

    systemctl enable elasticsearch.service kibana.service logstash.service
    systemctl start elasticsearch.service kibana.service logstash.service

*Optional:* We can install httpie, which provides a simple command-line
interface for making manual test requests to the services::

    zypper in python3-requests python3-Pygments
    pip3 install httpie

Now with httpie we can check the Elasticsearch service on port 9200::

    http get http://localhost:9200/

You should see output like this if the service is running properly::

    HTTP/1.1 200 OK
    content-encoding: gzip
    content-length: 279
    content-type: application/json; charset=UTF-8

    {
        "cluster_name": "elasticsearch",
        "cluster_uuid": "IgDZkOv9QiO7qyuUd0MHdg",
        "name": "4Iv8FWC",
        "tagline": "You Know, for Search",
        "version": {
            "build_date": "2017-12-17T20:23:25.338Z",
            "build_hash": "bd92e7f",
            "build_snapshot": false,
            "lucene_version": "7.1.0",
            "minimum_index_compatibility_version": "5.0.0",
            "minimum_wire_compatibility_version": "5.6.0",
            "number": "6.1.1"
        }
    }

Also check the Kibana service with httpie::

    http get http://localhost:5601/

You should see output like this::

    HTTP/1.1 200 OK
    Connection: keep-alive
    Date: Fri, 12 Jan 2018 17:25:07 GMT
    Transfer-Encoding: chunked
    cache-control: no-cache
    content-encoding: gzip
    content-type: text/html; charset=utf-8
    kbn-name: kibana
    kbn-version: 6.1.1
    vary: accept-encoding

    <script>var hashRoute = '/app/kibana';
    var defaultRoute = '/app/kibana';

    var hash = window.location.hash;
    if (hash.length) {
    window.location = hashRoute + hash;
    } else {
    window.location = defaultRoute;
    }</script>


-----------------------------------------
Configuring Logstash to accept IPFIX data
-----------------------------------------

Configure an IPFIX input and output in Logstash::

    cat <<'EOF' > /etc/logstash/conf.d/ipfix.conf
    input {
        udp {
            port => 2055
            codec => netflow {
                versions => 10
            }
            type => ipfix
        }
    }
    output {
        if [type] == "ipfix" {
            elasticsearch {
                hosts => "127.0.0.1"
                index => "ipfix-%{+YYYY.MM.dd}"
            }
        }
    }
    EOF

See `here
<https://www.elastic.co/guide/en/logstash/current/plugins-codecs-netflow.html>`__
for more information on the configuration options.

Restart logstash::

    systemctl restart logstash.service

Set up a flow collector to send data to the server's IP address on UDP port
2055, and enable it.


Optional: Translating IPFIX fields
----------------------------------

Some of the raw fields don't look very nice in Kibana, particularly protocols
and transport ports, since they are expressed as simple numbers. Logstash
allows for translation or creation of new fields as data comes in. Let's add
some translations for protocol names. We can also normalize IPv4 and IPv6
addresses into fields that can contain either to make searching and filtering
easier.

First create a directory to store the protocol translations::

    mkdir -p /etc/logstash/dictionaries

Create mappings for protocols and ports from the IANA database::

    curl -s https://www.iana.org/assignments/protocol-numbers/protocol-numbers-1.csv \
        | grep -e '^[0-9]\+,' | cut -d ',' -f1,2 | grep -ve ',$' | head -n-1 \
        | sed -e 's/\([0-9]\+\),\(.*\)/"\1": \2/g' \
        > /etc/logstash/dictionaries/iana-protocol-numbers.yml
    curl -s https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv \
        -o /tmp/service-names-port-numbers.csv
    for protocol in tcp udp sctp dccp ; do
        cut -d ',' -f1,2,3 /tmp/service-names-port-numbers.csv \
            | grep -e "^[a-zA-Z0-9-]\+,[0-9]\+,$protocol" \
            | sed -e 's/\([a-zA-Z0-9-]\+\),\([0-9]\+\),.*/"\2": \1/g' \
            > /etc/logstash/dictionaries/iana-${protocol}-numbers.yml
    done
    rm /tmp/service-names-port-numbers.csv
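
The generated dictionaries are flat YAML maps of quoted numeric keys to
names. As a quick sanity check, every line should match the ``"number": name``
form; the sample below uses hypothetical values rather than the full IANA
data::

```shell
# Hypothetical sample in the same format as the generated dictionaries
cat <<'EOF' > /tmp/sample-dict.yml
"6": tcp
"17": udp
"132": sctp
EOF
# Count the lines that match the expected '"<number>": <name>' form
grep -c -e '^"[0-9]\+": [a-zA-Z]' /tmp/sample-dict.yml
```

If the count printed by ``grep -c`` is lower than the number of lines in the
file, some entries are malformed and the ``translate`` filter below will not
be able to use them.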

Also create mappings for nDPI protocols::

    curl -s https://raw.githubusercontent.com/ntop/nDPI/dev/src/include/ndpi_protocol_ids.h \
        | grep -v NDPI_PROTOCOL_SIZE \
        | grep -P '#define NDPI_(PROTOCOL|CONTENT).*[0-9]+' \
        | sed -e 's|\/\*.*\*\/||g' | awk '{print "\"" $3 "\""  ": "  $2 }' \
        | uniq \
        | sed -e 's/NDPI_PROTOCOL_//g' \
        | sed -e 's/NDPI_CONTENT_//g' \
        > /etc/logstash/dictionaries/ndpi-application-ids.yml

Now add a filter section between the input and output sections in
``/etc/logstash/conf.d/ipfix.conf``::

    input {
        udp {
            port => 2055
            codec => netflow {
                versions => 10
            }
            type => ipfix
        }
    }
    filter {
        # nDPI stuff
        if [netflow][applicationId] {
            ruby {
                code => 'event.set("[netflow][application]", event.get("[netflow][applicationId]").rjust(4, "\x00").unpack("N")[0])'
            }
            translate {
                dictionary_path => "/etc/logstash/dictionaries/ndpi-application-ids.yml"
                field => "[netflow][application]"
                destination => "[netflow][applicationName]"
                fallback => "UNKNOWN"
            }
            ruby {
                code => 'event.set("[netflow][applicationId]", [event.get("[netflow][applicationId]")].pack("m").strip())'
            }
        }
        # Normalize addresses
        if [netflow][ipVersion] == 4 {
            mutate {
                rename => {
                    "[netflow][sourceIPv4Address]" => "[netflow][sourceAddress]"
                    "[netflow][destinationIPv4Address]" => "[netflow][destinationAddress]"
                }
            }
        }
        if [netflow][ipVersion] == 6 {
            mutate {
                rename => {
                    "[netflow][sourceIPv6Address]" => "[netflow][sourceAddress]"
                    "[netflow][destinationIPv6Address]" => "[netflow][destinationAddress]"
                }
            }
        }
        # Add fields with descriptive protocol strings
        if [netflow][protocolIdentifier] {
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-protocol-numbers.yml"
                field => "[netflow][protocolIdentifier]"
                destination => "[netflow][protocolName]"
                fallback => "Unknown"
            }
        }
        if [netflow][protocolIdentifier] == 6 {
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
                field => "[netflow][sourceTransportPort]"
                destination => "[netflow][sourceTransportPortName]"
                fallback => "Unknown"
            }
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
                field => "[netflow][destinationTransportPort]"
                destination => "[netflow][destinationTransportPortName]"
                fallback => "Unknown"
            }
        }
        if [netflow][protocolIdentifier] == 17 {
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
                field => "[netflow][sourceTransportPort]"
                destination => "[netflow][sourceTransportPortName]"
                fallback => "Unknown"
            }
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
                field => "[netflow][destinationTransportPort]"
                destination => "[netflow][destinationTransportPortName]"
                fallback => "Unknown"
            }
        }
        if [netflow][protocolIdentifier] == 33 {
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
                field => "[netflow][sourceTransportPort]"
                destination => "[netflow][sourceTransportPortName]"
                fallback => "Unknown"
            }
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
                field => "[netflow][destinationTransportPort]"
                destination => "[netflow][destinationTransportPortName]"
                fallback => "Unknown"
            }
        }
        if [netflow][protocolIdentifier] == 132 {
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
                field => "[netflow][sourceTransportPort]"
                destination => "[netflow][sourceTransportPortName]"
                fallback => "Unknown"
            }
            translate {
                dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
                field => "[netflow][destinationTransportPort]"
                destination => "[netflow][destinationTransportPortName]"
                fallback => "Unknown"
            }
        }
        if [netflow][protocolIdentifier] in [6, 17, 33, 132] {
            if [netflow][sourceTransportPortName] != "Unknown" {
                mutate {
                    add_field => {
                        "[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]} (%{[netflow][sourceTransportPortName]})"
                    }
                }
            } else {
                mutate {
                    add_field => {
                        "[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]}"
                    }
                }
            }
            if [netflow][destinationTransportPortName] != "Unknown" {
                mutate {
                    add_field => {
                        "[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]} (%{[netflow][destinationTransportPortName]})"
                    }
                }
            } else {
                mutate {
                    add_field => {
                        "[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]}"
                    }
                }
            }
            # Assume that the lower port number is the service port
            if [netflow][sourceTransportPort] < [netflow][destinationTransportPort] {
                mutate {
                    add_field => {
                        "[netflow][serviceDescription]" => "%{[netflow][sourceTransportDescription]}"
                    }
                }
            } else {
                mutate {
                    add_field => {
                        "[netflow][serviceDescription]" => "%{[netflow][destinationTransportDescription]}"
                    }
                }
            }
        } else {
            if [netflow][protocolName] != "Unknown" {
                mutate {
                    add_field => {
                        "[netflow][serviceDescription]" => "%{[netflow][protocolName]}"
                    }
                }
            } else {
                mutate {
                    add_field => {
                        "[netflow][serviceDescription]" => "Unknown (%{[netflow][protocolIdentifier]})"
                    }
                }
            }
        }
    }
    output {
        if [type] == "ipfix" {
            elasticsearch {
                hosts => "127.0.0.1"
                index => "ipfix-%{+YYYY.MM.dd}"
            }
        }
    }
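
The last part of the filter assumes that the lower of the two port numbers is
the service port. The same heuristic can be sketched in plain shell (the port
values here are hypothetical)::

```shell
# Hypothetical flow: an HTTPS connection from an ephemeral client port
src_port=49152
dst_port=443
# Assume the lower port number is the service port, as in the filter above
if [ "$src_port" -lt "$dst_port" ]; then
    service_port=$src_port
else
    service_port=$dst_port
fi
echo "service port: $service_port"
```

This works for typical client-to-server flows, but it can mislabel
peer-to-peer traffic where both endpoints use ephemeral ports.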

Then restart Logstash::

    systemctl restart logstash.service

Additional filters may be added according to the desired fields. For example,
it's possible to generate a list of host-to-bond ID mappings from the
bondingadmin API to populate a bond ID field, depending on the flow
collector's source IP policy.
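
For instance, a static lookup of exporter IPs to bond IDs could be wired in
with another ``translate`` filter. This is only a sketch: the dictionary path
and the ``bondId`` field name are assumptions, and the dictionary itself would
need to be generated from the bondingadmin API::

```
filter {
    translate {
        # Hypothetical dictionary mapping exporter IP addresses to bond IDs
        dictionary_path => "/etc/logstash/dictionaries/bond-ids.yml"
        field => "host"
        destination => "[netflow][bondId]"
        fallback => "unknown"
    }
}
```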


-------------------------------
Installing a frontend webserver
-------------------------------

This is required to allow access to the Kibana dashboard from other hosts.

Install Nginx::

    zypper in nginx

Edit ``/etc/nginx/nginx.conf`` and change the default server entry to proxy to
Kibana::

    server {
        listen       80;

        location / {
            proxy_pass http://localhost:5601;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection 'upgrade';
            proxy_set_header Host $host;
            proxy_cache_bypass $http_upgrade;
        }
    }

Enable and start the service::

    systemctl enable nginx.service
    systemctl start nginx.service

Note that this allows access from anywhere without authentication. If a
firewall is not in place, authentication should be set up in Nginx. See
`here
<https://www.nginx.com/resources/admin-guide/restricting-access-auth-basic/>`__
for instructions.
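
As a sketch, basic authentication can be enabled with two directives in the
``location`` block; the password file path here is an assumption, and the file
itself must be created separately (e.g. with ``htpasswd``)::

```
location / {
    # Prompt for credentials before proxying to Kibana
    auth_basic "Kibana";
    auth_basic_user_file /etc/nginx/kibana.htpasswd;
    proxy_pass http://localhost:5601;
    # ... remaining proxy_set_header directives as above
}
```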


------------------------------------
Configuring Kibana to index the data
------------------------------------

Navigate to the IP address of the server in a web browser, which will show the
Kibana welcome page. The first thing to set up is the index pattern for the
data we are collecting.

Click the "Set up index patterns" button on the upper right. It will display
the index pattern creation page. If you are collecting data already, you will
see an index for the current date. We need to set up a pattern that will match
the indices for all dates, so set the index pattern to ``ipfix-*`` and click
the *Next step* button.

Now choose ``@timestamp`` for the *Time Filter field name* and click the
*Create index pattern* button to finish.


--------------------
Contents of the data
--------------------

Now you can navigate to the *Discover* panel to see the raw flow data that was
captured. Clicking the arrow to the left of an entry will expand it, showing
all of the fields recorded for the flow.

Here are the most important fields. Not all of them will be present in all
records, depending on the actual flow type:

``@timestamp``
    This is the time when the flow was reported to Logstash.

``host``
    This is the IP address of the device that sent the flow record. This can
    be used to determine which bonder handled the flow, according to the
    *Source IP policy* parameter of the flow collector defined in
    Bondingadmin.

``netflow.ipVersion``
    This is the IP version of the flow. It will be ``4`` for IPv4 and ``6``
    for IPv6.

``netflow.protocolIdentifier``
    The protocol ID of the flow. This will be ``1`` for ICMP, ``6`` for TCP
    and ``17`` for UDP. The full list is available `here
    <https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml>`__

``netflow.protocolName``
    The name for the protocol ID of the flow (only present if logstash
    translation was implemented above)

``netflow.sourceAddress``
    The source IPv4/IPv6 address of the flow (only present if logstash
    translation was implemented above)

``netflow.sourceIPv4Address``
    The source IPv4 address of the flow (not present if logstash translation
    was implemented above)

``netflow.sourceIPv6Address``
    The source IPv6 address of the flow (not present if logstash translation
    was implemented above)

``netflow.destinationAddress``
    The destination IPv4/IPv6 address of the flow (only present if logstash
    translation was implemented above)

``netflow.destinationIPv4Address``
    The destination IPv4 address of the flow (not present if logstash
    translation was implemented above)

``netflow.destinationIPv6Address``
    The destination IPv6 address of the flow (not present if logstash
    translation was implemented above)

``netflow.sourceTransportPort``
    The source port of the flow, if the flow protocol has port numbers. The
    TCP, UDP, DCCP, and SCTP protocols have port numbers.

``netflow.sourceTransportPortName``
    The name for the value in ``netflow.sourceTransportPort`` (only present if
    logstash translation was implemented above)

``netflow.sourceTransportDescription``
    A full description of the values in ``netflow.protocolName`` and
    ``netflow.sourceTransportPort`` that is useful for display (only present
    if logstash translation was implemented above)

``netflow.destinationTransportPort``
    The destination port of the flow, if the flow protocol has port numbers.
    The TCP, UDP, DCCP, and SCTP protocols have port numbers.

``netflow.destinationTransportPortName``
    The name for the value in ``netflow.destinationTransportPort`` (only
    present if logstash translation was implemented above)

``netflow.destinationTransportDescription``
    A full description of the values in ``netflow.protocolName`` and
    ``netflow.destinationTransportPort`` that is useful for display (only
    present if logstash translation was implemented above)

``netflow.serviceDescription``
    A full description of the service including the protocol and port
    information. For port-based protocols, this is derived from the relevant
    ``netflow.sourceTransportDescription`` or
    ``netflow.destinationTransportDescription`` value, assuming that the
    lower of the two port numbers is the service port (only present if
    logstash translation was implemented above)

``netflow.tcpControlBits``
    If the flow protocol is TCP, this will contain the TCP flags (control
    bits) for the packet. See the entry in `this page
    <https://www.iana.org/assignments/ipfix/ipfix.xhtml>`__ for more
    information.

``netflow.flowStartMilliseconds``
    The time the flow started.

``netflow.flowEndMilliseconds``
    The time the flow ended.

``netflow.octetDeltaCount``
    The number of bytes transferred in the flow since the last record.

``netflow.packetDeltaCount``
    The number of packets transferred in the flow since the last record.
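
Together with the flow start and end timestamps, the delta counts allow a
rough average rate to be computed per record. A sketch with hypothetical
values::

```shell
# Hypothetical values from a single flow record
flow_start_ms=1515775500000   # netflow.flowStartMilliseconds
flow_end_ms=1515775510000     # netflow.flowEndMilliseconds
octets=1250000                # netflow.octetDeltaCount
# Average throughput in bytes per second over the 10-second interval
echo $(( octets * 1000 / (flow_end_ms - flow_start_ms) ))
```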

Some flows may contain other fields depending on configuration and protocol
type. An official list of potential fields is available `here
<https://www.iana.org/assignments/ipfix/ipfix.xhtml>`__, but most of them are
not currently exported. Some of those fields, as well as some custom ones not
defined by IANA, may be added in future releases of Bonding.


-----------------------
Creating visualizations
-----------------------

See `here <https://www.elastic.co/guide/en/kibana/current/createvis.html>`__
for a more complete guide on creating visualizations.


Top 10 hosts table
------------------
Let's create a simple data table first. This will list the top 10 hosts in
descending order of bytes transferred.

Create a *Data Table* visualization.

For *Metric* select the ``Sum`` aggregation of ``netflow.octetDeltaCount`` and
set the label to ``Bytes``.

For *Buckets* select the ``Terms`` aggregation of
``netflow.destinationIPv4Address.keyword`` and order by the sum of
``netflow.octetDeltaCount``, descending, with a size of 10. Set the label to
``Host``.

Click the triangle button at the top of the form to preview the data and click
*Save* at the top of the page to save the visualization.


Top 10 services chart
---------------------

If the Logstash translation is enabled, the ``netflow.serviceDescription``
field records the service regardless of the direction of the flow. We can use
this to track the popular services on the network.

Create a *Vertical Bar* visualization.

For *Y-Axis* select the ``Sum`` of the ``netflow.octetDeltaCount`` field and
set the label to ``Bytes``.

For the *X-Axis* select the ``Terms`` aggregation of
``netflow.serviceDescription.keyword`` field and order by the sum of
``netflow.octetDeltaCount``, descending, with a size of 10. Set the label to
``Service``.

Click the triangle button at the top of the form to preview the data and click
*Save* at the top of the page to save the visualization.


Service traffic over time
-------------------------

It is also useful to see which hosts transfer the most traffic at specific
times. A time-based line chart will work best for this.

Create a *Timelion* visualization.

The default expression simply shows the number of flows, which is not too
useful, so we need to create an expression that shows the services.

Change the ``Timelion Expression`` field to the following::

    .es(index=ipfix-*, split=netflow.serviceDescription.keyword:10, metric=sum:netflow.octetDeltaCount, kibana=true).scale_interval(1s).fit(mode=scale).lines(width=1, fill=true, stack=true).if(operator="lt", if=0, then=0).label(regex='netflow.serviceDescription.keyword:(.+) >.*$', label="$1").yaxis(label="bytes/s", min=0)

See `here <https://www.elastic.co/blog/timelion-tutorial-from-zero-to-hero>`__
for a thorough tutorial on creating Timelion charts.

Click the triangle button at the top of the form to preview the data and click
*Save* at the top of the page to save the visualization.


-------------------
Creating dashboards
-------------------

Visualizations can be organised into dashboards for quick viewing of important
data.

To create a dashboard, click the *Dashboard* menu item on the left and click
the *Create a dashboard* button. On the resulting page, click the *Add* button
at the top to add each of the visualizations created earlier.

Resize and rearrange the panels to your liking and click the *Save* button at
the top to save the dashboard.
