Help with Kapacitor Join

Hi,

I’m relatively new to InfluxDB and Kapacitor and am trying to do a join.

I’ve made a recording that has the following data:

time                exchange price_point
----                -------- -----------
1499751043353536013 gdax     200.23
1499751045900187027 gdax     200.25
1499751061022148516 gdax     200.23
1499751061029442501 gdax     200.22
1499751061032533715 gdax     200.22
1499751061035878181 gdax     200.21
1499751061055424257 gdax     200.2
1499751061058481281 gdax     200.1
1499751061062176384 gdax     200.1
1499751061065345111 gdax     200.07
1499751061068908270 gdax     200.03
1499751061072423687 gdax     200.03
1499751061076092806 gdax     200.02
1499751061080056463 gdax     200.01
1499751069380215223 gdax     200.01
1499751069387363027 gdax     200
1499751069389149457 gdax     200
1499751075157157845 gdax     200
1499751078623868849 gdax     200
1499751078631175034 gdax     199.99
1499751078634924445 gdax     199.99
1499751081509782401 gemini   198.59
1499751083045098094 gdax     199.99
1499751083051826211 gdax     199.95
1499751083055082262 gdax     199.9
1499751083061757708 gdax     199.77
1499751083063684547 gdax     199.75
1499751083070894312 gdax     199.71
1499751083074454365 gdax     199.68
1499751083084647041 gdax     200
1499751089277841633 gemini   198.63
1499751091466296362 gdax     199.61
1499751091473331478 gdax     199.52
1499751091477691513 gdax     199.51
1499751091482295921 gdax     199.51
1499751091485554957 gdax     199.51
1499751091491521975 gdax     199.5
1499751091494702854 gdax     199.48
1499751091498348365 gdax     199.46
1499751091504115137 gdax     199.42
1499751091507512925 gdax     199.41
1499751091511016437 gdax     199.41
1499751091517695574 gdax     199.4
1499751091521322287 gdax     199.4
1499751091525476112 gdax     199.39
1499751091528633717 gdax     199.39
1499751091532276535 gdax     199.35
1499751091535837789 gdax     199.31
1499751091539459969 gdax     199.3
1499751091543224187 gdax     199.29
1499751091546746376 gdax     199.27
1499751091555217672 gdax     199.23
1499751091559084092 gdax     199.22
1499751091563140300 gdax     199.21
1499751091568336576 gdax     199.19
1499751095885135035 gdax     199.28
1499751104485151722 gdax     199.27
1499751104491820175 gdax     199.2
1499751104496827279 gdax     199.2
1499751104499272283 gdax     199.19
1499751104504356204 gdax     199.19
1499751107347849942 gdax     199.17
1499751109314911981 gdax     199.44
1499751109321922106 gdax     199.44
1499751116371637772 gemini   198.51
1499751116577557808 gemini   198.51
1499751119427203187 gdax     199.43
1499751122504993200 gdax     199.43
1499751122512840467 gdax     199.43
1499751122518587711 gdax     199.43
1499751122522204897 gdax     199.43
1499751122529811606 gdax     199.43
1499751122532630522 gdax     199.49
1499751122541529614 gdax     199.5
1499751124864238711 gemini   198.5
1499751133307863838 gemini   198.51
1499751133314614963 gemini   198.6
1499751136147480346 gemini   198.6

I want to do a join on timestamp where the result looks like:

gdax_price_point   gemini_price_point
--------           -----------
199.44              198.51
199.5               198.6

When I run the following TICKscript:

var gemini = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gemini')

var gdax = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gdax')

  gemini
      |join(gdax)
          // Provide prefix names for the fields of the data points.
          .as('gemini', 'gdax')
          .tolerance(10s)
          // fill missing values with 0, implies outer join.
          .fill(0.0)
      |influxDBOut()
        .database('arbie_dev')
        .measurement('combined')
        .retentionPolicy('autogen')

I get the result:

name: combined
time                gdax.price_point gemini.price_point
----                ---------------- ------------------
1499751480000000000 200.25           0
1499751500000000000 200.01           0
1499751510000000000 199.99           0
1499751520000000000 200              0
1499751530000000000 199.28           0
1499751540000000000 199.17           0
1499751550000000000 199.44           0
1499751560000000000 199.5            0

