Parsing http with json

Hello all,
I’m trying to retrieve and store values from the urls below (Coinbase currency sell values)

if I do a curl for time
curl https://api.coinbase.com/v2/time
Returns {“data”:{“iso”:“2021-06-12T19:07:36Z”,“epoch”:1623524856}}

curl https://api.coinbase.com/v2/prices/MATIC-USD/sell
Returns {“data”:{“base”:“MATIC”,“currency”:“USD”,“amount”:“1.34”}}

I am trying to parse these values into an influxdb bucket using telegraf so I’ve got the following input in the conf file. I’m capturing the amount as a string because that’s how it is output in the json and I know from reading the documentation that it would be ignored unless explicitly added as a string field (unless I’m mistaken)

[[inputs.http]]
#  interval = "60s"
#  timeout = "15s"
  name_override = "currencies"
  urls = ["https://api.coinbase.com/v2/time",
          "https://api.coinbase.com/v2/prices/MATIC-USD/sell",
          "https://api.coinbase.com/v2/prices/ETH-USD/sell",
          "https://api.coinbase.com/v2/prices/ETC-USD/sell",
         ]
  method = "GET"
  data_format = "json"
  json_name_key = "data"
  tag_keys = ["currency"]
  json_string_fields = ["amount"]
  json_time_key = "epoch"
  json_time_format = "unix"

The idea is to collect the value of the currencies and use the time obtained from the first url to set the _time in the bucket. However, no data is being written to the bucket.

I enabled debug=true for telegraf and I get the following:

