[SOLVED] Having trouble with top(10) leaderboard

I am forwarding a Twitter stream to Kapacitor with the goal of calculating the top 10 tweeting cities. The data I am sending looks like this:

{
  "measurement": "tweets",
  "time": 1496082674991192064,
  "tags": {
    "city": "Los Angeles"
  },    
  "fields": {
    "hits": 1
  }     
} 

For each tweet from the stream, I send the name of the city and a value of 1 for “hits”. I expect the HTTP endpoint to return a list of the top tweeting cities for the current window. My TICKscript looks like this:

stream
        |from()
                .database('twitter')
                .retentionPolicy('default')
                .measurement('tweets')
        |groupBy('city')
        |window()
                .period(1m)
                .every(10s)
        |count('hits')
                .as('hits')
        |top(10, 'hits')
                .as('hits')
        |httpOut('top10')

What I actually see at the HTTP endpoint is a constantly growing list of cities in no particular order. The output below has been summarized with jq for easier reading:

$ curl -s http://localhost:9092/kapacitor/v1/tasks/twitter/top10 | jq -c '.series[] | {"city": .tags.city, "values": .values[][1]}'

{"city":"Georgia","values":13}
{"city":"Manhattan","values":11}
{"city":"Toronto","values":6}
{"city":"Tennessee","values":3}
{"city":"Seattle","values":4}
{"city":"Texas","values":8}
{"city":"Bowling Green","values":1}
{"city":"Florida","values":11}
{"city":"Brooklyn","values":3}
{"city":"Columbus","values":3}
{"city":"California","values":5}
{"city":"Virginia","values":6}
{"city":"Los Angeles","values":9}
{"city":"Ottawa","values":3}
{"city":"Orlando","values":1}
{"city":"Pennsylvania","values":6}
{"city":"Memphis","values":4}
{"city":"New York","values":2}
{"city":"Fort Worth","values":5}
{"city":"Las Vegas","values":2}
{"city":"Missouri","values":2}
{"city":"Chicago","values":8}
{"city":"Washington","values":3}
{"city":"Houston","values":9}
{"city":"San Francisco","values":7}
{"city":"Oakland","values":1}

What can I do to get closer to my goal?

Notes: I did try resetting groupBy membership with groupBy(). I also tried multiple things from the leaderboard example without much success.

Just my luck! I moved groupBy('city') to AFTER the window, then reset the grouping with an empty groupBy() before top(), and it seems to be working perfectly:

stream
        |from()
                .database('twitter')
                .retentionPolicy('default')
                .measurement('tweets')
        |window()
                .period(1m)
                .every(1s)
        |groupBy('city')
        |count('hits')
                .as('hits')
        |groupBy()
        |top(10, 'hits')
                .as('hits')
        |httpOut('top10')

$ curl -s http://localhost:9092/kapacitor/v1/tasks/twitter/top10 | jq -c '.series[].values[] | {"hits": .[2], "city": .[1]}'

{"hits":38,"city":"Los Angeles"}
{"hits":21,"city":"Houston"}
{"hits":18,"city":"Manhattan"}
{"hits":17,"city":"Georgia"}
{"hits":17,"city":"Brooklyn"}
{"hits":16,"city":"California"}
{"hits":14,"city":"Toronto"}
{"hits":14,"city":"Florida"}
{"hits":14,"city":"Texas"}
{"hits":13,"city":"Virginia"}
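
For anyone hitting the same thing, here is a minimal Python sketch of why the empty groupBy() matters. It is not Kapacitor code, only a simulation of the grouping behaviour as I understand it: top() runs once per group, so while the data is still grouped by city, every city is its own group and trivially makes its own "top 10". The function names and the sample window are made up for illustration.

```python
from collections import Counter

def count_hits(window):
    """Count points per city, like groupBy('city') followed by count('hits')."""
    return Counter(point["city"] for point in window)

def top_per_group(counts, n):
    """top(n) while still grouped by city: each group holds one point,
    so every city comes back as its own result series."""
    return {city: [(city, hits)][:n] for city, hits in counts.items()}

def top_overall(counts, n):
    """top(n) after an empty groupBy() has merged all cities into one group."""
    return counts.most_common(n)

# Hypothetical one-minute window of points, one per tweet.
cities = ["Houston", "Los Angeles", "Houston", "Toronto", "Los Angeles", "Los Angeles"]
window = [{"city": c, "hits": 1} for c in cities]

counts = count_hits(window)
print(len(top_per_group(counts, 2)))  # 3: one series per city, not a top 2
print(top_overall(counts, 2))         # [('Los Angeles', 3), ('Houston', 2)]
```

The first result is the shape I was getting originally (one entry per city, growing as new cities show up); the second is the single ordered list I was after.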

@mspiegle Glad you were able to get it working! Do you have a repo with the code for the twitter data? This sounds like a very cool project!

I do, but it’s currently private. I’m putting a short presentation together for a local meetup group and wanted to show off something interesting with Kapacitor. The repo is private because I have API keys embedded in the code; it isn’t meant to be used in production and probably won’t evolve much beyond this. In the interest of sharing, here is my Python script, which reads from the Twitter streaming API and forwards the city to Kapacitor. It could easily be modified to send more data:

from __future__ import print_function
from influxdb import InfluxDBClient
from twitter import Api
import json
import time

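# CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_SECRET are the
# Twitter API credentials (defined elsewhere, not shown here).
api = Api(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)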

south_west = "-125,25"
north_east = "-60,50"
box = [south_west, north_east]

client = InfluxDBClient(
    host="kapacitor",
    port=9092,
    database="twitter"
)

def tweet_to_point(t):
    place = t.get("place", {}) or {}
    place = place.get("name", "None") or "None"

    return [{
        "measurement": "tweets",
        "time": int(time.time() * (10**9)),
        "tags": {
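            # Drop non-ASCII characters; decode again so the tag stays a
            # text string (bytes would break json.dumps on Python 3).
            "city": place.encode("ascii", "ignore").decode("ascii")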
        },
        "fields": {
            "hits": 1
        }
    }]

def main():
    count = 0
    for tweet in api.GetStreamFilter(locations=box):
        t = tweet_to_point(tweet)
        client.write_points(
            points=t,
            retention_policy="default"
        )
        if count % 100 == 0:
            print(count)
            print(json.dumps(t))
        count += 1

if __name__ == "__main__":
    main()
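
As a rough idea of what "more data" could look like, here is a sketch that adds a country tag next to the city. It assumes the tweet's place object carries a country_code key, which Twitter place objects normally do; tweet_to_point_with_country is a made-up name and the sample tweet is invented, so treat it as a starting point rather than something tested against the live stream.

```python
import time

def tweet_to_point_with_country(t):
    # "place" may be missing or null, so fall back to an empty dict.
    place = t.get("place") or {}
    city = place.get("name") or "None"
    country = place.get("country_code") or "None"  # assumed key, see note above

    return [{
        "measurement": "tweets",
        "time": int(time.time() * 10**9),  # nanoseconds since the epoch
        "tags": {
            # Drop non-ASCII characters but keep the value a text string.
            "city": city.encode("ascii", "ignore").decode("ascii"),
            "country": country,
        },
        "fields": {"hits": 1},
    }]

# Invented sample tweet; "\u00e9" is an accented e.
sample = {"place": {"name": "Montr\u00e9al", "country_code": "CA"}}
print(tweet_to_point_with_country(sample)[0]["tags"])
```

With the extra tag in place, the TICKscript could group by 'country' instead of 'city' to get a per-country leaderboard.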