Could anyone point me in the right direction as to why gemini.price_point is not populating with the joined value as I would expect?

Many thanks,
Steve

@steve I like that you caught where ETH dipped below 200 in that data stream. Does this script work for you?

var gemini = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gemini')
      |window()
          .period(10s)
          .every(10s)
      |mean('price_point')
          .as("price_point")
          .fill(0.0)


var gdax = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gdax')
      |window()
          .period(10s)
          .every(10s)
      |mean('price_point')
          .as("price_point")
          .fill(0.0)

  gemini
      |join(gdax)
          // Provide prefix names for the fields of the data points.
          .as('gemini', 'gdax')
          .tolerance(10s)
          // fill missing values with 0, implies outer join.
      |influxDBOut()
        .database('arbie_dev')
        .measurement('combined')
        .retentionPolicy('autogen')

Another way to accomplish this would be to write the data in this format. Is that a possibility or desirable?

P.S. There might be typos there with the '',"" on price_point.

@jackzampolin Thanks for the response! Yes you’re right that this is being used to track ethereum prices :slight_smile: Source code is at GitHub - steveklebanoff/arbie: track cryptocurrency prices using elixir and influxdb if you’re interested.

There were some issues with the TICKscript – I couldn’t call .fill on mean() and I had to put price_point in single quotes. I added the fill to the join statement and used "previous" and got the TICKscript to execute.

Here’s the modified script:

var gemini = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gemini')
      |window()
          .period(10s)
          .every(10s)
      |mean('price_point')
          .as('price_point')


var gdax = stream
      |from()
          .database('arbie_dev')
          .measurement('prices')
          .where(lambda: "exchange" == 'gdax')
      |window()
          .period(10s)
          .every(10s)
      |mean('price_point')
          .as('price_point')

gemini
    |join(gdax)
        // Provide prefix names for the fields of the data points.
        .as('gemini', 'gdax')
        .tolerance(10s)
        .fill("previous")
        // fill missing values with 0, implies outer join.
    |influxDBOut()
      .database('arbie_dev')
      .measurement('combined')
      .retentionPolicy('autogen')

I got a response like this:

time                gdax.price_point   gemini.price_point
----                ----------------   ------------------
1499873830000000000 199.26250000000005 198.50666666666666

The thing I’m confused about is that there is only one record, when I’d expect to have a record every 10 seconds. If I change the fill parameter to be 0 I get the following which is closer to what I want, but doesn’t quite solve the problem without filling previous values.

name: combined
time                gdax.price_point   gemini.price_point
----                ----------------   ------------------
1499874320000000000 200.24             0
1499874340000000000 200.09666666666666 0
1499874350000000000 199.89416666666668 198.61
1499874370000000000 199.38279999999997 0
1499874380000000000 199.26250000000005 198.50666666666666

Also, for what it’s worth, I’m able to disable the data visually in a way that is similar to my expected output by using these queries:

# gdax
SELECT mean("price_point") FROM "prices" WHERE "exchange" = 'gdax' AND time > 1499872919013ms and time < 1499873011555ms GROUP BY time(10ms) fill(previous);
# gemini
SELECT mean("price_point") FROM "prices" WHERE "exchange" = 'gemini' AND time > 1499872919013ms and time < 1499873011555ms GROUP BY time(10ms) fill(previous)

Any ideas?

1 Like

@steve That task should run every 10s and produce one line per run. Is it not doing that?

@jackzampolin With the .fill(0) it looks like it’s generating groups every 10 seconds, but with fill("previous") it only generates one group. I think fill("previous") may not be a feature yet based on Join Node should support fill `previous`. · Issue #1289 · influxdata/kapacitor · GitHub , yet this is what I need for the data to be useful.

I was thinking that maybe I have to use an explicit query in a batch in order to get this working, but I am having trouble with that. I’m thinking at this point the best path forward may be to rewrite my code to send to the database in the desired format instead of trying to have kapacitor transform it.

@steve That is always an option. @nathaniel might have some good advice for you too.