I’m currently running InfluxDB OSS 2.0, with data migrated from 1.x.
Sometimes I need to move data between measurements, which was relatively easy in InfluxQL, but unfortunately InfluxQL support in 2.x is read-only, so SELECT INTO is no longer an option.
Is there a way to merge measurement data using Flux?
Thanks for the links, the union part is clear, so I can query and merge the measurements I need.
To be more specific: by merge I also mean storing the result, not only unioning/joining the data on the fly.
So the harder part for me is the Flux equivalent of the InfluxQL INTO clause, which I couldn’t find in the docs.
Question in short: Can I use Flux to insert/store/write query results back into InfluxDB?
@kajarit Yes, you can. Query the data, update the measurement name, then write it back to InfluxDB with to(). Queries in 2.x must be scoped by time, so you have to provide a time range. Depending on how much data is in the measurement, you may need to do this in time-based batches:
from(bucket: "example-bucket")
|> range(start: 2021-11-21T00:00:00Z, stop: 2021-11-22T00:00:00Z)
|> filter(fn: (r) => r._measurement == "old")
|> set(key: "_measurement", value: "new")
|> to(bucket: "example-bucket")
@scott
Thanks for your answer. That works very well for small measurements.
I have two InfluxDB buckets (one from 1.8 and one new 2.2) and want to migrate one into the other. The measurements cover the same things (old smart home server and the new one).
If I use your script in Chronograf with only one year of one measurement, Chronograf does not copy the data and shows an error that the segment is too large.
Is there a way to copy the complete measurement into the other bucket even if it’s large (1,000,000 data points)? I have 80 measurements to copy, and it would be very frustrating to do it with monthly commands.
from(bucket: "openhab_old")
|> range(start: 2018-01-01T00:00:00Z, stop: 2019-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "Vitocal_Betriebsstunden_Belastung1")
|> set(key: "_measurement", value: "KGHeizungskellerVitocal300A_Belastung1Betriebsstunden")
|> set(key: "item", value: "KGHeizungskellerVitocal300A_Belastung1Betriebsstunden")
|> set(key: "category", value: "time")
|> set(key: "label", value: "Belastung1 Betriebsstunden")
|> set(key: "type", value: "Number:Time")
|> to(bucket: "openhab", org: "benner")
@dominikbenner Are you migrating from 1.8 to 2.2 or the other way around?
@scott Sorry, I wasn’t clear about this.
I’m already on 2.2 with both databases. I upgraded the 1.8 server to 2.2 and moved to a new server.
I think I used the wrong wording…
I have 2 buckets (openhab = new; openhab_old = old). Now I want all old item measurements in my new bucket. For this I used the script above, but it did not work for my amount of data.
I think with any method, you’re going to have to batch write the data into the new bucket. Another possibility is to export your data as line protocol and then write the line protocol in. You can use one of the following commands (depending on the InfluxDB version) to export data on disk as line protocol:
- influx_inspect export (1.8)
- influxd inspect export-lp (2.2)
With these, you don’t have to use the query engine to query the data out first. You can then write the exported line protocol to your new bucket.
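Roughly like this; the paths, bucket ID, and file names below are just placeholders, so check the flags against your exact version:
# 1.8: export on-disk data as raw line protocol
influx_inspect export -datadir /var/lib/influxdb/data -waldir /var/lib/influxdb/wal \
  -database openhab -out /tmp/openhab.lp -lponly
# 2.2: export a bucket as line protocol (takes the bucket ID, not the name)
influxd inspect export-lp --bucket-id <bucket-id> --engine-path ~/.influxdbv2/engine \
  --output-path /tmp/openhab.lp
# write the exported line protocol into the target bucket
influx write --bucket openhab --file /tmp/openhab.lp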
This seems to be the best way!
I exported one measurement.
Do I have to bulk edit the file and change the old measurement name to the new one?
The old data is missing some tags, which were added in the new measurement.
Is there a way to add those tags (all static, so simple to define) to ALL values in one measurement?
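I guess, since line protocol is just text, I could do the renaming and add the static tags with something like sed before writing the file back in (names taken from my script above, GNU sed, and assuming every line already has at least one tag, so the measurement name is followed by a comma):
# rename the measurement at the start of every line
sed -i 's/^Vitocal_Betriebsstunden_Belastung1,/KGHeizungskellerVitocal300A_Belastung1Betriebsstunden,/' export.lp
# insert the static tags right after the measurement name (& is the matched text)
sed -i 's/^KGHeizungskellerVitocal300A_Belastung1Betriebsstunden,/&category=time,type=Number:Time,/' export.lp
Would that work, or is there a nicer way?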
Hi @scott
I found your very helpful snippet and am currently facing an issue with migrating some of my measurements (historical data) to new measurements. Copying the old data to the new measurement works fine, but I additionally have a tag in the old measurements (e.g. the name of the sensor) which I would like to change to a fixed value during copying, something like:
|> set(key: "item_name", value: "new_name")
Perhaps this is a tag and not a key?
It should be the same value for all copied rows; right now the existing “item_name” value is simply copied over as well.
Thanks for your help
@waywit Your use case is actually a good one for experimental.set(), which lets you statically set values for multiple columns in a single pass. If the column exists, it will overwrite the existing value. Otherwise, it will add the column with the specified value:
import "experimental"
data = from(bucket: "example-bucket") |> // ...
data
|> experimental.set(o: {_measurement: "new-measurement-name", item_name: "new-name"})
|> to(bucket: "example-target-bucket")
Hi @scott
Thanks for this code example… just a short question on how to call it. I migrate my old data with this statement:
from(bucket: "openhab/autogen")
|> range(start: 2022-09-19T00:00:00Z, stop: 2022-09-20T12:00:00Z)
|> filter(fn: (r) => r._measurement == "Old_Temperatur")
|> set(key: "_measurement", value: "New_Temperatur")
|> to(bucket: "openhab/autogen")
How do I integrate the new snippet with the “data” statement into it… or can I just run it after the migration? To stay with the example from above: the tag that should be overwritten is “item” and the new value is “Temperatur”:
import "experimental"
data = from(bucket: "openhab/autogen")
|> data
|> experimental.set(o: {_measurement: "New_Temperatur", item: "Temperatur"})
|> to(bucket: "openhab/autogen")
Thank you very much for your help…
@waywit The data identifier is just a placeholder representing any piped-forward data. In your query, you’d replace the existing set() function with experimental.set():
import "experimental"
from(bucket: "openhab/autogen")
|> range(start: 2022-09-19T00:00:00Z, stop: 2022-09-20T12:00:00Z)
|> filter(fn: (r) => r._measurement == "Old_Temperatur")
|> experimental.set(o: {_measurement: "New_Temperatur", item: "Temperatur"})
|> to(bucket: "openhab/autogen")
@scott Thank you very much… that worked great. Now I have my old data in both the old and the new measurements; is it possible to delete/drop the whole old measurement?
@waywit To delete a measurement, you have to use the influx CLI or the InfluxDB /api/v2/delete API endpoint. You can find specific instructions here: Delete data | InfluxDB OSS 2.4 Documentation
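For example, with the CLI it looks roughly like this (bucket name, time range, and measurement are placeholders; the predicate selects everything in that measurement within the given range):
influx delete --bucket "openhab" \
  --start 1970-01-01T00:00:00Z \
  --stop 2023-01-01T00:00:00Z \
  --predicate '_measurement="Old_Temperatur"'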
I have to bring this topic up again.
Unfortunately, not all fields are copied for me.
I am using InfluxDB 2.7.0.
I have these fields in measurement “Solarkraftwerk”:
totalstarttime
totalkwh
yesterdaykwh
todaykwh
frequenz
watt
voltage
ampere
factor
scheinleistung
blindleistung
totalPmeter
wattPmeter
If I copy the measurement to a new one, only these fields are copied:
ampere
blindleistung
factor
frequenz
scheinleistung
todaykwh
totalPmeter
totalkwh
Update: I got it working by copying each single _field manually (modified script from Scott):
import "experimental"
from(bucket: "openhab/autogen")
|> range(start: 2022-09-19T00:00:00Z, stop: 2022-09-20T12:00:00Z)
|> filter(fn: (r) => r._measurement == "Old_Temperatur")
|> filter(fn: (r) => r._field == "Old_Field")
|> experimental.set(o: {_measurement: "New_Temperatur", item: "Temperatur"})
|> to(bucket: "openhab/autogen")