Installing Elastic stack for IPFIX collection¶
This guide contains instructions for installing an Elastic stack instance for IPFIX flow collection. It is assumed that the installation takes place on a standard Linux system dedicated to this purpose.
Installing the Elastic stack¶
The instructions here are provided as a quick example. They are specific to the openSUSE Leap distribution. The instructions for other distributions will be slightly different. See here for more detailed instructions for other distributions.
Import the package signing key:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
Add the repo:
zypper ar -f https://artifacts.elastic.co/packages/6.x/yum elasticsearch-6.x
Install:
zypper in java-1_8_0-openjdk elasticsearch logstash kibana
Enable and start the services:
systemctl enable elasticsearch.service kibana.service logstash.service
systemctl start elasticsearch.service kibana.service logstash.service
Optional: install httpie, which provides a simple interface for making manual test requests to the services:
zypper in python3-requests python3-Pygments
pip3 install httpie
Now with httpie we can check the elasticsearch service on port 9200:
http get http://localhost:9200/
You should see output like this if the service is running properly:
HTTP/1.1 200 OK
content-encoding: gzip
content-length: 279
content-type: application/json; charset=UTF-8
{
"cluster_name": "elasticsearch",
"cluster_uuid": "IgDZkOv9QiO7qyuUd0MHdg",
"name": "4Iv8FWC",
"tagline": "You Know, for Search",
"version": {
"build_date": "2017-12-17T20:23:25.338Z",
"build_hash": "bd92e7f",
"build_snapshot": false,
"lucene_version": "7.1.0",
"minimum_index_compatibility_version": "5.0.0",
"minimum_wire_compatibility_version": "5.6.0",
"number": "6.1.1"
}
}
Also check the kibana service with httpie:
http get http://localhost:5601/
You should see output like this:
HTTP/1.1 200 OK
Connection: keep-alive
Date: Fri, 12 Jan 2018 17:25:07 GMT
Transfer-Encoding: chunked
cache-control: no-cache
content-encoding: gzip
content-type: text/html; charset=utf-8
kbn-name: kibana
kbn-version: 6.1.1
vary: accept-encoding
<script>var hashRoute = '/app/kibana';
var defaultRoute = '/app/kibana';
var hash = window.location.hash;
if (hash.length) {
window.location = hashRoute + hash;
} else {
window.location = defaultRoute;
}</script>
Configuring Logstash to accept IPFIX data¶
Configure an IPFIX input and output in logstash:
cat <<'EOF' > /etc/logstash/conf.d/ipfix.conf
input {
udp {
port => 2055
codec => netflow {
versions => 10
}
type => ipfix
}
}
output {
if [type] == "ipfix" {
elasticsearch {
hosts => "127.0.0.1"
index => "ipfix-%{+YYYY.MM.dd}"
}
}
}
EOF
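The %{+YYYY.MM.dd} sprintf reference in the index option makes Logstash write flows into one index per day, named after each event's @timestamp (in UTC). As a quick illustration, the index name for the current day can be previewed in the shell:

```shell
# Logstash substitutes the event timestamp into the index name;
# date(1) produces the equivalent name for the current UTC day,
# e.g. ipfix-2018.01.12
date -u +"ipfix-%Y.%m.%d"
```

This is why the index pattern configured later in Kibana must use a wildcard (ipfix-*) rather than a single index name.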
See here for more information on the configuration options.
Restart logstash:
systemctl restart logstash.service
Set up a flow collector to send data to the IP of the server on port 2055, and enable it.
Optional: Translating IPFIX fields¶
Some of the raw fields don’t look very nice in Kibana, particularly protocols and transport ports, since they are expressed as simple numbers. Logstash allows for translation or creation of new fields as data comes in. Let’s add some translations for protocol names. We can also normalize IPv4 and IPv6 addresses into fields that can contain either to make searching and filtering easier.
First create a directory to store the protocol translations:
mkdir -p /etc/logstash/dictionaries
Create mappings for protocols and ports from the IANA database:
curl -s https://www.iana.org/assignments/protocol-numbers/protocol-numbers-1.csv \
| grep -e '^[0-9]\+,' | cut -d ',' -f1,2 | grep -ve ',$' | head -n-1 \
| sed -e 's/\([0-9]\+\),\(.*\)/"\1": \2/g' \
> /etc/logstash/dictionaries/iana-protocol-numbers.yml
curl -s https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv \
-o /tmp/service-names-port-numbers.csv
for protocol in tcp udp sctp dccp ; do
cut -d ',' -f1,2,3 /tmp/service-names-port-numbers.csv \
| grep -e "^[a-zA-Z0-9-]\+,[0-9]\+,$protocol" \
| sed -e 's/\([a-zA-Z0-9-]\+\),\([0-9]\+\),.*/"\2": \1/g' \
> /etc/logstash/dictionaries/iana-${protocol}-numbers.yml
done
rm /tmp/service-names-port-numbers.csv
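To sanity-check these transformations, you can run a single sample row from each CSV through the same pipeline; the resulting YAML dictionaries simply map a number to a name. The rows below are illustrative, not taken verbatim from the IANA files:

```shell
# A sample row from the protocol-numbers CSV becomes a dictionary entry:
echo '6,TCP,Transmission Control,[RFC793]' \
  | cut -d ',' -f1,2 \
  | sed -e 's/\([0-9]\+\),\(.*\)/"\1": \2/g'
# prints: "6": TCP

# A sample row from the service-names CSV becomes a port entry:
echo 'https,443,tcp,http protocol over TLS/SSL' \
  | cut -d ',' -f1,2,3 \
  | grep -e "^[a-zA-Z0-9-]\+,[0-9]\+,tcp" \
  | sed -e 's/\([a-zA-Z0-9-]\+\),\([0-9]\+\),.*/"\2": \1/g'
# prints: "443": https
```

The translate filter configured later reads these files as flat YAML hashes, so each line must be a quoted key followed by a value.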
Also create mappings for nDPI protocols:
curl -s https://raw.githubusercontent.com/ntop/nDPI/dev/src/include/ndpi_protocol_ids.h \
| grep -v NDPI_PROTOCOL_SIZE \
| grep -P '#define NDPI_(PROTOCOL|CONTENT).*[0-9]+' \
| sed -e 's|\/\*.*\*\/||g' | awk '{print "\"" $3 "\"" ": " $2 }' \
| uniq \
| sed -e 's/NDPI_PROTOCOL_//g' \
| sed -e 's/NDPI_CONTENT_//g' \
> /etc/logstash/dictionaries/ndpi-application-ids.yml
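The awk/sed stage of this pipeline turns each #define line from the nDPI header into a dictionary entry keyed by the numeric ID. A sample line (the HTTP ID shown here is illustrative) demonstrates the transformation:

```shell
# $2 is the macro name, $3 is the numeric ID; the sed pass strips the
# NDPI_PROTOCOL_ prefix, leaving a "number": name dictionary entry.
echo '#define NDPI_PROTOCOL_HTTP 7' \
  | awk '{print "\"" $3 "\"" ": " $2 }' \
  | sed -e 's/NDPI_PROTOCOL_//g'
# prints: "7": HTTP
```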
Now add a filter section between the input and output sections in
/etc/logstash/conf.d/ipfix.conf:
input {
udp {
port => 2055
codec => netflow {
versions => 10
}
type => ipfix
}
}
filter {
# nDPI stuff
if [netflow][applicationId] {
ruby {
code => 'event.set("[netflow][application]", event.get("[netflow][applicationId]").rjust(4, "\x00").unpack("N")[0])'
}
translate {
dictionary_path => "/etc/logstash/dictionaries/ndpi-application-ids.yml"
field => "[netflow][application]"
destination => "[netflow][applicationName]"
fallback => "UNKNOWN"
}
ruby {
code => 'event.set("[netflow][applicationId]", [event.get("[netflow][applicationId]")].pack("m").strip())'
}
}
# Normalize addresses
if [netflow][ipVersion] == 4 {
mutate {
rename => {
"[netflow][sourceIPv4Address]" => "[netflow][sourceAddress]"
"[netflow][destinationIPv4Address]" => "[netflow][destinationAddress]"
}
}
}
if [netflow][ipVersion] == 6 {
mutate {
rename => {
"[netflow][sourceIPv6Address]" => "[netflow][sourceAddress]"
"[netflow][destinationIPv6Address]" => "[netflow][destinationAddress]"
}
}
}
# Add fields with descriptive protocol strings
if [netflow][protocolIdentifier] {
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-protocol-numbers.yml"
field => "[netflow][protocolIdentifier]"
destination => "[netflow][protocolName]"
fallback => "Unknown"
}
}
if [netflow][protocolIdentifier] == 6 {
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
field => "[netflow][sourceTransportPort]"
destination => "[netflow][sourceTransportPortName]"
fallback => "Unknown"
}
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-tcp-numbers.yml"
field => "[netflow][destinationTransportPort]"
destination => "[netflow][destinationTransportPortName]"
fallback => "Unknown"
}
}
if [netflow][protocolIdentifier] == 17 {
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
field => "[netflow][sourceTransportPort]"
destination => "[netflow][sourceTransportPortName]"
fallback => "Unknown"
}
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-udp-numbers.yml"
field => "[netflow][destinationTransportPort]"
destination => "[netflow][destinationTransportPortName]"
fallback => "Unknown"
}
}
if [netflow][protocolIdentifier] == 33 {
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
field => "[netflow][sourceTransportPort]"
destination => "[netflow][sourceTransportPortName]"
fallback => "Unknown"
}
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-dccp-numbers.yml"
field => "[netflow][destinationTransportPort]"
destination => "[netflow][destinationTransportPortName]"
fallback => "Unknown"
}
}
if [netflow][protocolIdentifier] == 132 {
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
field => "[netflow][sourceTransportPort]"
destination => "[netflow][sourceTransportPortName]"
fallback => "Unknown"
}
translate {
dictionary_path => "/etc/logstash/dictionaries/iana-sctp-numbers.yml"
field => "[netflow][destinationTransportPort]"
destination => "[netflow][destinationTransportPortName]"
fallback => "Unknown"
}
}
if [netflow][protocolIdentifier] in [6, 17, 33, 132] {
if [netflow][sourceTransportPortName] != "Unknown" {
mutate {
add_field => {
"[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]} (%{[netflow][sourceTransportPortName]})"
}
}
} else {
mutate {
add_field => {
"[netflow][sourceTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][sourceTransportPort]}"
}
}
}
if [netflow][destinationTransportPortName] != "Unknown" {
mutate {
add_field => {
"[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]} (%{[netflow][destinationTransportPortName]})"
}
}
} else {
mutate {
add_field => {
"[netflow][destinationTransportDescription]" => "%{[netflow][protocolName]}/%{[netflow][destinationTransportPort]}"
}
}
}
# Assume that the lower port number is the service port
if [netflow][sourceTransportPort] < [netflow][destinationTransportPort] {
mutate {
add_field => {
"[netflow][serviceDescription]" => "%{[netflow][sourceTransportDescription]}"
}
}
} else {
mutate {
add_field => {
"[netflow][serviceDescription]" => "%{[netflow][destinationTransportDescription]}"
}
}
}
} else {
if [netflow][protocolName] != "Unknown" {
mutate {
add_field => {
"[netflow][serviceDescription]" => "%{[netflow][protocolName]}"
}
}
} else {
mutate {
add_field => {
"[netflow][serviceDescription]" => "Unknown (%{[netflow][protocolIdentifier]})"
}
}
}
}
}
output {
if [type] == "ipfix" {
elasticsearch {
hosts => "127.0.0.1"
index => "ipfix-%{+YYYY.MM.dd}"
}
}
}
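The two ruby filters in the nDPI section handle the raw applicationId bytes: the first left-pads the value to 4 bytes and unpacks it as a big-endian integer for the dictionary lookup, and the second re-encodes the raw bytes as base64 so the field indexes cleanly. A rough shell equivalent of these two conversions, using a hypothetical 4-byte value:

```shell
# The 4 bytes 00 00 00 07 read big-endian are the integer 7, mirroring
# ruby's .rjust(4, "\x00").unpack("N")[0]; dumping the hex bytes shows
# the value 00000007:
printf '\000\000\000\007' | od -An -t x1 | tr -d ' \n'
# prints: 00000007

# Base64-encoding the same raw bytes mirrors ruby's .pack("m").strip:
printf '\000\000\000\007' | base64
# prints: AAAABw==
```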
Then restart Logstash:
systemctl restart logstash
Additional filters may be added according to the desired fields. For example, it’s possible to generate a list of host-to-bond ID mappings from the bondingadmin API to populate a bond ID field, depending on the flow collector’s source IP policy.
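As a hypothetical sketch, such a mapping could be implemented with another translate filter keyed on the record's host field. The dictionary path and the bondId field name below are invented for illustration; the dictionary itself would be regenerated periodically from the bondingadmin API:

```
filter {
if [type] == "ipfix" {
translate {
# Hypothetical dictionary of "exporter IP": bond ID pairs
dictionary_path => "/etc/logstash/dictionaries/bond-ids.yml"
field => "host"
destination => "[netflow][bondId]"
fallback => "Unknown"
}
}
}
```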
Installing a frontend webserver¶
This is required to allow access to the Kibana dashboard from other hosts.
Install Nginx:
zypper in nginx
Edit /etc/nginx/nginx.conf and change the default server entry to proxy to Kibana:
server {
listen 80;
location / {
proxy_pass http://localhost:5601;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
}
}
Enable and start the service:
systemctl enable nginx.service
systemctl start nginx.service
Note that this allows access from anywhere without authentication. If a firewall is not in place, authentication should be set up in Nginx. See here for instructions.
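As a minimal sketch of such a setup, HTTP basic authentication can be added to the proxy. The credentials file path and realm name below are illustrative, and the htpasswd tool comes from the Apache utilities package:

```nginx
# Create a credentials file first (prompts for a password):
#   htpasswd -c /etc/nginx/kibana.htpasswd admin
server {
    listen 80;
    location / {
        auth_basic "Kibana";
        auth_basic_user_file /etc/nginx/kibana.htpasswd;
        proxy_pass http://localhost:5601;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}
```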
Configuring Kibana to index the data¶
Navigate to the IP address of the server in a web browser, which will show the Kibana welcome page. The first thing that needs to be set up is the index pattern for the data we are collecting.
Click the “Set up index patterns” button on the upper right. It will display
the index pattern creation page. If you are collecting data already, you will
see an index for the current date. We need to set up a pattern that will match
the indices for all dates, so set the index pattern to ipfix-* and click
the Next step button.
Now choose @timestamp for the Time Filter field name and click the
Create index pattern button to finish.
Contents of the data¶
Now you can navigate to the Discover panel to see the raw flow data that was captured. Clicking the arrow to the left of an entry will expand it, showing all of the fields recorded for the flow.
Here are the most important fields. Not all of them will be present in all records, depending on the actual flow type:
@timestamp: The time when the flow was reported to Logstash.
host: The IP address of the device that sent the flow record. This can be used to determine which bonder handled the flow, according to the Source IP policy parameter of the flow collector defined in Bondingadmin.
netflow.ipVersion: The IP version of the flow. It will be 4 for IPv4 and 6 for IPv6.
netflow.protocolIdentifier: The protocol ID of the flow. This will be 1 for ICMP, 6 for TCP and 17 for UDP. The full list is available here.
netflow.protocolName: The name for the protocol ID of the flow (only present if the Logstash translation above was implemented).
netflow.sourceAddress: The source IPv4/IPv6 address of the flow (only present if the Logstash translation was implemented).
netflow.sourceIPv4Address: The source IPv4 address of the flow (not present if the Logstash translation was implemented).
netflow.sourceIPv6Address: The source IPv6 address of the flow (not present if the Logstash translation was implemented).
netflow.destinationAddress: The destination IPv4/IPv6 address of the flow (only present if the Logstash translation was implemented).
netflow.destinationIPv4Address: The destination IPv4 address of the flow (not present if the Logstash translation was implemented).
netflow.destinationIPv6Address: The destination IPv6 address of the flow (not present if the Logstash translation was implemented).
netflow.sourceTransportPort: The source port of the flow, if the flow protocol has port numbers. The TCP, UDP, DCCP, and SCTP protocols have port numbers.
netflow.sourceTransportPortName: The name for the value in netflow.sourceTransportPort (only present if the Logstash translation was implemented).
netflow.sourceTransportDescription: A full description of the values in netflow.protocolName and netflow.sourceTransportPort that is useful for display (only present if the Logstash translation was implemented).
netflow.destinationTransportPort: The destination port of the flow, if the flow protocol has port numbers.
netflow.destinationTransportPortName: The name for the value in netflow.destinationTransportPort (only present if the Logstash translation was implemented).
netflow.destinationTransportDescription: A full description of the values in netflow.protocolName and netflow.destinationTransportPort that is useful for display (only present if the Logstash translation was implemented).
netflow.serviceDescription: A full description of the service including the protocol and port information. For port-based protocols, this is derived from the relevant netflow.sourceTransportDescription or netflow.destinationTransportDescription value, assuming that the lower of the two port numbers is the service port (only present if the Logstash translation was implemented).
netflow.tcpControlBits: If the flow protocol is TCP, this will contain the TCP flags (control bits) for the packet. See the entry in this page for more information.
netflow.flowStartMilliseconds: The time the flow started.
netflow.flowEndMilliseconds: The time the flow ended.
netflow.octetDeltaCount: The number of bytes transferred in the flow since the last record.
netflow.packetDeltaCount: The number of packets transferred in the flow since the last record.
Some flows may contain other fields depending on configuration and protocol type. An official list of potential fields is available here, but most of them will not be populated. Some of those fields, as well as some custom ones not defined by the IANA, may be added in future releases of Bonding.
Creating visualizations¶
See here for a more complete guide on creating visualizations.
Top 10 hosts table¶
Let’s create a simple data table first. This will list the top 10 hosts in descending order of bytes transferred.
Create a Data Table visualization.
For Metric select the Sum aggregation of netflow.octetDeltaCount and
set the label to Bytes.
For Buckets select the Terms aggregation of
netflow.destinationIPv4Address.keyword and order by the sum of
netflow.octetDeltaCount, descending, with a size of 10. Set the label to
Host.
Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.
Top 10 services chart¶
If the logstash translation is enabled there is a field that records the service regardless of the direction. We can use this to track the popular services on the network.
Create a Vertical Bar visualization.
For Y-Axis select the Sum of the netflow.octetDeltaCount field and
set the label to Bytes.
For the X-Axis select the Terms aggregation of
netflow.serviceDescription.keyword field and order by the sum of
netflow.octetDeltaCount, descending, with a size of 10. Set the label to
Service.
Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.
Service traffic over time¶
It is also useful to see which services transfer the most traffic at specific times. A time-based line chart will work best for this.
Create a Timelion visualization.
The default expression simply shows the number of flows, which is not too useful, so we need to create an expression that shows the services.
Change the Timelion Expression field to the following:
.es(index=ipfix-*, split=netflow.serviceDescription.keyword:10, metric=sum:netflow.octetDeltaCount, kibana=true).scale_interval(1s).fit(mode=scale).lines(width=1, fill=true, stack=true).if(operator="lt", if=0, then=0).label(regex='netflow.serviceDescription.keyword:(.+) >.*$', label="$1").yaxis(label="bytes/s", min=0)
See here for a better tutorial on creating Timelion charts.
Click the triangle button at the top of the form to preview the data and click Save at the top of the page to save the visualization.
Creating dashboards¶
Visualizations can be organised into dashboards for quick viewing of important data.
To create a dashboard, click the Dashboard menu item on the left and click the Create a dashboard button. On the resulting page, click the Add button at the top to add each of the visualizations created earlier.
Resize and rearrange the panels to your liking and click the Save button at the top to save the dashboard.