Nested queries in Kapacitor

I am trying to run the following nested query in Kapacitor, but I get this error message:
failed to parse InfluxQL query: found (, expected identifier at line 2, char 39

var email_base = batch
    |query('''
        SELECT count(*) as "cnt" FROM (
            select payment_id, email_domain from "my_db"."autogen"."payment_attempts" GROUP BY payment_id
            )
    ''')
        .period(14d)
        .every(10s)
        .groupBy(time(1d), 'email_domain')
        .offset(75d)
        .fill(0)
    |mean('cnt_amount')
        .as('amount_mean')

email_base
    |log()

My payment_attempts measurement may have multiple points with the same payment_id tag. I need the number of unique email domains supplied for the same payment_id.

I am trying to adapt this InfluxDB query to Kapacitor:
SELECT count(*) FROM (select * from payment_attempts GROUP BY payment_id) where time >= start_time and time <= end_time GROUP BY email_domain, TIME(1d);

How can I rewrite my TICK script to support this functionality?

Thank you very much!
Tsachi.

I created the following TICK script, but I am not sure of its correctness. I tried to check an item from the log, but the numbers do not 100% match my InfluxDB query.

My new TICK script:

var email_base = batch
    |query('''
        SELECT payment_id, amount, email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(14d)
        .every(10s)
        .groupBy(time(1d), 'payment_id')
        .groupBy('email_domain')
        .offset(75d)
    |count('email_domain')
        .as('email_count')
    |mean('email_count')
        .as('v_mean')

This part doesn’t look correct; the second call will override the first.

      .groupBy(time(1d), 'payment_id')
      .groupBy('email_domain')

You probably meant something like this:

var email_base = batch
    |query('''
        SELECT payment_id, amount, email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(14d)
        .every(10s)
        .groupBy(time(1d), 'payment_id')
        .offset(75d)
    |groupBy('email_domain')
    |count('email_domain')
        .as('email_count')
    |mean('email_count')
        .as('v_mean')

And as you have discovered, Kapacitor doesn’t support subqueries yet.

Thank you @nathaniel! I ran the TICK script you suggested, but got the following error:
error parsing query: GROUP BY requires at least one aggregate function

@nathaniel I don’t know how to fix the groupBy issue, can you take a look please?
The problem seems to be in this operation:

var email_base = batch
    |query('''
        SELECT payment_id, amount, email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(14d)
        .every(10s)
        .groupBy(time(1d), 'payment_id')
        .offset(75d)

Thank you!

@tsachi The query is asking for 14 days of data grouped into 1d time buckets, but you haven’t specified how to group the data into 1d time buckets. Do you want to take the mean, min, first, last, etc.?

Basically, I need to rewrite the InfluxDB query I wrote at the beginning of the post.
My need is to count unique email domains with the same payment_id.
And this part of the query reflects my wish to remove duplicates of the same payment_id, email_domain.

If there is a cleaner way to rewrite my InfluxDB script, it should also be OK.

Thank you @nathaniel!
Tsachi.

If you want the number of unique email domains with the same payment_id, is that not simply a distinct query?

SELECT COUNT(DISTINCT("email_domain")) FROM "my_db"."autogen"."payment_attempts" WHERE time > now() - 14d GROUP BY time(1d), payment_id

If so, that would simplify the TICKscript, as all you need is that one query without the subquery.
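To illustrate what that COUNT(DISTINCT(...)) query computes, here is a small Python sketch over invented points (none of this data comes from the thread):

```python
from collections import defaultdict
from datetime import date

# Invented points (day, payment_id, email_domain) standing in for rows of
# "payment_attempts"; purely illustrative.
points = [
    (date(2017, 1, 1), "100", "a.com"),
    (date(2017, 1, 1), "100", "b.com"),
    (date(2017, 1, 1), "200", "c.com"),
    (date(2017, 1, 1), "200", "c.com"),  # duplicate domain, ignored by DISTINCT
]

# COUNT(DISTINCT("email_domain")) GROUP BY time(1d), payment_id:
# collect the distinct domains per (day, payment_id) bucket, then count them.
distinct = defaultdict(set)
for day, pid, dom in points:
    distinct[(day, pid)].add(dom)
counts = {key: len(doms) for key, doms in distinct.items()}

print(counts)
# {(datetime.date(2017, 1, 1), '100'): 2, (datetime.date(2017, 1, 1), '200'): 1}
```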

I am not quite sure how to restructure the original query, as I don’t fully understand what it’s doing.
Could you share some example data so I can better understand?

Thank you nathaniel.

How can I share the data with you?

Best regards,
Tsachi

@tsachi Posting it in a GitHub Gist is an easy way to share.

Hi @jackzampolin. Thank you for your help.
Here is a dummy gist reflecting the scenario of:
2 measurements for payment_id = 100, with different email domains
2 measurements for payment_id = 200, with the same email domain

I want to write a query that outputs 3 results for this data, one for payment_id = 200 and 2 for payment_id = 100

Does using distinct solve this need?

If I understand correctly you want this data back:

payment_id=100 count=2
payment_id=200 count=1

This TICKscript should do that.

// Select raw data for each payment attempt grouped by payment_id
var email_base = batch
    |query('''
        SELECT value,email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(1d)
        .every(10s)
        .groupBy('payment_id')
    // Get just the distinct email domains per payment_id
    |distinct('email_domain')
    // Count the distinct email_domains per payment_id
    |count('distinct')
        .as('email_count')
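For a quick offline check, the distinct -> count pipeline above can be restaged in plain Python using the dummy gist scenario (2 domains for payment 100, one repeated domain for payment 200; the domain names are invented):

```python
# Rows of (payment_id, email_domain) mirroring the dummy gist data.
rows = [
    ("100", "a.com"),
    ("100", "b.com"),
    ("200", "c.com"),
    ("200", "c.com"),  # duplicate, collapsed by distinct
]

# |distinct('email_domain') -- distinct domains within each payment_id group
distinct_domains = {}
for pid, dom in rows:
    distinct_domains.setdefault(pid, set()).add(dom)

# |count('distinct') -- count the distinct domains per group
email_count = {pid: len(doms) for pid, doms in distinct_domains.items()}

print(email_count)  # {'100': 2, '200': 1}
```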

Hi Nathaniel,
I work with Tsachi. Thank you for this!
This seems like the right direction. However, our use case requires that the day grouping be performed later. We are trying to:

for each email_domain, count the number of distinct payments in which it appeared in each of the past 14 days.

i.e. we want to achieve the results that this query returns:

select count(email_domain) from (select first(payment_id),first(email_domain),count(total_amount) from payment_attempts where time >= now() - 14d group by payment_id,email_domain) where time >= now() - 14d group by email_domain,time(1d)
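To make the intent of this nested query concrete, here is a Python sketch of the same two-level aggregation (points and dates are invented for illustration): the inner pass keeps one row per (payment_id, email_domain) pair stamped with its first timestamp, and the outer pass counts those rows per email_domain and 1d bucket.

```python
from collections import defaultdict
from datetime import date

# Invented points (day, payment_id, email_domain) over the 14d window.
points = [
    (date(2017, 1, 1), "100", "a.com"),
    (date(2017, 1, 1), "100", "a.com"),  # duplicate pair, collapsed by the inner query
    (date(2017, 1, 1), "200", "a.com"),
    (date(2017, 1, 2), "300", "b.com"),
]

# Inner query: first() per (payment_id, email_domain) group -- one row per
# pair, kept at the day it was first seen.
first_seen = {}
for day, pid, dom in sorted(points):
    first_seen.setdefault((pid, dom), day)

# Outer query: count those rows per email_domain and 1d time bucket.
counts = defaultdict(int)
for (pid, dom), day in first_seen.items():
    counts[(dom, day)] += 1

print(dict(counts))
# {('a.com', datetime.date(2017, 1, 1)): 2, ('b.com', datetime.date(2017, 1, 2)): 1}
```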

And I could not find how to do the day grouping as an independent node, rather than as a query node property.
How can this be done?

Thanks,
Noam

Check out the window node.

Then change the period property of the query node to be 14 days.

Hi Nathaniel, the window node seems to run only on a stream, while we do batch/query. Is there any way around that?

The window node can work in both batch and stream tasks, but it must consume a batch edge. (Sorry for the confusion of the overloaded terms.)

Can you share the TICKscript that gave you an error when using the window node?

Thank you Nathaniel.
For now I am using the following script to compute the daily average of payments per email_domain.

var email_benchmark = batch
    |query('''
        SELECT value, email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(14d)
        .every(5m)
        .groupBy('payment_id','email_domain')
    |groupBy('email_domain')
    |count('email_domain')
        .as('num_of_payments')
    |eval(lambda: float("num_of_payments")/14.0)
        .as('daily_average')

I think it works.

Are there any debugging tools for Kapacitor? They’ll help me run a sanity check on the output.
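In the meantime, one cheap sanity check is to restage the eval node's arithmetic outside Kapacitor. A Python sketch with invented 14-day counts:

```python
# num_of_payments per email_domain over the 14d window (invented values);
# the eval node divides each by 14.0 to get a per-day average.
num_of_payments = {"a.com": 42, "b.com": 7}
daily_average = {dom: n / 14.0 for dom, n in num_of_payments.items()}

print(daily_average)  # {'a.com': 3.0, 'b.com': 0.5}
```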

Hi again @nathaniel and @jackzampolin.
I am stuck with finalizing my alert and I hope you can provide the missing block.

I have the following TICK script that triggers an alert when the last day's count of payments per email_domain is more than N times the daily average over the last 14 days.

I would like to update the email_benchmark to include 14 days ending yesterday, instead of 14 days ending today.

Simply adding .offset(1d) does not work. My hunch is that the join fails due to the mismatch in times. Do you have any suggestions on how to proceed with that?

Thanks a lot!

Tsachi.

var email_benchmark = batch
    |query('''
        SELECT total_amount,email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(14d)
        .every(5m)
        .groupBy('payment_id','email_domain')
        .fill(1)
    |groupBy('email_domain')
    |count('email_domain')
        .as('num_of_payments')
    |eval(lambda: float("num_of_payments")/14.0)
        .as('daily_average')

var email_curr = batch
    |query('''
        SELECT total_amount,email_domain FROM "my_db"."autogen"."payment_attempts"
    ''')
        .period(1d)
        .every(5m)
        .groupBy('payment_id','email_domain')
        .fill(1)
    |groupBy('email_domain')
    |count('email_domain')
        .as('num_of_payments')
    |eval(lambda: float("num_of_payments"))
        .as('curr_num_of_payments')

email_benchmark
    |join(email_curr)
        .tolerance(1m)
        .as('benchmark', 'curr')
    |eval(lambda: float("curr.curr_num_of_payments") / float("benchmark.daily_average"))
        .as('breach_ratio')
    |alert()
        .crit(lambda: "breach_ratio" > N)
        .slack()
        .channel('#alerts')
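For reference, the join/eval/alert chain above boils down to this per-email_domain arithmetic (the values and the threshold N below are invented):

```python
# Hypothetical values for one email_domain after the join.
N = 3.0
benchmark_daily_average = 2.0  # "benchmark.daily_average"
curr_num_of_payments = 9.0     # "curr.curr_num_of_payments"

# |eval: ratio of the current day's count to the 14-day daily average
breach_ratio = curr_num_of_payments / benchmark_daily_average

# |alert .crit: fire when the ratio exceeds N
is_critical = breach_ratio > N

print(breach_ratio, is_critical)  # 4.5 True
```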