> 2021-06-12T19:19:30Z E! [inputs.http] Error in plugin: [url=https://api.coinbase.com/v2/prices/ETH-USD/sell]: JSON time key could not be found
> 2021-06-12T19:19:30Z E! [inputs.http] Error in plugin: [url=https://api.coinbase.com/v2/prices/MATIC-USD/sell]: JSON time key could not be found
> 2021-06-12T19:19:30Z E! [inputs.http] Error in plugin: [url=https://api.coinbase.com/v2/prices/ETC-USD/sell]: JSON time key could not be found
> 2021-06-12T19:19:30Z E! [inputs.http] Error in plugin: [url=https://api.coinbase.com/v2/time]: JSON time key could not be found
> 2021-06-12T19:19:38Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

I would expect the sell urls to error out about the time key, but the time one should be fine.

Please help

epoch is not a key at the base of the JSON, there is data first at base level…

Thanks for your help @Hipska
I changed json_time_key to “data.epoch” and “data” to no avail.
I have also commented out the json_time_key and json_time_format fields altogether but still no data is being extracted and stored. How do I debug the http input plugin? I’ve enabled debug=true in the telegraf.conf file.

Just an update. Re-reading the documentation and looking at the example

json_time_key needs to be “data_epoch”

The time error disappeared which is a good sign but still no data being written.

Hi all,
So I’ve semi-fixed the problem by restarting the system. No idea what happened there.
Here’s the thing though, it’s only logging “data_epoch” as a field so not processing the other URLs in the list.

Am I using the plugin correctly? Is it supposed to be able to process multiple URLs in the same plugin because it looks like it’s picking the first URL and processing it but not the rest.

Influxdb is very particular about data types. Note the quotes around the value for amount 1.34. Try adding the post processor telegraf/plugins/processors/converter/README.md at release-1.18 · influxdata/telegraf · GitHub

Edit; adding more detail

[[processors.converter]]
  [processors.converter.tags]
    float = ["amount"]

Thank you very much.

Should this work?
{“data”:{“base”:“MATIC”,“currency”:“USD”,“amount”:“1.34”}}
Just to explain:

  • “data” is the json_name_key
  • tag_keys is “base” so that should create a tag called base with a value of “MATIC” for example.
  • I’m not sure I need “currency” but that is stored as a string.
  • data_epoch should hopefully save the value of " $data.epoch" as the unix time
[[inputs.http]]
#  interval = "60s"
#  timeout = "15s"
  name_override = "currencies"
  urls = ["https://api.coinbase.com/v2/time",
          "https://api.coinbase.com/v2/prices/MATIC-USD/sell",
          "https://api.coinbase.com/v2/prices/ETH-USD/sell",
          "https://api.coinbase.com/v2/prices/ETC-USD/sell",
         ]
  method = "GET"
  data_format = "json"
  json_name_key = "data"
  tag_keys = ["base"]
  json_string_fields = ["currency"]
  json_time_key = "data_epoch"
  json_time_format = "unix_ms"

[[processors.converter]]
  [processors.converter.tags]
    float = ["amount"]

You maybe need the converter

[processors.converter.fields]

instead, because you want the amount value as a field value and not a tag value.

Thank you all.

I tried and it’s still not writing anything. How do I know what’s tripping it? Is there a way to debug the plugins?

This works and should give you a starting point:

[[inputs.http]]
  name_override = "currencies"
  urls = [
          "https://api.coinbase.com/v2/prices/MATIC-USD/sell",
          "https://api.coinbase.com/v2/prices/ETH-USD/sell",
          "https://api.coinbase.com/v2/prices/ETC-USD/sell",
          ]
  method = "GET"
  data_format = "json"
  json_query = "data"
  tag_keys = ["base", "currency"]
  json_string_fields = ["amount"]
  # json_time_key = "epoch"
  # json_time_format = "unix_ms"
  tagexclude = ["url"]

# Convert values to another metric value type
[[processors.converter]]
  [processors.converter.fields]
    float = ["amount"]

# file output only for debugging
[[outputs.file]]
  files = ["coinbase.out"]
  influx_sort_fields = true

@Franky1 This works perfectly. Thank you very much indeed. Is it the time URL that’s confusing things then?
Is there a way to perhaps run a separate http input and use the timestamp from that input?

The problem is, that the api endpoints /time and /sell respond with different json payloads. The json parser cannot handle this in that way you want it.

Yes you could use a second http plugin just to query the /time endpoint.
But then you have to merge these two data streams from /time and /sell somehow with an additional plugin.

This will work to collect the endpoints independently:

[[inputs.http]]
  name_override = "time"
  urls = [
          "https://api.coinbase.com/v2/time",
          ]
  method = "GET"
  data_format = "json"
  json_query = "data"
  json_string_fields = ["iso"]
  json_time_key = "epoch"
  json_time_format = "unix"
  tagexclude = ["url"]

[[inputs.http]]
  name_override = "sell"
  urls = [
          "https://api.coinbase.com/v2/prices/MATIC-USD/sell",
          "https://api.coinbase.com/v2/prices/ETH-USD/sell",
          "https://api.coinbase.com/v2/prices/ETC-USD/sell",
          ]
  method = "GET"
  data_format = "json"
  json_query = "data"
  tag_keys = ["base", "currency"]
  json_string_fields = ["amount"]
  tagexclude = ["url", "currency"]

# Convert values to another metric value type
[[processors.converter]]
  [processors.converter.fields]
    float = ["amount"]

# file output only for debugging
[[outputs.file]]
  files = ["coinbase2.out"]
  influx_sort_fields = true

BUT then it is getting hairy… i don’t know of any plugin that can force the timestamp from one measurement into another measurement… :thinking:

1 Like

Thanks. I implemented a similar thing but I was wondering if there was some sort of processor that collected the outcome from both input plugins and combined them then output them into the file/influxdb bucket

Hello @t481,
Have you tried using the execd processor plugin?

@t481
As suggested by @Anaisdg, you could use an processors.execd plugin.
Here is a working solution with Python, the Telegraf config and the Python script:

[[inputs.http]]
  name_override = "time"
  urls = [
          "https://api.coinbase.com/v2/time",
          ]
  method = "GET"
  data_format = "json"
  json_query = "data"
  json_string_fields = ["iso"]
  json_time_key = "epoch"
  json_time_format = "unix"
  tagexclude = ["url"]

[[inputs.http]]
  name_override = "sell"
  urls = [
          "https://api.coinbase.com/v2/prices/MATIC-USD/sell",
          "https://api.coinbase.com/v2/prices/ETH-USD/sell",
          "https://api.coinbase.com/v2/prices/ETC-USD/sell",
          ]
  method = "GET"
  data_format = "json"
  json_query = "data"
  tag_keys = ["base", "currency"]
  json_string_fields = ["amount"]
  tagexclude = ["url", "currency"]

[[processors.execd]]
  command = ["python", "coinbase3.py"]

# file output only for debugging
[[outputs.file]]
  files = ["coinbase3.out"]
  influx_sort_fields = true

from influxdb_client import Point
from line_protocol_parser import parse_line

dataset = dict()

while True:
    try:
        input_line = input()  # read from stdin
    except EOFError:  # catch EOF error
        break
    except KeyboardInterrupt:  # catch KeyboardInterrupt
        break
    else:
        data = parse_line(input_line)  # parse input line
        amount = data['fields'].get('amount')
        if amount:
            data['fields'].update({'amount': float(amount)})  # update field type

        # collect all metrics
        measurement = data['measurement']
        if measurement == "sell":
            base = data['tags'].get('base')
            dataset[base] = data
        elif measurement == "time":
            dataset["time"] = data

        # if all metrics collected, replace timestamp and spit them out
        if len(dataset) == 4:
            newtime = dataset["time"].get("time")
            for key, metric in dataset.items():
                if key != "time":
                    metric['time'] = newtime  # assign new time
                    point = Point.from_dict(metric)  # metric from dict
                    print(point.to_line_protocol())  # write to stdout
            dataset.clear()  # clear set of metrics for next collection
1 Like

@t481 Want to make sure you’re aware of the new JSON parser (json_v2). It enables changing data types from your JSON in the parser itself.

1 Like