Joining streams

Hello,

Have the following data sets:

use m0play
Using database m0play
show measurements
name: measurements
name
----
m0reqs
s3reqs
select * from m0reqs limit 10;
name: m0reqs
time                m0_id m0_type s3_id
----                ----- ------- -----
1565020080860999936 10999 MDW     999
1565020080861999872 10998 MDR     998
1565020080862999808 10997 IOW     997
1565020080864000000 10996 IOR     996
1565020080864999936 10995 MDW     995
1565020080865999872 10994 MDR     994
1565020080867000064 10993 IOW     993
1565020080868000000 10992 IOR     992
1565020080868999936 10991 MDW     991
1565020080869999872 10990 MDR     990
select * from s3reqs limit 10;
name: s3reqs
time                s3_id s3_type
----                ----- -------
1565020080860999936 999   GET
1565020080861999872 998   DEL
1565020080862999808 997   PUT
1565020080864000000 996   GET
1565020080864999936 995   DEL
1565020080865999872 994   PUT
1565020080867000064 993   GET
1565020080868000000 992   DEL
1565020080868999936 991   PUT
1565020080869999872 990   GET

Trying to join them by s3_id field with the code below so, Iā€™m expecting to see a table like this:

s3_id = 
from(bucket: "m0play")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "s3reqs")
                       
m0_id = 
from(bucket: "m0play")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "m0reqs")

join(tables:{m0_id:m0_id, s3_id:s3_id}, on:["_time", "s3_id"])

Data:

m0reqs_time         m0reqs_m0_id m0reqs_m0_type m0reqs_s3_id     s3reqs_time                s3reqs_s3_id s3reqs_s3_type                                         
----                -----        -------        -----            ----                       -----        -------                                                
1565020080860999936 10999        MDW            999              1565020080860999936        999   GET                                                           
1565020080860999936 20999        MDW            999              1565020080860999936        999   GET                                                           

Instead, I have got something very similar to ā€œfull joinā€.
Tried to analyse raw data with python. have the following header for joined table:

['',
 'result',
 'table',
 '_field_m0_id',
 '_field_s3_id',
 '_measurement_m0_id',
 '_measurement_s3_id',
 '_start_m0_id',
 '_start_s3_id',
 '_stop_m0_id',
 '_stop_s3_id',
 '_time',
 '_value_m0_id',
 '_value_s3_id',
 'm0_type',
 's3_type']

Inside the data, I see that ā€˜_value_m0_idā€™, ā€˜_value_s3_idā€™ values are mixed, because m0_id have to be in [0,100] range and s3_id in [10000,10100] range.

Tried to play with third parameter of join, ā€˜methodā€™, but didnā€™t obtain any data due to ā€˜invalid join typeā€™ error.

Found a workaround, looks like it works fine:

join(tables:{m0_id:m0_id, s3_id:s3_id}, on:["_time", "s3_id"])
  |> filter(fn: (r) => r._field_m0_id == "m0_id")
  |>  map(fn: (r) => ({ _time: r._time, 
                        _value_m0_id: r._value_m0_id,
                        _value_s3_id: r._value_s3_id,
                        m0_type: r.m0_type,
                        s3_type: r.s3_type }))

But, anyway, whatā€™s the right way to join streams?

BTW, when I put the code without map-expression in the end, chronograf canā€™t do visualization.

1 Like

Hello @Anatoliy_Bilenko,

I believe you want to just join on ā€œ_timeā€ or just ā€œs3_idā€ since your timestamps are the same for both datasets. You can always drop columns to make things cleaner.

I can recommend reading this blog to see some examples of joins. It is hardest to perform joins smoothly when the timestamps vary between datasets (in that case you need to apply some aggregate window)

If you still need more help/if removing one of the join parameters doesnā€™t work, please share a sample dataset (like the one above) in line protocol and Iā€™ll be happy to figure out the appropriate query.