Originally published at: https://www.influxdata.com/blog/enriching-your-data-with-kapacitor/
From time to time, we see requests from the community about querying InfluxDB by business period, such as the typical business day or individual shifts. Consider the following request: How do I summarize my data for the entire month of August just for business hours, defined as Monday through Friday between 8:00 AM and 5:00 PM? InfluxQL does not currently have any functions for filtering on parts of a timestamp, such as the day of the week or the hour of the day. We are limited to a plain time range:
    SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' AND time <= '2017-08-31 17:00:00.000000';
So how do we accomplish this? The provided Telegraf plugins typically send just a timestamp with each point, plus any static tags you configure, in addition to the metrics and tags of the plugin itself. The solution is to use Kapacitor as a "pre-processor" to "decorate" or enrich your data with a computed value that represents the time period you want to query.
For the purposes of this article, Telegraf, InfluxDB, and Kapacitor all run on localhost, but in a full-fledged environment they would typically run on different hosts.
The first step is to configure Telegraf to write to Kapacitor instead of directly to InfluxDB. In the [[outputs.influxdb]] section of your telegraf.conf file, there are three key settings to consider:
    [[outputs.influxdb]]
      urls = ["http://localhost:9092"]
      database = "kap_telegraf"
      retention_policy = "autogen"
The urls parameter must point to port 9092 (Kapacitor's default listening port) instead of port 8086 for InfluxDB. The database parameter should point to a non-existent database (you can ignore Telegraf's warning about the database not being found). The retention_policy parameter should be set to "autogen" or to a specific retention policy that you have previously created in your instance.
Note that retention_policy set to "" (the default) is not the same as "autogen", which is the name of the default retention policy created when InfluxDB is initialized.
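The retention policy matters because Kapacitor scopes incoming points by database and retention policy, and the task defined later listens on kap_telegraf.autogen. As a rough illustration only (not Telegraf's actual implementation), the sketch below builds the kind of InfluxDB 1.x-style write URL that the settings above imply. The helper name build_write_url is hypothetical, and omitting the rp parameter when the setting is empty is an assumption made for the example.

```python
from urllib.parse import urlencode


def build_write_url(base_url: str, database: str, retention_policy: str) -> str:
    """Build an InfluxDB 1.x-style write URL (hypothetical helper, for illustration)."""
    params = {"db": database}
    # Assumption: an empty retention_policy means no rp parameter is sent at all,
    # which is why "" and "autogen" are not interchangeable.
    if retention_policy:
        params["rp"] = retention_policy
    return f"{base_url}/write?{urlencode(params)}"


# With the settings from telegraf.conf above:
print(build_write_url("http://localhost:9092", "kap_telegraf", "autogen"))
# With the default (empty) retention policy:
print(build_write_url("http://localhost:9092", "kap_telegraf", ""))
```

In the second case the request carries no retention policy, so under this assumption a task listening on kap_telegraf.autogen would have nothing to match against.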
All other settings in telegraf.conf can be configured normally for your instance.
The next step is to create a TICKscript that processes the data coming from Telegraf. In this example, we want to create a tag that holds a true or false value indicating whether the data point falls within the business hours described above.
    stream
        |from()
            .database('kap_telegraf')
        |eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
            .as('business_hours')
            .tags('business_hours')
            .keep()
        |delete()
            .field('business_hours')
        |influxDBOut()
            .database('telegraf')
            .retentionPolicy('autogen')
            .tag('kapacitor_augmented','true')
In this TICKscript, we are streaming from the non-existent database "kap_telegraf" that we configured above in our telegraf.conf file. The from() node that follows stream only needs the database to match. We then pass control to an eval() node that evaluates whether the point has arrived in the window that we have designed. In this case, we use the weekday(), hour(), and minute() functions described at http://docs.influxdata.com/kapacitor/v1.3/tick/expr/#time-functions to evaluate the "time" value. The first part of the condition evaluates whether the day of the week falls between Monday (1) and Friday (5). The second part of the condition evaluates the hour value, being careful to consider that we want to stop at the "00" mark in hour 17 (5 PM). To do that, we multiply the hour by 100 and add the result of the minute() function, then compare the sum to the ending time of 1700. If the time value falls within this range, the eval() node returns 'true' as a field called business_hours; otherwise it returns 'false'. However, since we want to query on this value, we should have it as a tag, so we chain a .tags() method to convert the value to a tag called business_hours. By default, the eval() node drops the point's other fields, so we specify .keep() to retain them in the stream.
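To make the boundary behavior of the condition easier to check, here is a minimal Python sketch of the same test. It is only an illustration of the arithmetic, not how Kapacitor evaluates the lambda; the function name is_business_hours is hypothetical, and the sketch assumes the timestamp is already in the time zone you care about.

```python
from datetime import datetime


def is_business_hours(ts: datetime) -> bool:
    """Mirror the TICKscript condition: Mon-Fri, from 08:00 up to the 17:00 mark."""
    # Kapacitor's weekday() numbers days from Sunday = 0, so Monday..Friday is 1..5.
    # isoweekday() uses Monday = 1 .. Sunday = 7; taking it modulo 7 maps Sunday to 0.
    weekday = ts.isoweekday() % 7
    # Encode the time of day as HHMM so that 17:00 becomes 1700 and 17:01 becomes 1701.
    hhmm = ts.hour * 100 + ts.minute
    return 1 <= weekday <= 5 and ts.hour >= 8 and hhmm <= 1700


# 2017-08-01 was a Tuesday; 2017-08-05 was a Saturday.
for ts in [
    datetime(2017, 8, 1, 7, 59),
    datetime(2017, 8, 1, 8, 0),
    datetime(2017, 8, 1, 17, 0),
    datetime(2017, 8, 1, 17, 1),
    datetime(2017, 8, 5, 12, 0),
]:
    print(ts.isoformat(), is_business_hours(ts))
```

Because the comparison works at minute granularity, any point stamped within the 17:00 minute (for example 17:00:30) still counts as business hours.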
At this point, we have both a field and a tag called business_hours that contain the output of the eval() node. Since we only need the tag, we remove the field from the stream by calling a delete() node that specifies the field to be deleted via the .field() method.
Finally, we pass control in the stream to an influxDBOut() node that specifies the destination database and retention policy to write to. We have added an additional static tag for this example called kapacitor_augmented. All other data, such as the measurement name, is carried through, so it is only necessary to provide new information. In this case, we write to the telegraf database and the autogen retention policy.
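The effect of the whole chain on a single point can be traced with a small Python sketch. This is an illustration under simplifying assumptions, not Kapacitor's internals: the enrich_point function and the dictionary layout of a point are hypothetical, and the sample cpu point is made up for the example.

```python
from datetime import datetime


def enrich_point(point: dict) -> dict:
    """Mimic the eval/tags/keep/delete/influxDBOut chain on one point (illustrative only)."""
    ts = point["time"]
    in_hours = (
        1 <= ts.isoweekday() <= 5              # Monday (1) through Friday (5)
        and ts.hour >= 8
        and ts.hour * 100 + ts.minute <= 1700
    )
    value = "true" if in_hours else "false"

    fields = dict(point["fields"])             # .keep(): existing fields survive eval()
    tags = dict(point["tags"])
    fields["business_hours"] = value           # eval() ... .as('business_hours')
    tags["business_hours"] = value             # .tags('business_hours')
    del fields["business_hours"]               # delete() .field('business_hours')
    tags["kapacitor_augmented"] = "true"       # influxDBOut() .tag(...)

    return {
        "database": "telegraf",                # influxDBOut() .database('telegraf')
        "retention_policy": "autogen",         # influxDBOut() .retentionPolicy('autogen')
        "measurement": point["measurement"],   # carried through unchanged
        "time": ts,
        "tags": tags,
        "fields": fields,
    }


point = {
    "measurement": "cpu",
    "time": datetime(2017, 8, 1, 9, 30),       # a Tuesday morning
    "tags": {"host": "localhost"},
    "fields": {"usage_idle": 97.5},
}
enriched = enrich_point(point)
print(enriched["tags"])
print(enriched["fields"])
```

The point leaves with its original field and tag intact, plus the two new tags, and without a business_hours field.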
Once you have created the TICKscript (referenced below as business_hours.tick), we must tell Kapacitor to run it. For this example, we will use the Kapacitor CLI to configure the task.
    $ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick
This command defines a task called business_hours as a stream type listening for writes to kap_telegraf.autogen, which is the database name and retention policy name configured in Telegraf above. Once we have successfully created the task, we need to enable it for processing by Kapacitor.
    $ kapacitor enable business_hours
To show the status, we can ask Kapacitor to list the current tasks:
    $ kapacitor list tasks
    ID             Type   Status  Executing Databases and Retention Policies
    business_hours stream enabled true      ["kap_telegraf"."autogen"]
Once data begins to flow through Kapacitor to InfluxDB, you can add the condition AND business_hours='true' to the first query we specified:
    SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' AND time <= '2017-08-31 17:00:00.000000' AND business_hours='true';
In summary, we have shown a simple example of how to use Kapacitor to enrich your data coming from Telegraf into InfluxDB. This method could be used to add other types of data, potentially obviating the need to create custom Telegraf plugins to meet your business needs.