One AMQP input and two InfluxDB outputs

I have two databases in InfluxDB called telegraf and vmware_stats. The metrics for both databases come from RabbitMQ. They have different tags, and I can’t filter them by tags or fields, because there are many of them and they differ. What I do know is that each message carries its target database in a "database" header:

    "properties": {
      "headers": {
        "database": "vmware_stats",
        "x-original-exchange": "telegraf",
        "x-original-routing-key": "",
-----
    "properties": {
      "delivery_mode": 1,
      "headers": {
        "database": "telegraf",
        "retention_policy": "default",
        "x-original-exchange": "telegraf",

This header isn’t a tag or field, so I can’t filter on it with tagpass or fieldpass. Is there any way to route metrics by the database header using Telegraf?

My Telegraf configuration:

[inputs.amqp_consumer]
    brokers = ['amqp://localhost:5672/metrics']
    password = 'verysecurepassword'
    queue = 'telegraf'
    username = 'telegraf'

[[outputs.influxdb]]
    content_encoding = 'identity'
    database = 'vmware_stats'
    urls = ['http://localhost:8086']

[[outputs.influxdb]]
    content_encoding = 'identity'
    database = 'telegraf'
    urls = ['http://localhost:8086']

[agent]
    collection_jitter = '0s'
    debug = false
    flush_interval = '1s'
    flush_jitter = '0s'
    interval = '1s'
    metric_batch_size = 2000
    metric_buffer_limit = 50000
    precision = ''
    hostname = ''
    omit_hostname = false

With this configuration, every incoming metric is written to both databases.

Hello @amk,
I’m not sure. @popey, do you have any thoughts here? Thank you!

@amk

I’m not sure I have understood the problem correctly.
What format is the payload of the AMQP messages in? JSON? Influx line protocol?
Can you post an example payload of the AMQP messages?

Output from AMQP:

[
  {
    "payload_bytes": 288,
    "redelivered": false,
    "exchange": "",
    "routing_key": "telegraf",
    "message_count": 0,
    "properties": {
      "headers": {
        "database": "vmware_stats",
        "x-original-exchange": "telegraf",
        "x-original-routing-key": "",
        "x-received-from": [
          {
            "queue": "telegraf",
            "redelivered": false,
            "uri": "amqp://rabbitmq-global/metrics",
            "visit-count": 1
          }
        ]
      }
    },
    "payload": "vm_fast_stat,Hostname=host1,Region=EU,VC_Cluster=t1,VC_Datacenter=eu,VM_name=vm1,owner=none cpu_count=2i,cpu_usage=314i,disk_size=100.0,disk_usage=16.818078312,mem_gb=4.0,mem_usage=3.9130859375,power_state=\"poweredOn\" 1628384402544219904\n",
    "payload_encoding": "string"
  }
]

OK, thanks, seeing the whole AMQP message makes it clearer.

It’s a bit tricky because JSON and InfluxDB line protocol are mixed here.
IMHO, the built-in json and json_v2 parsers cannot achieve this on their own.

I think we need a processor plugin with some custom code.
I would solve it with a processors.starlark or processors.execd plugin.
I’ll outline a possible solution here with a processors.starlark plugin.
It is not a tested or finished solution; there is still some homework to do :wink:

[[inputs.amqp_consumer]]
  brokers = ['amqp://localhost:5672/metrics']
  password = 'verysecurepassword'
  queue = 'telegraf'
  username = 'telegraf'
  data_format = "value"  # read the whole json payload as a string
  data_type = "string"  
  name_override = "amqp"  # set measurement name

[[processors.starlark]]
  namepass = ["amqp"]  # so only the amqp metrics are processed
  source = '''
def apply(metric):
  # parse the json string
  # get the "database" value
  # get the "payload" value
  # split the payload string into measurement, tags, fields, timestamp
  # make new metric from it
  # assign "database" value as measurement name to the new metric
  # return new metric
  return metric  # placeholder so the script loads; replace with the new metric
'''

[[outputs.influxdb]]
  content_encoding = 'identity'
  database = 'vmware_stats'
  urls = ['http://localhost:8086']
  namepass = ["vmware_stats"]

[[outputs.influxdb]]
  content_encoding = 'identity'
  database = 'telegraf'
  urls = ['http://localhost:8086']
  namepass = ["telegraf"]
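The steps in the Starlark outline can be tried outside Telegraf first. Below is a minimal plain-Python sketch of the same steps using only the standard library; it is not Starlark and not code that Telegraf runs. It assumes the whole JSON message (headers plus payload, as in the output above) is available as a string, that double quotes only appear around string field values, and that a payload may contain several lines. The function names are hypothetical, and the hand-written splitter is a simplification rather than a complete line-protocol parser.

```python
import json
import re


def _split_unquoted(text, sep):
    """Split text on sep, skipping backslash-escaped characters and anything inside double quotes."""
    parts, current, in_quotes, i = [], [], False, 0
    while i < len(text):
        ch = text[i]
        if ch == "\\" and i + 1 < len(text):
            current.append(text[i:i + 2])  # keep the escape pair intact for later unescaping
            i += 2
            continue
        if ch == '"':
            in_quotes = not in_quotes
        elif ch == sep and not in_quotes:
            parts.append("".join(current))
            current = []
            i += 1
            continue
        current.append(ch)
        i += 1
    parts.append("".join(current))
    return parts


def _unescape_key(text):
    """Undo line-protocol escaping of commas, spaces and equals signs in names, keys and tag values."""
    return re.sub(r"\\([, =])", r"\1", text)


def _parse_field_value(raw):
    """Convert a raw field value into str, bool, int or float."""
    if len(raw) >= 2 and raw.startswith('"') and raw.endswith('"'):
        return re.sub(r'\\(["\\])', r"\1", raw[1:-1])  # string field: undo \" and \\
    if raw in ("t", "T", "true", "True", "TRUE"):
        return True
    if raw in ("f", "F", "false", "False", "FALSE"):
        return False
    if raw.endswith(("i", "u")):
        return int(raw[:-1])  # integer (i) or unsigned integer (u)
    return float(raw)


def parse_line_protocol(line):
    """Split one line of line protocol into (measurement, tags, fields, timestamp)."""
    sections = _split_unquoted(line.strip(), " ")
    if len(sections) not in (2, 3):
        raise ValueError(f"unexpected line protocol: {line!r}")
    series, field_set = sections[0], sections[1]
    timestamp = int(sections[2]) if len(sections) == 3 else None

    name, *tag_pairs = _split_unquoted(series, ",")
    tags = {}
    for pair in tag_pairs:
        key, value = _split_unquoted(pair, "=")  # raises ValueError on malformed pairs
        tags[_unescape_key(key)] = _unescape_key(value)

    fields = {}
    for pair in _split_unquoted(field_set, ","):
        key, raw = _split_unquoted(pair, "=")
        fields[_unescape_key(key)] = _parse_field_value(raw)
    return _unescape_key(name), tags, fields, timestamp


def route_by_database(message_json):
    """Read the JSON, take the 'database' header and the 'payload',
    parse each payload line and use the database name as the measurement name."""
    routed = []
    for message in json.loads(message_json):
        database = message["properties"]["headers"]["database"]
        for line in message["payload"].splitlines():
            if not line.strip():
                continue  # ignore blank lines such as the trailing newline
            _measurement, tags, fields, timestamp = parse_line_protocol(line)
            routed.append({"measurement": database, "tags": tags,
                           "fields": fields, "time": timestamp})
    return routed


if __name__ == "__main__":
    example = json.dumps([{
        "properties": {"headers": {"database": "vmware_stats"}},
        "payload": 'vm_fast_stat,Hostname=host1,Region=EU cpu_count=2i,'
                   'disk_size=100.0,power_state="poweredOn" 1628384402544219904\n',
    }])
    print(route_by_database(example))
```

Following the outline, the original measurement name is dropped in favour of the database name, which is what the namepass filters on the two outputs rely on.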

The whole thing was much more difficult than expected.
In general, it is not a good idea to embed InfluxDB line protocol in JSON.
I gave up on Starlark because it is very limited and has no parser for the influx line protocol format.
Instead, I wrote a processors.execd plugin in Python.
Here are the Telegraf config and the Python script; adjust them to your use case. For testing, the config reads the AMQP message from a file (amqp.json) with inputs.file instead of inputs.amqp_consumer, and writes to outputs.file instead of InfluxDB.

[[inputs.file]]
  files = ["amqp.json"]
  data_format = "json_v2"
  [[inputs.file.json_v2]]
    measurement_name = "amqp"
  [[inputs.file.json_v2.tag]]
    path = "#.properties.headers.database"
    rename = "database"
  [[inputs.file.json_v2.field]]
    path = "#.payload"
    rename = "payload"
    type = "string"

[[processors.strings]]  # removes the trailing newline from the payload field
  order = 1
  namepass = ["amqp"]  # so only the amqp metrics are processed
  [[processors.strings.trim]]  # Trim leading and trailing whitespace
    field = "payload"

[[processors.execd]]  # calls the python script
  order = 2
  namepass = ["amqp"]  # so only the amqp metrics are processed
  command = ["python", "amqp.py"]

[[outputs.file]]  # only for debugging
  files = ["amqp-vmware_stats.out"]
  influx_sort_fields = true
  namepass = ["vmware_stats"]

[[outputs.file]]  # only for debugging
  files = ["amqp-telegraf.out"]
  influx_sort_fields = true
  namepass = ["telegraf"]

[[outputs.file]]  # only for debugging
  files = ["amqp.out"]
  influx_sort_fields = true

Python script (requires the influxdb-client and line-protocol-parser packages from PyPI):

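from influxdb_client import Point
from line_protocol_parser import parse_line

while True:
    try:
        input_line = input()  # read one metric in influx line protocol from stdin
    except (EOFError, KeyboardInterrupt):  # stdin closed by Telegraf, or interrupted
        break
    # Telegraf escapes the quotes inside the "payload" string field as \";
    # remove the backslashes so the embedded line can be parsed
    # (assumes the payload contains no other backslashes)
    input_line = input_line.replace("\\", "")
    data = parse_line(input_line)  # parse the outer "amqp" metric
    payload = data['fields'].get('payload')
    database = data['tags'].get('database')
    if payload and database:  # skip metrics without a payload or database tag
        metric = parse_line(payload.strip())  # parse the embedded line protocol
        metric['measurement'] = database  # rename the measurement so namepass can route it
        point = Point.from_dict(metric)  # metric from dict
        # flush explicitly: Python block-buffers stdout when it is a pipe,
        # so without it Telegraf may not receive the output promptly
        print(point.to_line_protocol(), flush=True)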
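One side effect of this script is that the original measurement name (vm_fast_stat in the example) is replaced by the database name. A possible alternative, sketched here with only the standard library, keeps the measurement and adds the database as a tag instead. The function name to_tagged_line is hypothetical, and the metric dict is assumed to have the keys measurement, tags, fields and time, matching what the script gets back from parse_line.

```python
def _escape(text, specials):
    """Prefix every character from specials with a backslash (line protocol escaping)."""
    return "".join("\\" + ch if ch in specials else ch for ch in text)


def _format_field(value):
    """Render a Python value as a line-protocol field value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return '"' + _escape(str(value), '"\\') + '"'  # string field: escape " and \


def to_tagged_line(metric, database):
    """Keep the original measurement name and add the database as a tag."""
    tags = dict(metric.get("tags") or {})
    tags["database"] = database
    line = _escape(metric["measurement"], ", ")
    for key in sorted(tags):  # sorted tag keys, as recommended for line protocol
        line += f",{_escape(key, ', =')}={_escape(str(tags[key]), ', =')}"
    fields = ",".join(
        f"{_escape(key, ', =')}={_format_field(value)}"
        for key, value in sorted(metric["fields"].items())
    )
    line += " " + fields
    if metric.get("time") is not None:
        line += f" {metric['time']}"
    return line
```

In the execd script this would mean not overwriting metric['measurement'] and printing to_tagged_line(metric, database) with flush=True. The outputs could then select metrics with a tagpass filter on database and drop the helper tag with tagexclude = ["database"]; depending on your Telegraf version, the influxdb output's database_tag and exclude_database_tag options might even let a single output pick the database per metric.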