sql.from() lookup for enriching datasets

Hi there,

I’m trying to use sql.from() to solve a real-world problem: look up the network name by autonomous system number (ASN) in a MySQL table so that it is available in the Flux result set for further use (visualisation). The data in InfluxDB comes from ntopng; the ASN info is in a MySQL database.

It could look like this:

import "sql"

asnInfo = sql.from(
  driverName: "mysql",
  dataSourceName: "fluxuser:xxx@tcp(x.x.x.x:3306)/ip2location",
  query: "SELECT asn,as FROM ip2location_asn"
)

dpiMetric = from(bucket: "ntopng")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "asn:ndpi")
  |> filter(fn: (r) => r._field == "bytes_rcvd" or r._field == "bytes_sent")
  |> derivative(unit: v.windowPeriod, nonNegative: true)

join(tables: {metric: dpiMetric, info: asnInfo}, on: ["asn"])

This doesn’t work: the query gets stuck because of the roughly 1M records in the ip2location_asn table in the MySQL database. When I limit the MySQL query, for testing purposes, to a few ASNs that I know are present in the Flux result (… WHERE asn IN (x,y,z) …), it works.

Is it possible to build the SQL query dynamically, with a proper WHERE clause, as part of the stream processing, so that only the required records are fetched from MySQL? Are the results then cached in some key-value store while the stream is processed, to avoid unnecessary SQL queries? I haven’t found anything in the docs…


For the moment, your best option is to use string interpolation to build a query with a where clause. Something like:

someVar = "foo"
query = "SELECT asn,as FROM ip2location_asn WHERE asn = \"${someVar}\""

Hi Paul,

Thanks for answering so quickly. Can you give a quick example of how to build the SQL query dynamically from the results of a Flux query (if that feature is already built in)? Also, can you clarify whether some kind of key/value caching of MySQL results is already in place or planned? That would really be a killer feature for me and would enable using InfluxDB as a sort of “metric enrichment hub”.


@mephisto you can use Flux’s stream and table functions to extract scalar values from a stream of tables. First use tableFind() to extract a single table from the stream of tables. Then use either getColumn() to extract an array of the values in a specific column, or getRecord() to extract the row at a given index as a record of key-value pairs, one per column.

For example, this custom function extracts the first value of a specified field:

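getFieldValue = (tables=<-, field) => {
  // Pick the table whose group key matches the requested field,
  // then read its _value column into an array.
  extract = tables
    |> tableFind(fn: (key) => key._field == field)
    |> getColumn(column: "_value")
  return extract[0]
}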

// The function in use
fieldValue = from(bucket: "example-bucket")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "someMeasurement")
  |> getFieldValue(field: "someField")

// Use the scalar value in another function/query
  // ...
  query: "SELECT * FROM foo WHERE bar > ${string(v: fieldValue)}"
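
Putting the pieces from this thread together, a minimal sketch for the ASN lookup might look like the following. It assumes that asn is a tag on the asn:ndpi measurement (so its values are strings), that those values are purely numeric (they are interpolated into the SQL unquoted), and that at least one ASN is present (an empty IN () list is invalid SQL). The CAST and the backticks around as (a reserved word in MySQL) are also assumptions about the table, not something confirmed above. The names asns and asnList are just illustrative.

```flux
import "sql"
import "strings"

dpiMetric = from(bucket: "ntopng")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "asn:ndpi")
  |> filter(fn: (r) => r._field == "bytes_rcvd" or r._field == "bytes_sent")
  |> derivative(unit: v.windowPeriod, nonNegative: true)

// Collect the distinct ASNs that actually occur in the Flux result.
// group() merges everything into one table; distinct() writes the
// unique values of the asn column into _value.
asns = dpiMetric
  |> group()
  |> distinct(column: "asn")
  |> tableFind(fn: (key) => true)
  |> getColumn(column: "_value")

// Turn the array into a comma-separated list for the IN (...) clause.
asnList = strings.joinStr(arr: asns, v: ",")

// Fetch only the rows needed. The CAST is an assumption: it keeps the
// SQL asn column a string so it matches the type of the asn tag in the join.
asnInfo = sql.from(
  driverName: "mysql",
  dataSourceName: "fluxuser:xxx@tcp(x.x.x.x:3306)/ip2location",
  query: "SELECT CAST(asn AS CHAR) AS asn, `as` FROM ip2location_asn WHERE asn IN (${asnList})"
)

join(tables: {metric: dpiMetric, info: asnInfo}, on: ["asn"])
```

This way MySQL returns only the rows for ASNs that appear in the selected time range instead of the full table. It does not cache anything between runs: the SQL query is issued again each time the Flux script is evaluated.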