Merge 3 different input sources into one stream

Hi,

I need to merge the following three data sources (GPS data, see the code example) into one output stream (with a join function?), but unfortunately I can't get the query right.

The data needs to be merged so that there is always a single Latitude/Longitude output (for plotting), even if one of the data sources is empty.

My starting point:

data = from(bucket: "kgnv_rd")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)

// Datasource 1
nmeaGNSS = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude"))
    // |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 2
nmeaHighfreq = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_PosLatitude" or r._field == "NMEA2000_PosLongitude"))
    // |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 3
standard = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "Latitude" or r._field == "Longitude"))
    // |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // The merge step is still missing: as written, yield() is piped onto standard only
    |> yield()

For example, the data looks like this (only two of the three sources are present in this dataset):

from(bucket: "kgnv_rd")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (
        r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude" or
        r._field == "NMEA2000_PosLatitude" or r._field == "NMEA2000_PosLongitude" or
        r._field == "Latitude" or r._field == "Longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> yield()

How could I achieve this? I hope someone can point me in the right direction 🙂

Hi @bart1992,
Have you tried using join()? It won't be pretty, but it would look something like this:

// Datasource 1
nmeaGNSS = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 2
nmeaHighfreq = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_PosLatitude" or r._field == "NMEA2000_PosLongitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 3
standard = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "Latitude" or r._field == "Longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Join the first two sources, then join that result with the third
tmp = join(
    tables: {nmeaGNSS: nmeaGNSS, nmeaHighfreq: nmeaHighfreq},
    on: ["_time"],
)

join(tables: {tmp: tmp, standard: standard}, on: ["_time"])
    |> yield(name: "combined")
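A caveat with join() here: it performs an inner join, so it only emits rows where every input has a point with exactly the same _time, and a source with no data makes the whole result empty. A sketch of an alternative that tolerates an empty source stacks the pivoted streams with union() instead of matching them. It assumes each source always writes latitude and longitude together (so both pivoted columns exist), and the common column names lat and lon are hypothetical; use whatever names the plot expects.

```flux
data = from(bucket: "kgnv_rd")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "${DeviceID}")

// Datasource 1, renamed to the (hypothetical) common columns lat/lon
nmeaGNSS = data
    |> filter(fn: (r) => r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> rename(columns: {NMEA2000_GNSS_Latitude: "lat", NMEA2000_GNSS_Longitude: "lon"})

// Datasource 2, same common column names
nmeaHighfreq = data
    |> filter(fn: (r) => r._field == "NMEA2000_PosLatitude" or r._field == "NMEA2000_PosLongitude")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> rename(columns: {NMEA2000_PosLatitude: "lat", NMEA2000_PosLongitude: "lon"})

// Datasource 3, same common column names
standard = data
    |> filter(fn: (r) => r._field == "Latitude" or r._field == "Longitude")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> rename(columns: {Latitude: "lat", Longitude: "lon"})

// Stack the streams instead of joining them: an empty source simply
// contributes no rows, so the other sources still come through
union(tables: [nmeaGNSS, nmeaHighfreq, standard])
    |> group()
    |> sort(columns: ["_time"])
    |> keep(columns: ["_time", "lat", "lon"])
    |> yield(name: "combined")
```

Because the streams are stacked rather than matched, each source keeps its own timestamps and nothing is lost when one source is empty. If two sources report at the same timestamp, both rows appear; a step such as unique(column: "_time") could drop the extra row, but which source survives is then not controlled.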

Hi @Jay_Clifford,

Thanks for your response! I've tried your solution, but unfortunately it doesn't work. Since I couldn't get it working, I started with only two data sources, but even with two sources I'm unable to join/merge the data.

The InfluxDB server keeps crashing when I run the join like this:

data = from(bucket: "kgnv_rd")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    // |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude" or r._field == "Latitude" or r._field == "Longitude"))

// Datasource 1
nmeaGNSS = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 2
standard = data
    |> filter(fn: (r) => r._measurement == "${DeviceID}" and (r._field == "Latitude" or r._field == "Longitude"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

join(tables: {nmeaGNSS: nmeaGNSS, standard: standard}, on: ["_time"])
    |> yield()

error:

Joining on _time is probably what causes the issue, since the timestamps of the two sources are not exactly aligned. Adding this function before the join does not seem to help:

|> truncateTimeColumn(unit: 10s)

|> truncateTimeColumn(unit: 10s)
join(tables: {nmeaGNSS: nmeaGNSS, standard: standard}, on: ["_time"])

|> yield()
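On the truncateTimeColumn() attempt: it only transforms the stream piped into it, so it has to be part of each input's pipeline before join() rather than a separate line in front of the join call. A sketch of that idea, swapping in aggregateWindow() (a substitute technique, not what was tried above) so each stream is reduced to at most one point per field per 10-second window, stamped with the window's end time. With truncateTimeColumn() alone, several points can land on the same truncated timestamp and the join then matches every combination of them. It assumes the last position in each window is good enough for plotting, and join() still drops windows in which either source has no data.

```flux
data = from(bucket: "kgnv_rd")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "${DeviceID}")

// Datasource 1: last point per field in each 10 s window, _time = window end
nmeaGNSS = data
    |> filter(fn: (r) => r._field == "NMEA2000_GNSS_Latitude" or r._field == "NMEA2000_GNSS_Longitude")
    |> aggregateWindow(every: 10s, fn: last, createEmpty: false)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Datasource 2: aligned to the same 10 s window boundaries
standard = data
    |> filter(fn: (r) => r._field == "Latitude" or r._field == "Longitude")
    |> aggregateWindow(every: 10s, fn: last, createEmpty: false)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Both inputs now carry window-aligned _time values, so equal timestamps can match
join(tables: {nmeaGNSS: nmeaGNSS, standard: standard}, on: ["_time"])
    |> yield(name: "joined")
```

The window size is a trade-off: a larger every value matches more rows across sources but coarsens the plotted track.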

Do you have another solution or query I could try?

Thanks,