Heka

What is Heka?

Heka is the quintessential “Swiss Army Knife” tool for processing streams of data and routing them to one or more destinations. Written in Go and released as open source by engineers at Mozilla, Heka is a high-performance tool supporting a wide variety of tasks and workflows. It’s aimed broadly at data gathering, analysis, monitoring, and reporting; our focus here is using it to collect and forward metrics to our Librato account.

How does Heka work?

Heka uses a plugin system to load and process streams of data at various stages of its “pipeline”. Inputs provide an ingress point for data to enter the system. Decoders convert the native data into Heka’s internal message data structure so that downstream handlers will understand how to interact with the data. Filters are the core processing units within Heka, performing useful analytics or aggregation on the data before routing it forward. Encoders act as the inverse of decoders, serializing the internal data structures into the preferred format for the intended recipient. And finally, Outputs handle delivery of the payload to their final destination.

[Figure: the Heka pipeline, from Inputs through Decoders, Filters, and Encoders to Outputs]

In the sections below we’ll preview what it might look like to process some data that we want to store in Librato. Specifically, we’ll track HTTP requests through an NGINX webserver. For this exercise we’ll need to read from the NGINX logs, translate the log entries into something usable as metrics, and then transmit them to our Librato account.

Installing Heka

Binary deb and rpm packages are available for each release and are the easiest way to get started. Tarball distributions are also available, or you can install from source.

$ wget https://github.com/mozilla-services/heka/releases/download/v0.8.2/heka_0.8.2_amd64.deb
$ sudo dpkg -i heka_0.8.2_amd64.deb

When your installation is complete you should have a number of new files: Lua plugins (mostly encoders and decoders), some dashboard assets, and a handful of binaries. The single most important of these is the hekad daemon, which loads and orchestrates the various plugins specified in your configuration. Heka uses the TOML configuration language, which is functionally similar to the INI format. By default, hekad looks for a configuration file at /etc/hekad.toml, although you can override this with the --config parameter. To get started, create a new empty TOML file and add configuration blocks for each plugin type needed to process your stream of data. We’ll look at this in more detail in the following sections.
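
Plugin sections will make up most of that file, but hekad also reads an optional global [hekad] section for daemon-wide settings. As a minimal sketch (maxprocs is a documented hekad option controlling how many cores the daemon may use):

[hekad]
# allow hekad to use up to two cores
maxprocs = 2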

Configuring Plugins

Inputs

Input plugins receive data from external systems so it can begin processing within the Heka pipeline. Heka supports a variety of input mechanisms, such as reading from files, listening on a TCP socket, accepting log streams, pulling messages from AMQP, launching external processes, or even connecting to remote HTTP services. Virtually anything can serve as a Heka input, although the plugin must be written in Go.
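
For illustration, a bare-bones TCP listener needs little more than an address to bind. Our NGINX example below doesn’t use this, but it shows how small an input definition can be (using the TcpInput plugin and its address directive):

[tcp-listener]
type = "TcpInput"
address = "127.0.0.1:5565"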

The following configuration snippet instructs hekad to use the LogstreamerInput plugin and passes a number of directives to it. It directs the plugin to load NGINX’s access.log files, using a file_match pattern that also catches rotated logs (access.log.1, access.log.2.gz, and so on) and a priority setting that reads the older, higher-numbered rotations first. Finally, it specifies the decoder (“nginx-access-decoder”) where the stream should be routed.

[nginx-access-logs]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log\.?(?P<Index>\d+)?(.gz)?'
priority = ["^Index"]
decoder = "nginx-access-decoder"

Decoders

Heka uses decoders to convert the source data (as imported by the Input plugin) into a native data structure that Filters will be able to understand.

Decoders deserialize the native stream into Heka’s internal message data structure so that downstream handlers can work with it. These plugins can be written in Go or as sandboxed Lua code. Lua makes it very easy to prototype (it doesn’t require recompilation of Heka), so many of the community-contributed plugins will be in this format.

In this snippet we tell Heka to load the SandboxDecoder plugin with the Nginx access log decoder (nginx_access.lua). A nested configuration block is passed to the Lua sandbox, specifying the log format and identifying each of the fields in the stream. Once the data has been decoded it is effectively “in the system” and ready for processing by any of Heka’s various Filter plugins.

[nginx-access-decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"
    [nginx-access-decoder.config]
    log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
    type = "nginx.access"

Filters

Filters do most of the heavy lifting within Heka. They’re useful for performing aggregation, monitoring, or even generating new metrics to be reinjected into the Heka pipeline. Like decoders, filters can be written in Go or as sandboxed Lua modules.

For this example, we want to operate on the NGINX log streams that have been processed by our Input and Decoder plugins. We’ll use the HTTP Status sandbox filter to match any messages with a Type of “nginx.access” (as set in our decoder). The filter aggregates the $status field we identified in the decoder configuration and flushes the status counts every 5 seconds, per the ticker_interval below.

[http_status]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 5
preserve_data = true
message_matcher = "Type == 'nginx.access'"
    [http_status.config]
    sec_per_row = 5
    rows = 17280
    preservation_version = 0
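
A quick sanity check on the buffer settings: with sec_per_row = 5, the 17280 rows cover 17280 × 5 = 86400 seconds, or a full 24 hours of status counts. And because preserve_data is enabled, remember to increment preservation_version whenever you change the filter’s configuration, so that stale preserved state isn’t restored on restart.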

Encoders

Encoder plugins serialize Heka’s internal data structures back into a format appropriate for the designated output. These can also be written in either native Go code or as sandboxed Lua.

Here we choose the CBUF Librato encoder, which extracts our metrics from the circular buffer and serializes them into JSON appropriate for the Librato API. There isn’t much to this configuration beyond loading the proper encoder and making sure preserve_data is enabled, which helps avoid duplicate messages if Heka is restarted.

[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true
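
The payload the encoder produces is JSON in the shape Librato’s /v1/metrics endpoint accepts, roughly like this (metric name and values are illustrative):

{"gauges": [{"name": "nginx.status.200", "value": 42, "measure_time": 1419868584}]}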

Outputs

Last but not least, output plugins are responsible for the final delivery of our byte stream to its intended destination. We can use them to write data out to files, send it to remote network sockets, or, in our specific case, send our NGINX status-code measurements to Librato. These plugins must be written in Go.

We’ll use the HttpOutput plugin here. Our configuration specifies which messages to match (Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf') and names “cbuf_librato_encoder” as the encoder that serializes them. It also provides the remote address (URL) of the HTTP service, as well as our username and password (API token) credentials. Lastly, we set the Content-Type header to application/json.

[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "jdixon@librato.com"
password = "f48c781b49f43b8fe991e004978b9717abd8e065b9656abeb476dc3ac44480fe"
    [librato.headers]
    Content-Type = ["application/json"]
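
Before starting Heka it’s worth sanity-checking the credentials against the same API; an authenticated GET of the metrics endpoint should return a metric listing rather than a 401 (substitute your own email and API token):

$ curl -u jdixon@librato.com:<api_token> https://metrics-api.librato.com/v1/metrics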

Putting It Together

All of the previous snippets should be concatenated into a single file (here, /etc/heka/hekad.toml). Our final configuration looks something like this:

[nginx-access-logs]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log\.?(?P<Index>\d+)?(.gz)?'
priority = ["^Index"]
decoder = "nginx-access-decoder"

[nginx-access-decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"
    [nginx-access-decoder.config]
    log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
    type = "nginx.access"

[http_status]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 5
preserve_data = true
message_matcher = "Type == 'nginx.access'"
    [http_status.config]
    sec_per_row = 5
    rows = 17280
    preservation_version = 0

[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true

[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "jdixon@librato.com"
password = "f48c781b49f43b8fe991e004978b9717abd8e065b9656abeb476dc3ac44480fe"
    [librato.headers]
    Content-Type = ["application/json"]

After saving the file, Heka can be started manually, and you should see output for each of the plugins.

$ sudo hekad --config=/etc/heka/hekad.toml
2014/12/29 15:56:24 Pre-loading: [nginx-access-logs]
2014/12/29 15:56:24 Pre-loading: [nginx-access-decoder]
2014/12/29 15:56:24 Pre-loading: [http_status]
2014/12/29 15:56:24 Pre-loading: [cbuf_librato_encoder]
2014/12/29 15:56:24 Pre-loading: [librato]
2014/12/29 15:56:24 Pre-loading: [ProtobufDecoder]
2014/12/29 15:56:24 Pre-loading: [ProtobufEncoder]
2014/12/29 15:56:24 Loading: [nginx-access-decoder]
2014/12/29 15:56:24 Loading: [ProtobufDecoder]
2014/12/29 15:56:24 Loading: [cbuf_librato_encoder]
2014/12/29 15:56:24 Loading: [ProtobufEncoder]
2014/12/29 15:56:24 Loading: [nginx-access-logs]
2014/12/29 15:56:24 Loading: [http_status]
2014/12/29 15:56:24 Loading: [librato]
2014/12/29 15:56:24 Starting hekad...
2014/12/29 15:56:24 Output started:librato
2014/12/29 15:56:24 Filter started:http_status
2014/12/29 15:56:24 MessageRouter started.
2014/12/29 15:56:24 Input started: nginx-access-logs

If you’re using the same five-second Filter settings as above, you’ll want to set your Period to match in the Librato Metric Attributes.

[Screenshot: Librato metric attributes with the period set to five seconds]

Assuming you have live data hitting your NGINX server, you should begin seeing metrics in your Librato account.

Final Notes

Gauges Only

Heka’s Librato encoder only supports gauge metrics, so if you’re trying to capture counter values, make sure to use the integrate() function with our Composite Metrics.
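
For example, a composite definition along these lines applies integrate() to the gauge series (the metric name nginx.status.200 is illustrative; use whichever names the encoder actually creates in your account):

integrate(s("nginx.status.200", "*"))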

Flush Interval

As I mentioned earlier, the ticker_interval in your filter defines the flush interval for Heka’s circular buffer. It’s important that this matches your metric’s Period attribute, or you’ll end up with gaps in your data.