Dump converted data to a new measurement?

Hello everyone!

I am currently streaming raw data to my database under the measurement “raw_data”. However, this data needs to be converted (multiplied, divided, etc.). How can I preserve my raw data but also have a measurement that consists of this raw data converted (let’s call it “converted_raw_data”)? I am new to InfluxDB, so all help is appreciated. I am thinking that I might have to use a continuous query. The only catch is that I don’t want to have to use an aggregate function. Thanks in advance to everyone who replies!

Hi @kyled

I think Kapacitor will do what you want. CQs could work, but Kapacitor has a bit more functionality in terms of the functions you can perform on your data. It also removes some of the strain on Influx caused by running CQs.

You could batch query your raw data and run your conversions. Using the influxDBOut node, you could then forward the converted data to the same measurement but with a different retention policy name.

Kapacitor as a CQ Engine

That’s how I tend to do this anyway.

Hi @philb

So I assume that Kapacitor is its own running daemon? (At least for InfluxDB 1.7)

Mind elaborating more on:

using the influxDBOut node you could then forward the converted data to the same measurement but with a different retention policy name.

Thanks,

Kyle

Sure thing @kyled

Correct, it runs as its own process. It’s generally used to trigger alerts, but the scripts involved can also be used for data manipulation. You can run Kapacitor and InfluxDB side by side on the same server.

The influxDBOut node takes the results of your query and conversion and forwards it to a database, measurement and retention policy of your choosing.

So based on the default settings when Influx is installed, I’m guessing you have a database with a retention policy of “autogen”, so you would be querying “database.autogen.measurement” for example.

In the InfluxDBOut node you could specify the following

|influxDBOut()
        .database('yourdatabase')
        .retentionPolicy('converted_data')
        .measurement('measurement')
        .precision('s')

So a basic example using the win_disk measurement might be like this. This query gets the mean value of the raw data before writing back to a new RP.

batch
    |query('SELECT mean("Free_Megabytes") AS "Free_Megabytes", mean("Percent_Free_Space") AS "Percent_Free_Space" FROM "yourdatabase"."autogen"."win_disk"')
        .period(5m)
        .every(5m)
        .groupBy(time(5m), *)
        
    |log()
    |influxDBOut()
        .database('yourdatabase')
        .retentionPolicy('converted_data')
        .measurement('win_disk')
        .precision('s')

The above will run every 5 minutes, grab the last 5 minutes of data and group the output into 5 minute chunks. The timings can be changed to suit your requirements.

It then writes the converted data back to the same database and the same measurement, but with the retention policy of “converted_data”, leaving your original raw data intact.

The database, measurement and RP you write the data back to are also up to you, so you could write it to a separate database and measurement if you wanted to. I write it back to the same database and measurement, with the same field names but in a different retention policy, because it makes handling my dashboards easier. I only need to change the retention policy in my dashboard to query raw or historical data.

If you haven’t created a new retention policy in influx before then log into the Influx CLI and run the following

create retention policy "converted_data" on "yourdatabase" duration 30d replication 1 
show retention policies

^^^ this should show the default (autogen) and your new retention policy.

The create command adds a second RP to the database with a 30 day duration, which of course can be changed to 60d, 7d or however long you want. Replication 1 will suffice unless you are using a clustered InfluxDB environment.

Once the RP is created, you can modify the above query to fit your needs. Save that in a file called “convertdata.tick” and upload it to your server.

The following command will define the script for processing.

sudo kapacitor define convertdata -type batch -tick /path_to_tick_file -dbrp yourdatabase.autogen

Followed by

sudo kapacitor enable convertdata

Wait for the interval you set with .every in the query, then check the results.

You can check in a couple of ways. If you are in the influx CLI you can query the data directly, but you will need to select your new retention policy first. In the CLI that would be

use yourdatabase.converted_data 

You can jump between RPs by using the same command and specifying your other RP.

Followed by

show measurements

If all has worked then you will see your measurement there waiting for you to query.

You can also check whether Kapacitor has executed the task successfully with the following two commands

kapacitor show convertdata

This will output a digraph of the script showing how many points have been processed and any errors if they exist.

You can also run

kapacitor watch convertdata

This will show the query in action, as it runs.

If you’re unsure about working with different retention policies in Influx then it might be worth having a browse through here

Hope that helps, explanations aren’t my strong point but if you get stuck let me know.

Phil

Edit: Just to point out, I don’t think it is possible to mix different aggregations in the query, but if you just want to perform divisions or multiplications then you can drop the aggregation part from the query.
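
As a rough illustration of dropping the aggregation, the win_disk example above might be reduced to a plain arithmetic conversion like the sketch below. The divisor and the "Free_Gigabytes" field name are made up for the example, and the database, RP and measurement names are the same placeholders used earlier.

```
// Sketch only: same batch task as the win_disk example, but with no aggregate function.
// The "/ 1024" conversion and the "Free_Gigabytes" alias are hypothetical.
batch
    |query('SELECT "Free_Megabytes" / 1024 AS "Free_Gigabytes" FROM "yourdatabase"."autogen"."win_disk"')
        .period(5m)
        .every(5m)
        .groupBy(*)
    // Write the converted points to the second retention policy.
    |influxDBOut()
        .database('yourdatabase')
        .retentionPolicy('converted_data')
        .measurement('win_disk')
        .precision('s')
```

With no aggregate in the query there is no time() bucket in the groupBy, so each raw point should produce one converted point, and the * keeps all of the tags.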

Wow! Thank you so much for the elaborate response!

No problem!

Hope it helps.

Is there a certain directory that I should save each tick script in?

No, not really, that’s up to you. I use Kapacitor for alerts and for data manipulation, so I create two folders within the /etc/kapacitor directory, giving me

/etc/kapacitor/tick (alert scripts)
/etc/kapacitor/downsample (data manipulation)

However, you could just create a folder in your home directory to store them in. As long as you specify the path with the -tick flag when defining the script you should be good.

For the sake of testing, I’d just leave the script in your home dir for now. If you’re using Ubuntu, then when you define the script you would specify the path like so

-tick ~/scriptname.tick

In full it would be

sudo kapacitor define convertdata -type batch -tick ~/scriptname.tick -dbrp yourdatabase.autogen

Overall the location is completely down to user choice.

@philb

So I went ahead and created this script:

batch
        |query('SELECT ("Throttle_pos" * 100) FROM "vm_test_database"."autogen"."data"')
                .period(5m)
                .period(5s)
        |log()
        |influxDBOut()
                .database('vm_test_database')
                .measurement('converted_data')
                .precision('ms')

And then ran the following as root:

kapacitor define convertdata -type batch -tick convert_data_script.tick -dbrp vm_test_database.autogen

But I get this error:

must define one of 'every' or 'cron'

Have you experienced this issue before? When I try enabling it after that error, and then watching it I get nothing.

Kyle

EDIT: Never mind… I fixed this… silly syntax error where one .period() should have been .every()

Hi @kyled, yep, that’s the one. You should specify both .every and .period in your queries: .period sets how much data each query covers and .every sets how often the query runs. Without them Kapacitor won’t know how much data to query or how often.

.cron, as the name suggests, would run the task on a cron schedule instead, for example at a given time of day or week.
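
For reference, a corrected version of the script above might look like the sketch below. The thread does not say which of the two .period() lines became .every(), so the 5m interval here is a guess. The explicit AS alias is also an addition, there only to make the output field name predictable.

```
// Sketch only: the task above with one of the duplicate .period() calls replaced by .every().
// The 5m/5m timings are an assumption, not taken from the thread.
batch
    |query('SELECT "Throttle_pos" * 100 AS "Throttle_pos" FROM "vm_test_database"."autogen"."data"')
        .period(5m)
        .every(5m)
    |log()
    |influxDBOut()
        .database('vm_test_database')
        .measurement('converted_data')
        .precision('ms')
```

To run on a cron schedule instead, the .every(5m) line could be swapped for something like .cron('*/5 * * * *'), assuming the standard five-field cron format.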

Glad you got it sorted!

Edit: as a point, if you want to keep the tags in your measurement you MUST specify them in the groupBy clause. It will only include the ones you specify. If you want to keep all tags and write them back with your converted data, then you can simply change it to group by *

So this

.groupBy(time(5m), *)

Would group into 5 minute chunks and retain all tags as they are.
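
Putting this together, the conversion can also be done inside Kapacitor with an eval node rather than in the InfluxQL query. The sketch below assumes "Throttle_pos" is stored as a float (TICKscript is strictly typed, so an integer field would need 100 rather than 100.0) and uses a hypothetical output field name, "Throttle_pos_pct". The timings are also assumptions.

```
// Sketch only: query the raw field, convert it in Kapacitor, keep all tags.
batch
    |query('SELECT "Throttle_pos" FROM "vm_test_database"."autogen"."data"')
        .period(5m)
        .every(5m)
        .groupBy(*)
    // Multiply each raw value by 100 and store it under a new (hypothetical) field name.
    |eval(lambda: "Throttle_pos" * 100.0)
        .as('Throttle_pos_pct')
    |influxDBOut()
        .database('vm_test_database')
        .measurement('converted_data')
        .precision('ms')
```

Without a .keep() on the eval node, only the new field is passed on to influxDBOut, which is usually what you want when the raw data already lives in its own measurement.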