Copy data across buckets using Flux

Hey all, I wanted to check whether anyone has come across the use case below and knows a solution for it.
I need to copy data from one Influx bucket to another while filtering by a particular field value (not a tag value).
I thought something like the below would work:

from(bucket: "source")
  |> range(start: 2023-04-10T10:00:00Z, stop: 2023-04-10T11:00:00Z)
  |> filter(fn: (r) => r._measurement == "sample_measurement" and r._field == "sample_field" and r._value == "sample_value")
  |> to(bucket: "destination")

But this seems to copy only the field that we’re filtering on (“sample_field”) to the destination bucket.
Is anyone aware of the changes that need to be made to the above to copy all the field values? I’m looking for a solution that works in either v2.0 or v2.3.

Hello @vishak,
Welcome!
Have you tried:

from(bucket: "source")
  |> range(start: 2023-04-10T10:00:00Z, stop: 2023-04-10T11:00:00Z)
  |> filter(fn: (r) => r._measurement == "sample_measurement")
  |> to(bucket: "destination")

?
Alternatively, if you’re using OSS you could do the following:

It’s still available for OSS → OSS if you have any interest in replicating to another OSS instance.

Hey @Anaisdg, thanks for the response, but the query you mentioned copies all rows from the source bucket to the destination bucket.
But I need to copy only those rows that match certain filter criteria.
And the filter criterion is a particular field value (not a tag value). I understand that filtering on a tag value is pretty straightforward, but I’m not sure how to filter on a field value.

Filtering by field value is exactly the same:

|> filter(fn: (r) => r["_value"] == valueToFilter)

@fercasjr in the above filter statement, how do we specify the field name?
And my expected output should be all the rows with all columns (tags + fields) which match the filter criteria.

like this:

from(bucket: "Bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_field"] == "Enable_run") //this is the _field
  |> filter(fn: (r) => r["_value"] == 0) //this is the value
  |> filter(fn: (r) => r["name"] == "DEMO") ///this is a tag
  |> to(bucket: "destination")

I recommend filtering first by _field, then by value, and then by tag, measurement, or anything else you need. Each filter keeps only the matching data, so it is faster if you filter by _field first. (It also depends on your data schema in general: your first filter should be the one that narrows your data the most.)

This solution works with one field at a time. If you want to filter on multiple fields, it gets tricky.
You need to use

|> filter(fn: (r) => r["_field"] == "field1" or r["_field"] == "field2")

Then you cannot use _value as a filter, so you will need to do a pivot first:

 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

then

  |> filter(fn: (r) =>  r["field1"] == "valuetofilter")
  |> filter(fn: (r) =>  r["field2"] == "valuetofilter")

And at the end, use experimental.to() or wideTo() instead of to(). You may need some regrouping; there is more information in the documentation: experimental.to() function | Flux 0.x Documentation (influxdata.com)

|> wideTo(bucket: "example-target-bucket")

But it is easier to do just one field at a time.

from(bucket: "Bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_field"] == "Enable_run") //this is the _field
  |> filter(fn: (r) => r["_value"] == 0) //this is the value
  |> filter(fn: (r) => r["name"] == "DEMO") ///this is a tag
  |> to(bucket: "destination")

The above query works if we just want to copy one field to the destination bucket. Also, we cannot execute the above query for every field, one at a time, since the filter condition is valid only for a particular field.

That’s why I added the pivot() suggestion, in case you need to work with more than one field at a time.

I cannot give you a specific query because it depends on your data schema.

But I can explain how pivot works.

So, each time you query InfluxDB you get a “table stream” with the results.

If you have different tags with the same “_field”, each tag’s result will be an independent table in the stream, so you can filter on “_value” across all of them at the same time. But if you want to query many fields, you need an “or” condition within the filter function for each field. At that point you will have a subset of tables per field and, as you noticed, you cannot filter by value because the value to match may differ depending on the field.

You may also need to drop some tags if they make your data stream not “pivotable”, but again it depends on your data schema.

With pivot() you get each of those fields as a column in your data stream, named after the field, so now you can filter like this:

filter all rows with field1=valuetofilter, field2=valuetofilter2, etc.
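
To see what pivot() does without touching a real bucket, here is a small sketch that builds a few sample rows with array.from(). The measurement name, field names, and values are made up for illustration, and the group() call only mimics how from() returns one table per series.

```flux
import "array"

// Hypothetical sample rows: two fields written at the same timestamps.
// All _value entries are strings so the rows share one column type.
data =
    array.from(
        rows: [
            {_time: 2023-04-10T10:00:00Z, _measurement: "sample_measurement", _field: "field1", _value: "valuetofilter"},
            {_time: 2023-04-10T10:00:00Z, _measurement: "sample_measurement", _field: "field2", _value: "a"},
            {_time: 2023-04-10T10:01:00Z, _measurement: "sample_measurement", _field: "field1", _value: "other"},
            {_time: 2023-04-10T10:01:00Z, _measurement: "sample_measurement", _field: "field2", _value: "b"},
        ],
    )

data
    // Mimic storage output: one table per measurement and field.
    |> group(columns: ["_measurement", "_field"])
    // Turn each field into its own column, one row per timestamp.
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // A field value can now be tested while the other fields stay on the same row.
    |> filter(fn: (r) => r.field1 == "valuetofilter")
```

Under these assumptions, only the 10:00 row should remain, and it should still carry both the field1 and field2 columns.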

The problem is that now the data does not have a “field” and a “value” column, which is why you cannot use to(). Instead you will need to use experimental.to(), which takes each column name as the “field” and that column’s value as the “value”.
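
Applied to the original question, a minimal sketch might look like the following. It reuses the bucket, measurement, and field names from the first post as placeholders, and it assumes that all fields of a point share the same timestamp and tag set so that pivot() lines them up on one row. It is not a tested query, and depending on your schema some regrouping may still be needed.

```flux
import "experimental"

from(bucket: "source")
    |> range(start: 2023-04-10T10:00:00Z, stop: 2023-04-10T11:00:00Z)
    // Do not filter on _field here, so every field becomes a column.
    |> filter(fn: (r) => r._measurement == "sample_measurement")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // Keep only the rows where the chosen field has the wanted value.
    |> filter(fn: (r) => r.sample_field == "sample_value")
    // Write the pivoted rows; non-group-key columns are written as fields.
    |> experimental.to(bucket: "destination")
```

The key difference from the first attempt is that the field name is no longer part of the filter before the pivot, so the other fields are not discarded.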

Another way to do the same is to use unpivot(), after which you can use the regular to() function. But unpivot() is available only in relatively recent versions of InfluxDB, and I don’t know which one you are using.
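
A sketch of that variant, assuming your Flux version ships experimental.unpivot() (it is missing from older releases, so check this before relying on it). The bucket, measurement, and field names are again the placeholders from the first post.

```flux
import "experimental"

from(bucket: "source")
    |> range(start: 2023-04-10T10:00:00Z, stop: 2023-04-10T11:00:00Z)
    |> filter(fn: (r) => r._measurement == "sample_measurement")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => r.sample_field == "sample_value")
    // Convert the field columns back into _field and _value pairs.
    |> experimental.unpivot()
    // The data is back in the regular shape, so the standard to() applies.
    |> to(bucket: "destination")
```

This keeps the write step identical to the single-field queries earlier in the thread, at the cost of requiring a newer Flux version.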