[Solved]Kapacitor: Cryptocurrency price transformations

I have some basic cryptocurrency pricing data aggregated via Influx CQs, as follows:

table: vwap_1m

  • base (e.g. BTC)
  • quote (e.g. USD)
  • priceVWAP (e.g. 8000)
  • 24h_volume

My end goal is to get USD prices for all base’s, as such:

  • If there’s a native USD trading pair (“quote” == ‘USD’) among a base group, rely on that
  • When the USD pair is missing, transform its most liquid pair (top 1 in terms of 24h_volume) into USD, for example ABC/USD = ABC/ETH x 1/ETH/USD

I’m struggling to find a way to do this with Kapacitor. I’ve tried going through combine, join etc. and I’d love a pointer or two where to dig further.

Thanks in advance!

Found a solution which might look a bit messy, but it works. Here’s the TICK script.

It might still need some optimizations, so in case you have any ideas, please help out.

Thanks!

var liquid = batch
    |query('SELECT * FROM "redacted"."a_month"."vwap_1m"')
        .align()
        .period(2m)
        .every(1m)
    |httpOut('liquid0')
    |eval(lambda: "base", lambda: "priceVWAP", lambda: string("quote") + 'USD', lambda: "24h_volume", lambda: if("quote" == 'USD', 100000000000000000.0, float("24h_volume")))
        .as('base', 'priceVWAP', 'bq', '24h_volume_orig', '24h_volume')
        .tags('base', 'bq')
        .keep('quote', 'priceVWAP', '24h_volume_orig', '24h_volume', 'bq')
    |groupBy('base')

var liquidSum = liquid
    |sum('24h_volume_orig')
        .as('sum_24h_volume')

liquid
    |max('24h_volume')
        .as('24h_volume')
    |httpOut('liquid')
    |influxDBOut()
        .database('redacted')
        .retentionPolicy('a_month')
        .measurement('vwap_1m_ext')

var usd = batch
    |query('SELECT * FROM "redacted"."a_month"."vwap_1m"')
        .align()
        .period(2m)
        .every(1m)
    |where(lambda: "quote" == 'USD')
    |httpOut('usd')
    |eval(lambda: string("base") + string("quote"))
        .as('bq')
        .tags('bq')
        .keep('priceVWAP')
    |groupBy('bq')
    |last('priceVWAP')
        .as('priceC')

var liquid_ext = batch
    |query('SELECT * FROM "redacted"."a_month"."vwap_1m_ext"')
        .align()
        .period(2m)
        .every(1m)
        .groupBy('bq', 'base')
    |max('priceVWAP')
        .as('priceC')
    |join(usd)
        .as('liquid_ext', 'usd')
        .on('bq')
        .tolerance(1m)
    |eval(lambda: "base", lambda: "liquid_ext.24h_volume", lambda: "liquid_ext.priceC", lambda: "liquid_ext.quote", lambda: if("quote" == 'USD', "liquid_ext.priceC", "liquid_ext.priceC" * "usd.priceC"))
        .as('base', '24h_volume', 'price_init', 'quote', 'price')
    |influxDBOut()
        .database('redacted')
        .retentionPolicy('a_month')
        .measurement('vwap_1m_ext_converted')

var liquid_ext_converted = batch
    |query('SELECT "base", "price", "24h_volume" FROM "redacted"."a_month"."vwap_1m_ext_converted"')
        .align()
        .period(2m)
        .every(1m)
        .groupBy('base')
    |max('24h_volume')
        .as('24h_volume')
    |join(liquidSum)
        .as('l1', 'l2')
        .tolerance(1m)
    |eval(lambda: "l1.base", lambda: "l1.price", lambda: "l2.sum_24h_volume", lambda: "l2.sum_24h_volume" * "l1.price")
        .as('base', 'price', '24h_volume', '24h_volume_usd')
    |httpOut('liquid_final')
    |influxDBOut()
        .database('redacted')
        .retentionPolicy('a_month')
        .measurement('vwap_1m_usd')
1 Like