Flux Doing Transforms on Multiple Fields

Hello

I am having some difficulty with the Flux language (I'm completely new to it). I am pulling some data from ethermine.org and want to display it in Grafana. The measurement name is my_metric-ethermine and the two applicable field names are “unpaid” and “price_usd”. The ethermine API returns my unpaid ether balance in base units, which I need to divide by 10e17 to get the value in ether. I then want to multiply this by the current ether price in USD and display the result next to my ether balance.
I want it displayed similarly to this, minus the left gauge


Here is the code I have for the left panel

from(bucket: "altus")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "my_metric-ethermine" and
    r._field == "unpaid" 
  )

I have also tried something like this, but I get an error that unpaid is undefined, so I assume my syntax is wrong:

from(bucket: "altus")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "my_metric-ethermine" and
    r._field == "unpaid" or r._field == "price_usd"
    
  |> map(fn: (r) => ({
    r with
    _field: unpaid / 10e17 * price_usd
  })
  )
  )

If I click the replace all fields button it removes the left gauge, but in the second transform I don’t see an option to take Unpaid Eth Balance and multiply it with price_usd.

I think _field might be the issue; it should probably be _value. The docs provide this in one of the examples:

map(fn: (r) => ({ _value: r._value * r._value }))

https://docs.influxdata.com/influxdb/cloud/reference/flux/stdlib/built-in/transformations/map/

I tried this: but get that error

Just found this old post. It’s not exactly what you’ve described, but it’s close.

Thanks, I’ll have a look at it when I get data to go into InfluxDB again… I added some stuff to my Python script and suddenly data just stopped being sent :grimacing: :sweat_smile:

@Altus Try this:

from(bucket: "altus")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "my_metric-ethermine" and
    (r._field == "unpaid" or r._field == "price_usd")
  )
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({
    r with
    _value: (r.unpaid / 10e17) * r.price_usd
  }))

Will do once I get my data back into InfluxDB… it broke and I’m not able to fix it :grimacing:
Not sure what happened.

Sorry that I’m only getting back now… I managed to semi-fix my data, and have at least identified the piece of code that causes everything to break. I copied your code and got an error: invalid: error @10:27-10:30: undefined identifier e17
Edit: it works if I type the raw number, i.e. 1000000000000000000.0

Oh right :man_facepalming:, Flux doesn’t support float literals written in scientific notation.
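For readers hitting the same error, here is a minimal sketch of two ways to write the divisor without scientific notation: spelling the literal out in full (the fix that worked above), or computing it with math.pow() from the Flux standard library. The sample row, the variable names weiPerEther and weiPerEtherAlt, and the extra ether column are made up for illustration; the row stands for 0.25 ether at a price of 2800 USD.

```flux
import "array"
import "math"

// Base units per ether, written out in full because the parser
// rejects a literal such as 10e17.
weiPerEther = 1000000000000000000.0

// Hypothetical alternative: compute the same constant instead of counting zeros.
weiPerEtherAlt = math.pow(x: 10.0, y: 18.0)

// Made-up, already pivoted row used only to exercise the arithmetic.
array.from(rows: [{_time: 2021-05-01T00:00:00Z, unpaid: 250000000000000000.0, price_usd: 2800.0}])
    |> map(
        fn: (r) => ({r with
            // Balance in ether, using the computed constant.
            ether: r.unpaid / weiPerEtherAlt,
            // Balance in USD, using the written-out literal.
            _value: r.unpaid / weiPerEther * r.price_usd,
        }),
    )
```

Either constant can then replace the 10e17 in the map() call of the query above.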

haha no worries, thx for the help :slight_smile:

Hey Scott, sorry to bother you again, but it seems to have stopped working (the same number has been displayed for the past day without changing). It was working until yesterday (I will attach some screenshots to help explain).
The 1st screenshot has all 3 fields in the calculation, and it seems that only 1 value is non-zero in the past 24 hours


2nd screenshot is rates_ZAR (USD to ZAR exchange rate price)

3rd screenshot is price_usd

4th screenshot is unpaid balance of ethereum

5th screenshot is just unpaid and price USD (should display the price of my unpaid balance in USD…which it does, not sure why it bounces up and down so frequently though)

So it seems to break when I add the rates_ZAR field (i.e. screenshot 1 is the broken one), although it was working a day earlier. When I display that field on its own, it seems to be collecting data just fine.

@Altus Can you provide some of the raw data from your pivot() (before map())?

Sorry I could only reply now.



Looking at the timestamps, what I gather is happening is that the two API requests do not run at the same time, so the fields end up with different timestamps and the calculation returns a null value. Only once a day do they align, and then it gives me a value (not sure if this is how it works?). I'm also not sure why it doesn’t keep the previous value of rates_ZAR and use that. In my Telegraf config I am executing two Python scripts, one every two minutes and one every hour. The one that executes every hour provides the rates_ZAR field. Another weird thing: the exchange API I am using only allows 1000 free requests a month, and it seems that yesterday it used 700 of those… not sure why it spiked like that.

Here is my telegraf config for reference:

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
  ## output, and will flush this buffer on a successful write. Oldest metrics
  ## are dropped first when this buffer fills.
  ## This buffer only fills when writes fail to output plugin(s).
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s.
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Logging configuration:
  ## Run telegraf with debug log messages.
  debug = true
  ## Run telegraf in quiet mode (error log messages only).
  quiet = false
  ## Specify the log file name. The empty string means to log to stderr.
  logfile = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do no set the "host" tag in the telegraf agent.
  omit_hostname = false
[[outputs.influxdb_v2]]	
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ## urls exp: http://127.0.0.1:8086
  urls = ["http://xxxx:8086"]

  ## Token for authentication.
  token = "xxxxx"

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "altus"

  ## Destination bucket to write into.
  bucket = "altus"
[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = true
  ## Whether to report total system cpu stats or not
  totalcpu = true
  ## If true, collect raw CPU time metrics.
  collect_cpu_time = false
  ## If true, compute and report the sum of all non-idle CPU states.
  report_active = false
[[inputs.disk]]
  ## By default stats will be gathered for all mount points.
  ## Set mount_points will restrict the stats to only the specified mount points.
  # mount_points = ["/"]
  ## Ignore mount points by filesystem type.
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.mem]]
[[inputs.net]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]

[[inputs.exec]]
commands = ["python3 /usr/local/bin/basicTest.py"]
data_format = "json"
interval = "120s"
name_suffix = "-ethermine"
name_override = "my_metric"

[[inputs.exec]]
commands = ["python3 /usr/local/bin/openEx.py"]
data_format = "json"
interval = "3600s"
name_suffix = "-ethermine"
name_override = "my_metric"

Not sure if something is set up wrong in my config, maybe? Also not sure if I can run two inputs.exec plugins on two separate intervals like I am doing here, one every 120s and one every 3600s (one hour)?

Ok, you should just be able to fill those null values with the previous values. Try this:

from(bucket: "altus")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "my_metric-ethermine" and
    (r._field == "unpaid" or r._field == "price_usd" or r._field == "rates_ZAR")
  )
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> fill(column: "rates_ZAR", usePrevious: true)
  |> fill(column: "price_usd", usePrevious: true)
  |> fill(column: "unpaid", usePrevious: true)
  |> map(fn: (r) => ({
    r with
    _value: (r.unpaid / 1000000000000000000.0) * r.price_usd * r.rates_ZAR
  }))
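To see what the fill() step is meant to do, here is a small self-contained sketch that swaps the bucket for array.from() with made-up points: "unpaid" and "price_usd" arrive at both timestamps, while "rates_ZAR" arrives only at the first. After pivot(), the second row has no rates_ZAR value until fill() carries the earlier one forward. The timestamps and values are assumptions for illustration, and the sort() call is an extra precaution that is not in the query above.

```flux
import "array"

// Hypothetical sample points (values are made up for illustration).
array.from(
    rows: [
        {_time: 2021-05-01T00:00:00Z, _field: "rates_ZAR", _value: 14.5},
        {_time: 2021-05-01T00:00:00Z, _field: "unpaid", _value: 250000000000000000.0},
        {_time: 2021-05-01T00:00:00Z, _field: "price_usd", _value: 2800.0},
        {_time: 2021-05-01T00:02:00Z, _field: "unpaid", _value: 260000000000000000.0},
        {_time: 2021-05-01T00:02:00Z, _field: "price_usd", _value: 2810.0},
    ],
)
    // One row per timestamp, one column per field; the 00:02 row has no rates_ZAR.
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // Make sure "previous" means "earlier in time".
    |> sort(columns: ["_time"])
    // Carry the last known exchange rate forward into the 00:02 row.
    |> fill(column: "rates_ZAR", usePrevious: true)
    |> map(fn: (r) => ({r with _value: r.unpaid / 1000000000000000000.0 * r.price_usd * r.rates_ZAR}))
```

Rows that come before the first rates_ZAR point have nothing to carry forward, so they would be expected to stay null.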

Getting this error:

After taking out the fill line for rates_ZAR, it seems to give a value every hour, which should be more than adequate.

Cool, you could filter out all rows with null values if you add this after map():

// ...
  |> filter(fn: (r) => exists r._value)
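As a rough, self-contained illustration of that filter, the sketch below uses array.from() with made-up points in which the second timestamp has no "price_usd" value. It assumes, as the behaviour described in this thread suggests, that arithmetic involving a null yields a null _value, which the exists check then drops.

```flux
import "array"

// Made-up sample: the 00:02 timestamp is missing its "price_usd" point.
array.from(
    rows: [
        {_time: 2021-05-01T00:00:00Z, _field: "unpaid", _value: 250000000000000000.0},
        {_time: 2021-05-01T00:00:00Z, _field: "price_usd", _value: 2800.0},
        {_time: 2021-05-01T00:02:00Z, _field: "unpaid", _value: 260000000000000000.0},
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // The 00:02 row has a null price_usd, so its computed _value is expected to be null.
    |> map(fn: (r) => ({r with _value: r.unpaid / 1000000000000000000.0 * r.price_usd}))
    // Keep only rows where the calculation produced a value.
    |> filter(fn: (r) => exists r._value)
```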

sweet, thanks for the help

No problem. Happy to help!