I am forwarding a Twitter stream to Kapacitor with the goal of calculating the top 10 tweeting cities. The data I am sending looks like this:
{
"measurement": "tweets",
"time": 1496082674991192064,
"tags": {
"city": "Los Angeles"
},
"fields": {
"hits": 1
}
}
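For reference, here is a minimal sketch of how a forwarder might build one such point per tweet. The helper names (`make_tweet_point`, `to_line_protocol`) are hypothetical, and the line protocol rendering assumes the forwarder writes InfluxDB line protocol with `hits` as an integer field. The post only shows the JSON shape, so treat that part as an assumption.

```python
import json
import time


def make_tweet_point(city, ts_ns=None):
    """Build one point in the shape shown above: one hit for one city.

    ts_ns is a nanosecond Unix timestamp; it defaults to the current time.
    """
    if ts_ns is None:
        ts_ns = time.time_ns()
    return {
        "measurement": "tweets",
        "time": ts_ns,
        "tags": {"city": city},
        "fields": {"hits": 1},
    }


def to_line_protocol(point):
    """Render the point as line protocol (assumed integer field, hence 'i').

    Spaces, commas and equals signs in tag values are backslash-escaped.
    """
    def esc(s):
        return s.replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")

    tags = ",".join(f"{k}={esc(v)}" for k, v in sorted(point["tags"].items()))
    fields = ",".join(f"{k}={v}i" for k, v in point["fields"].items())
    return f"{point['measurement']},{tags} {fields} {point['time']}"


if __name__ == "__main__":
    p = make_tweet_point("Los Angeles", 1496082674991192064)
    print(json.dumps(p))
    print(to_line_protocol(p))
```

The escaping matters for multi-word cities such as "Los Angeles", which would otherwise split the tag set at the space.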
For each tweet in the stream, I send the name of the city and a value of 1 for "hits". I expect to read from the HTTP endpoint a list of the top tweeting cities for the current window. My TICKscript looks like this:
stream
    |from()
        .database('twitter')
        .retentionPolicy('default')
        .measurement('tweets')
    |groupBy('city')
    |window()
        .period(1m)
        .every(10s)
    |count('hits')
        .as('hits')
    |top(10, 'hits')
        .as('hits')
    |httpOut('top10')
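To pin down what each window is expected to contain, here is a plain-Python model (not Kapacitor) of the `groupBy('city')`, `window()` and `count('hits')` steps. It assumes a window emitted at time `t` covers points in `[t - 1m, t)`; Kapacitor's exact boundary rules may differ, and `windowed_counts` is a hypothetical helper.

```python
from collections import Counter

NS = 1_000_000_000
PERIOD_NS = 60 * NS  # mirrors .period(1m)
EVERY_NS = 10 * NS   # mirrors .every(10s): one emit every 10 seconds


def windowed_counts(points, emit_at_ns):
    """Count points per city with time in [emit_at - period, emit_at).

    Each tweet carries hits=1, so counting points equals counting tweets.
    """
    start = emit_at_ns - PERIOD_NS
    counts = Counter()
    for p in points:
        if start <= p["time"] < emit_at_ns:
            counts[p["tags"]["city"]] += 1
    return counts


if __name__ == "__main__":
    t0 = 1496082670 * NS
    pts = [
        {"time": t0 + 1 * NS, "tags": {"city": "Toronto"}},
        {"time": t0 + 2 * NS, "tags": {"city": "Toronto"}},
        {"time": t0 + 5 * NS, "tags": {"city": "Ottawa"}},
        {"time": t0 - 70 * NS, "tags": {"city": "Ottawa"}},  # older than 1m
    ]
    print(windowed_counts(pts, t0 + EVERY_NS))
```

Every 10 seconds this yields one count per city; ranking those counts across cities is the separate step that should produce the top 10.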
What I actually see at the HTTP endpoint is a constantly growing list of cities with no particular ordering. I've summarized the endpoint output below for readability:
$ curl -s http://localhost:9092/kapacitor/v1/tasks/twitter/top10 | jq -c '.series[] | {"city": .tags.city, "values": .values[][1]}'
{"city":"Georgia","values":13}
{"city":"Manhattan","values":11}
{"city":"Toronto","values":6}
{"city":"Tennessee","values":3}
{"city":"Seattle","values":4}
{"city":"Texas","values":8}
{"city":"Bowling Green","values":1}
{"city":"Florida","values":11}
{"city":"Brooklyn","values":3}
{"city":"Columbus","values":3}
{"city":"California","values":5}
{"city":"Virginia","values":6}
{"city":"Los Angeles","values":9}
{"city":"Ottawa","values":3}
{"city":"Orlando","values":1}
{"city":"Pennsylvania","values":6}
{"city":"Memphis","values":4}
{"city":"New York","values":2}
{"city":"Fort Worth","values":5}
{"city":"Las Vegas","values":2}
{"city":"Missouri","values":2}
{"city":"Chicago","values":8}
{"city":"Washington","values":3}
{"city":"Houston","values":9}
{"city":"San Francisco","values":7}
{"city":"Oakland","values":1}
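As a client-side stopgap (not a fix for the TICKscript itself), the same endpoint output can be ranked after fetching it. This sketch assumes each series carries a single row with the count in column 1, as the `jq` filter above suggests; `top_cities` and `fetch` are hypothetical helpers, and the URL is the one from the `curl` command.

```python
import json
import urllib.request

URL = "http://localhost:9092/kapacitor/v1/tasks/twitter/top10"


def top_cities(payload, n=10):
    """Flatten httpOut series into (city, count) pairs and rank them.

    Reads column 1 of the last row of each series, like .values[][1] in jq.
    """
    rows = []
    for series in payload.get("series", []):
        city = series["tags"]["city"]
        rows.append((city, series["values"][-1][1]))
    # Highest count first; ties are broken alphabetically by city name.
    rows.sort(key=lambda r: (-r[1], r[0]))
    return rows[:n]


def fetch(url=URL):
    """GET the httpOut endpoint and parse its JSON body."""
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)
```

Usage would be `top_cities(fetch())`. Given the output above, that would put Georgia (13) first, followed by Florida and Manhattan (11 each).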
What can I do to get closer to my goal?
Notes: I tried resetting the group membership with an empty groupBy(). I also tried several things from the leaderboard example, without much success.
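The distinction the notes touch on, ranking inside each group versus ranking after the groups are merged, can be modeled in plain Python. This is a sketch assuming that a `top(10, ...)` placed after `groupBy('city')` ranks each city's group on its own; the helper names are hypothetical and this is not Kapacitor's implementation.

```python
import heapq


def top_within_groups(groups, n):
    """groups maps city -> list of counts; keep the n largest per group."""
    return {city: heapq.nlargest(n, counts) for city, counts in groups.items()}


def top_after_merge(groups, n):
    """Merge every group into one list, then keep the n largest overall."""
    merged = [(count, city) for city, counts in groups.items() for count in counts]
    return [(city, count) for count, city in heapq.nlargest(n, merged)]


if __name__ == "__main__":
    groups = {"Georgia": [13], "Toronto": [6], "Orlando": [1]}
    print(top_within_groups(groups, 2))
    print(top_after_merge(groups, 2))
```

With one count per city, per-group ranking keeps every city (the growing, unordered list above), while merging first keeps only the n largest. Ties in `top_after_merge` fall back to reverse alphabetical order because whole tuples are compared.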