I have two databases in InfluxDB called telegraf and vmware_stats. The metrics for these databases come from RabbitMQ. They have different tags, and I can’t filter them by tags or fields, because there are many of them and they differ between metrics. I know that the metrics carry the database name as a message header.
This header isn’t a tag or a field, so I can’t use it for filtering with tagpass or fieldpass. Is there any way to route metrics by the database header using Telegraf?
I am not sure if I have understood the problem correctly.
What format does the payload of the AMQP messages have? json? influx line protocol?
Can you post an example payload of the AMQP messages?
Ok thanks, that makes it clearer now that we can see the whole AMQP payload.
It’s a bit tricky because here json and influx line protocol are mixed.
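The messages look roughly like this - simplified here, the measurement and values are made up - with the database name in the headers and the actual metric embedded as an influx line protocol string in the payload:
[
  {
    "properties": {
      "headers": {
        "database": "vmware_stats"
      }
    },
    "payload": "cpu_usage,host=esx01 value=42.5 1680000000000000000\n"
  }
]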
IMHO this will not be possible with the built-in json or json_v2 parsers alone.
I think we need a processors plugin with some custom code.
I would solve it with a processors.starlark or processors.execd plugin.
I’ll outline a possible solution here with a processors.starlark plugin.
This is not a tested or ready solution - there is still some homework to do:
[[inputs.amqp_consumer]]
  brokers = ['amqp://localhost:5672/metrics']
  password = 'verysecurepassword'
  queue = 'telegraf'
  username = 'telegraf'
  data_format = "value"   # read the whole json payload as a string
  data_type = "string"
  name_override = "amqp"  # set measurement name

[[processors.starlark]]
  namepass = ["amqp"]  # therefore only amqp input is processed
  source = '''
def apply(metric):
    # parse the json string
    # get the "database" value
    # get the "payload" value
    # split the payload string into measurement, tags, fields, timestamp
    # make new metric from it
    # assign "database" value as measurement name to the new metric
    # return new metric
'''

[[outputs.influxdb]]
  content_encoding = 'identity'
  database = 'vmware_stats'
  urls = ['http://localhost:8086']
  namepass = ["vmware_stats"]

[[outputs.influxdb]]
  content_encoding = 'identity'
  database = 'telegraf'
  urls = ['http://localhost:8086']
  namepass = ["telegraf"]
The whole thing was much more difficult than expected.
Generally it is not a good idea to embed influx line protocol in json.
I gave up on Starlark, because it is very limited and has no parser for the influx line protocol format.
I wrote a processors.execd plugin in Python instead.
Here are the Telegraf config and the Python script - adjust them to your use case.
[[inputs.file]]
  files = ["amqp.json"]
  data_format = "json_v2"
  [[inputs.file.json_v2]]
    measurement_name = "amqp"
    [[inputs.file.json_v2.tag]]
      path = "#.properties.headers.database"
      rename = "database"
    [[inputs.file.json_v2.field]]
      path = "#.payload"
      rename = "payload"
      type = "string"

[[processors.strings]]  # to get rid of the newline in the payload field
  order = 1
  namepass = ["amqp"]  # therefore only amqp input is processed
  [[processors.strings.trim]]  # trim leading and trailing whitespace
    field = "payload"

[[processors.execd]]  # calls the python script
  order = 2
  namepass = ["amqp"]  # therefore only amqp input is processed
  command = ["python", "amqp.py"]

[[outputs.file]]  # only for debugging
  files = ["amqp-vmware_stats.out"]
  influx_sort_fields = true
  namepass = ["vmware_stats"]

[[outputs.file]]  # only for debugging
  files = ["amqp-telegraf.out"]
  influx_sort_fields = true
  namepass = ["telegraf"]

[[outputs.file]]  # only for debugging
  files = ["amqp.out"]
  influx_sort_fields = true
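The outputs.file sections are only there to check the result; for real use they would presumably be replaced by the two outputs.influxdb sections from the first config, again selected via namepass.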
Python script:
from influxdb_client import Point
from line_protocol_parser import parse_line

while True:
    try:
        input_line = input()  # read one metric in line protocol format from stdin
    except EOFError:  # catch EOF error
        break
    except KeyboardInterrupt:  # catch KeyboardInterrupt
        break
    else:
        input_line = input_line.replace("\\", "")  # remove the escaping backslashes from the string field
        data = parse_line(input_line)  # parse input line
        payload = data['fields'].get('payload')
        database = data['tags'].get('database')
        if payload:
            metric = parse_line(payload.strip())  # parse the embedded line protocol
            metric['measurement'] = database  # use the database name as measurement name
            point = Point.from_dict(metric)  # metric from dict
            print(point.to_line_protocol(), flush=True)  # write to stdout for telegraf
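The script needs the influxdb-client and line-protocol-parser packages from PyPI. To illustrate what it does (values made up): a metric arriving on stdin as
amqp,database=vmware_stats payload="cpu_usage,host=esx01 value=42.5 1680000000000000000" 1680000000001000000
leaves the processor as
vmware_stats,host=esx01 value=42.5 1680000000000000000
and is then routed by namepass in the outputs.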