Join big measurements together - Error Bad Gateway

Hello Community,

I have two big measurements, one with GPS data and the other with particulate matter (pm10 and pm25) values. I want to join the longitude and latitude fields to the measurement with the sensor data so I can show them on a map later. The following Flux query works when I use data of only one day, i.e. I separated one day from the big measurement into a small one for this purpose and didn't use the whole file. But with more than 100 different measurements this would be far too much work, and I hoped I could simply join the two big measurements as well.

Maybe the Flux query is too complex and needs too much performance?
I appreciate any help. Thanks

data_1 = from(bucket: "gpx")
  |> range(start: 2020-06-08T00:00:00Z, stop: 2020-06-20T23:59:00Z)
  |> truncateTimeColumn(unit: 1s)
  |> filter(fn: (r) => r._measurement == "gpx_all" and (r._field == "latitude" or r._field == "longitude"))
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start", "_stop", "_measurement", "position"])

data_2 = from(bucket: "vienna_01")
  |> range(start: 2020-06-08T00:00:00Z, stop: 2020-06-20T23:59:00Z)
  |> truncateTimeColumn(unit: 1s)
  |> filter(fn: (r) => r._measurement == "sds011_all" and (r._field == "pm10" or r._field == "pm25"))
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start", "_stop", "_measurement", "position"])

join(tables: {d1: data_1, d2: data_2}, on: ["_time"])
  |> map(fn: (r) => ({ r with _time: r._time }))

Hello @knalla66,
You might want to try https://v2.docs.influxdata.com/v2.0/reference/flux/stdlib/experimental/join/

I'm still learning, but I think the way your data is currently structured would require the whole thing to be held in memory for the pivot and join operations to work.

A few other ideas:

  • refactor the data on processing/import so that the GPS data is in the same bucket and measurement under new field names
  • process the two existing buckets incrementally and save the output into a third bucket (see tasks in InfluxDB 2)
  • you could enrich the data by adding the GPS data into the existing data bucket; InfluxDB performs a union if it gets two data rows with the same tag set and timestamp and effectively merges them
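The last idea relies on InfluxDB merging points that share a measurement, tag set, and timestamp. A minimal Python sketch (the measurement name, the `position` tag, and the values are illustrative assumptions, not the poster's actual schema) that builds two such line protocol points:

```python
# Sketch: build line protocol points that share measurement, tags, and
# timestamp, so InfluxDB merges their fields into a single row on write.
# Only numeric fields are handled here; string fields would need quoting.

def line(measurement, tags, fields, ts_ns):
    """Format one InfluxDB line protocol point."""
    tag_str = ",".join(f"{k}={v}" for k, v in tags.items())
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement},{tag_str} {field_str} {ts_ns}"

ts = 1591574400000000000  # 2020-06-08T00:00:00Z in nanoseconds
tags = {"position": "vienna_01"}

# One point from the sensor data, one from the GPS data, same timestamp:
pm = line("sds011_all", tags, {"pm10": 12.3, "pm25": 4.5}, ts)
gps = line("sds011_all", tags, {"latitude": 48.2082, "longitude": 16.3738}, ts)

print(pm)
print(gps)
```

Written together, these two points would be stored as a single row with all four fields, which avoids the join entirely.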

Thanks for the suggestion. It looked good and easy, but it does not work with experimental either.

Could it be that experimental functions only work on InfluxDB 2.0?
I still have InfluxDB 1.8.1 in use.

I used the pivot function to get two fields (latitude and longitude, for example) instead of one at the same time in the query. I didn't find any other way; in most examples I looked up, only one field is selected in the filter. I am also a new user, so maybe there are other ways.
I am using InfluxDB 1.8.1.
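What pivot does here can be illustrated outside Flux. A small pandas sketch (the column names mirror Flux's `_time`/`_field`/`_value` convention; the timestamps and values are made up) that turns the long format into one row per timestamp with a latitude and a longitude column:

```python
import pandas as pd

# Long format, as Flux sees it after filter(): one row per field per time.
long = pd.DataFrame({
    "_time":  ["t1", "t1", "t2", "t2"],
    "_field": ["latitude", "longitude", "latitude", "longitude"],
    "_value": [48.20, 16.37, 48.21, 16.38],
})

# Equivalent of pivot(rowKey: ["_time"], columnKey: ["_field"],
# valueColumn: "_value"): field names become columns, one row per time.
wide = long.pivot(index="_time", columns="_field", values="_value").reset_index()
print(wide)
```

The resulting table has the columns `_time`, `latitude`, and `longitude`, which is exactly the shape needed to join against the sensor data on `_time`.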

Thx for your ideas!
I also thought about those possibilities but haven't found a solution yet.

  • Your first idea is not possible anymore: the data has already been processed and I need to analyze it. Also, the GPX data was gathered with a smartphone and separately uploaded with a Python script afterwards.

  • I tried to make this happen, but when I use the to() function in Chronograf it tells me "Error calling function 'to': 'to' is not implemented." I used it after the join in the first post, but also with this simple query:
    from(bucket: "gpx")
    |> range(start: 2020-06-08T00:00:00Z, stop: 2020-06-10T23:59:00Z)
    |> truncateTimeColumn(unit: 1s)
    |> filter(fn: (r) => r._measurement == "gpx_all" and (r._field == "latitude" or r._field == "longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement", "position"])
    |> to(bucket: "vienna_01_downsampled")

  • The last point I didn't get. I can get the GPS data into the vienna_01 bucket (or database), and then what?

Thx for your help!

@knalla66 Flux in InfluxDB 1.x is read-only, so you aren’t able to write data back to InfluxDB with Flux (in 1.x). @FixTestRepeat is correct. Flux has to pull the entire queried dataset into memory to perform the pivot and the join, both of which are very expensive operations. With the nature of your data and the operation you’re trying to perform, I don’t know that the query will be able to complete on large datasets.

If you were using InfluxDB Cloud or InfluxDB 2.0 OSS, I’d suggest creating a Flux task that performs this operation on new data at regular intervals and stores the transformed data. I don’t know that there is a feasible solution with InfluxDB 1.8.1.


Ok, thank you for your help.
I will bear this in mind for future time series projects: create tasks and pre-transform the data before storing it.

For the already available data, is there a possibility to transfer it from InfluxDB 1.8.1 to the new InfluxDB 2.0 and then transform it?

Nathaniel on the Flux team here. I wanted to chime in with some of my thoughts. First off, the advice you have received so far is spot on: joins in Flux currently require that the whole dataset fit in memory. That said, the experimental.join function available in InfluxDB 2.0 is significantly more efficient with how it uses memory to do the join, and in some cases it can have a 10x smaller memory footprint. I would recommend trying out InfluxDB 2.0 with the experimental.join function to see whether the performance gains are enough for your workload.

For the already available data, is there a possibility to transfer it from InfluxDB 1.8.1 to the new InfluxDB 2.0 and then transform it?

Writing data to InfluxDB 2.0 is almost the same as writing to InfluxDB 1.8.1; the only difference is how you authenticate. You could test out InfluxDB 2.0 by writing the data to InfluxDB 2.0 alongside writing it to 1.8. We are also working on ways to make the migration easier in the near future.

Hope that helps.

Thanks for the information and your advice, Nathaniel.
Does the drop() function reduce the data for the join? I guess not, because the data has to be in memory before the drop function applies, right?

Are there any documentations for 1.8.1 migration to 2.0?

Thanks

We haven't focused on this yet, but we are working on it as part of the InfluxDB 2.0 OSS GA.

Our guidance at this point is to dual-write data to both InfluxDB 1.x and 2.x. However, there is a big change coming as of InfluxDB 2.0 Beta 17, which will re-align the storage engine with what you are using in 1.x. This should mean more of an "upgrade in place" kind of experience, so waiting until that releases would be a good place to start. We are also asking community members who are willing to help us test the various upgrade tools and processes. If you are interested, please let us know!

Hi @knalla66, my name is Adrian, I’m on the flux team and wanted to follow up.

Were you able to migrate your data to 2.0 and try experimental.join?

Thanks!

Adrian

Thanks for your offer, and sorry for my late reply.
In my particular case, I had a set of measurements that were already stored in InfluxDB and had to be analyzed. Dual-writing, or using InfluxDB 2.0 only, is of course the variant I will choose for future work, but I have to wait for the next project. Otherwise, I would be glad to help; I will let you know. Thanks

Hi Adrian!
No, I didn't migrate yet. I made a quick workaround: I exported the involved measurements as CSV (via Chronograf), wrote Python scripts for the merging process, and uploaded the merged file to the database again. This has been my solution so far for joining two big measurements on the timestamp column.
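The core of such a merging script could look roughly like this. A minimal pandas sketch (the inline data stands in for the CSV exports; with real files you would use pd.read_csv instead) that truncates timestamps to seconds and joins the two tables, analogous to the truncateTimeColumn and join steps in the Flux query:

```python
import pandas as pd

# Stand-ins for the two Chronograf CSV exports; values are made up.
gps = pd.DataFrame({
    "_time": pd.to_datetime(["2020-06-08 10:00:00.4", "2020-06-08 10:00:01.2"]),
    "latitude": [48.2082, 48.2085],
    "longitude": [16.3738, 16.3741],
})
pm = pd.DataFrame({
    "_time": pd.to_datetime(["2020-06-08 10:00:00.9", "2020-06-08 10:00:01.7"]),
    "pm10": [12.3, 13.1],
    "pm25": [4.5, 4.9],
})

# Same role as truncateTimeColumn(unit: 1s): round timestamps down to seconds
# so rows from both files line up on the same second.
for df in (gps, pm):
    df["_time"] = df["_time"].dt.floor("1s")

# Inner join on the truncated timestamp, like join(..., on: ["_time"])
merged = pd.merge(gps, pm, on="_time", how="inner")
print(merged)
```

Because pandas works out of core memory only per file, this sidesteps the in-memory limits of the Flux join; the merged frame can then be written back as CSV or line protocol.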

Greetings
Markus