Combining Data from Multiple Databases in InfluxDB 2 using Flux

Hello InfluxDB Community,

I hope this message finds you well. I’m new to InfluxDB 2 and the Flux language, and I have encountered a challenge that I would greatly appreciate your expertise and guidance with.

Context: Tracking Sales and Shipping Data

In my project, I have two buckets:

  1. Bucket 1 (db1): Contains quantity data, which represents the total quantity of products available for sale at specific timestamps. Each timestamp has a corresponding quantity value.
  2. Bucket 2 (db2): Contains transfers data, which represents the number of product units transferred from one location to another. Each transfer has an associated timestamp.

Objective: I would like to create a final table that combines the data from db1 and db2 to provide a comprehensive view of quantity and transfers. The resulting table should have the following structure:

  • Rows: One row for each unique timestamp across both buckets.
  • Columns:
    • “quantity”: Represents the total stock quantity available at each timestamp.
    • “first_quantity”: Represents the initial stock quantity at the first available timestamp.
    • “transfer_amount”: Represents the number of product units transferred at each timestamp.
    • “cumulative_transfer_amount”: Represents the cumulative total of product units transferred up to each timestamp.
    • “change_pct”: Represents quantity/(first_quantity + cumulative_transfer_amount) - 1 for each timestamp.

To illustrate with an example, let’s consider the following sample data:

Bucket 1 (db1):

Timestamp          Quantity
2023-06-01T08:00Z  100
2023-06-01T10:00Z  80
2023-06-01T12:00Z  120

Bucket 2 (db2):

Timestamp          Transfer Amount
2023-06-01T10:00Z  5
2023-06-01T12:00Z  8

Desired Result:

Timestamp          Quantity  First_Quantity  Transfer Amount  Cumulative Transfer Amount  change_pct
2023-06-01T08:00Z  100       100             0                0                           0.00
2023-06-01T10:00Z  80        100             5                5                           -0.24
2023-06-01T12:00Z  120       100             8                13                          0.06
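
For example, at 10:00 the expected quantity is first_quantity + cumulative_transfer_amount = 100 + 5 = 105, so change_pct = 80 / 105 - 1 ≈ -0.24.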

My Question:

Given the scenario described above and the desired final table structure, how can I efficiently combine the quantity data from db1 and the transfers data from db2 in InfluxDB 2 using the Flux language? Specifically, what Flux queries or techniques should I employ to achieve the desired result?

I would greatly appreciate any insights, guidance, or sample Flux code snippets that you can provide to help me accomplish this task. :pray:

@Anaisdg , @scott tagging for attention, any guidance would be much appreciated :pray:

@ajetsharwin In the example below, I use the sample data you provided to generate the desired results. I use array.from() to build ad hoc streams of tables using the Flux data model, but you can replace these variables with from() |> range() |> filter() calls to query actual data from your buckets. I’ll include an example below.

The comments in the code block explain what’s happening:

import "array"

// Build sample data for db1
db1 =
    array.from(
        rows: [
            {_time: 2023-06-01T08:00:00Z, _field: "Quantity", _value: 100},
            {_time: 2023-06-01T10:00:00Z, _field: "Quantity", _value: 80},
            {_time: 2023-06-01T12:00:00Z, _field: "Quantity", _value: 120},
        ],
    )

// Build sample data for db2
db2 =
    array.from(
        rows: [
            {_time: 2023-06-01T10:00:00Z, _field: "Transfer Amount", _value: 5},
            {_time: 2023-06-01T12:00:00Z, _field: "Transfer Amount", _value: 8},
        ],
    )

// Get the first quantity value from db1
firstQty = (db1 |> first() |> findRecord(fn: (key) => true, idx: 0))._value

// 1. Union the two streams together and pivot on _time
// 2. Sort by time ascending
// 3. Fill null values resulting from the pivot with 0
// 4. Duplicate the "Transfer Amount" column and then calculate the cumulative sum
//    of the duplicate column.
// 5. Iterate over each row and add the First_Quantity and change_pct columns
//    based on existing columns and the first quantity from db1
union(tables: [db1, db2])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> sort(columns: ["_time"])
    |> fill(column: "Transfer Amount", value: 0)
    |> duplicate(column: "Transfer Amount", as: "Cumulative Transfer Amount")
    |> cumulativeSum(columns: ["Cumulative Transfer Amount"])
    |> map(
        fn: (r) =>
            ({r with First_Quantity: firstQty,
                change_pct: float(v: r.Quantity) / float(v: firstQty + r["Cumulative Transfer Amount"]) - 1.0,
            }),
    )

This returns:

_time                 Quantity  First_Quantity  Transfer Amount  Cumulative Transfer Amount  change_pct
2023-06-01T08:00:00Z  100       100             0                0                           0
2023-06-01T10:00:00Z  80        100             5                5                           -0.23809523809523814
2023-06-01T12:00:00Z  120       100             8                13                          0.06194690265486735
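
Note that change_pct is returned as an unrounded float. If you want it rounded to two decimal places, as in your desired table, one option (a sketch using the Flux math package) is to compute it in a block-bodied map() function:

import "math"

    |> map(
        fn: (r) => {
            // Compute the raw percentage first, then round to two decimals
            rawPct = float(v: r.Quantity) / float(v: firstQty + r["Cumulative Transfer Amount"]) - 1.0

            return {r with First_Quantity: firstQty, change_pct: math.round(x: rawPct * 100.0) / 100.0}
        },
    )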

To query your actual data in InfluxDB, it would look something like this:

// Query the actual quantity data from the db1 bucket
db1 =
    from(bucket: "db1")
        |> range(start: 2023-06-01T08:00:00Z, stop: now())
        |> filter(fn: (r) => r._measurement == "example-measurement1")
        |> filter(fn: (r) => r._field == "Quantity")

// Query the actual transfers data from the db2 bucket
db2 =
    from(bucket: "db2")
        |> range(start: 2023-06-01T08:00:00Z, stop: now())
        |> filter(fn: (r) => r._measurement == "example-measurement2")
        |> filter(fn: (r) => r._field == "Transfer Amount")

firstQty = (db1 |> first() |> findRecord(fn: (key) => true, idx: 0))._value

union(tables: [db1, db2])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> sort(columns: ["_time"])
    |> fill(column: "Transfer Amount", value: 0)
    |> duplicate(column: "Transfer Amount", as: "Cumulative Transfer Amount")
    |> cumulativeSum(columns: ["Cumulative Transfer Amount"])
    |> map(
        fn: (r) =>
            ({r with First_Quantity: firstQty,
                change_pct: float(v: r.Quantity) / float(v: firstQty + r["Cumulative Transfer Amount"]) - 1.0,
            }),
    )

@scott Thank you very much for all of the above and the comments in the code block.

I have some follow-ups to better learn about the Flux language and the approach you’ve chosen:

  1. I did not know we could write (key) => true. Can you please shed some light on the syntax here? What is the spectrum of parameters we can pass into the fn: function? Is (key) => true shown anywhere in the docs?

  2. From what I gather, when we want to create new columns in a table, we either do it through a
    “duplicate + apply a function” method or by using the map function to create new columns. Can you please shed some light on the methods we can use to add new columns, and which methods are best suited for different situations?

  3. Was there any particular reason you didn’t use a left join here?

I was expecting logic/methodology along the lines below. I’m simply writing it in a readable way; it’s not strictly valid in any query language.

db1 = "select time, quantity from XXX"
db2 = "select time, transfer_amount from YYY"
db3 = "select time, quantity, transfer_amount from XXX left join YYY on XXX.time = YYY.time"
db4 = db3 + operations to add first_quantity, cumulative_transfer_amount, and change_pct

Thank you

The findRecord() function’s fn parameter is used to identify which table to extract a record from in the stream of tables (there can be many). Each table is identified by its group key. The fn parameter is a predicate that evaluates the group key of each table, and findRecord() extracts the record at index idx from the first table whose group key evaluates to true. So the syntax (key) => true simply matches the first table in the stream of tables. A slightly more advanced example is provided in the findRecord() documentation.
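
For illustration, here is a sketch that uses a non-trivial group key predicate instead of (key) => true. The bucket name, measurement, field, and host tag are hypothetical, not from this thread:

// Extract the most recent "usage_user" value reported by "host1".
// findRecord() returns the record at index idx from the first table
// whose group key matches the predicate.
lastCPU =
    (from(bucket: "example-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
        |> last()
        |> findRecord(fn: (key) => key.host == "host1", idx: 0))._value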

Correct. I would also add set() to the list of options for adding a new column. set() and duplicate() are “lighter” (less compute-heavy) methods of adding a new column that don’t require iterating over every single row. map() iterates over each row and lets you uniquely define the value of the added column, but it is a more compute-heavy operation. So if I were to define some rules:

  • If creating a new column with a single, static value, use set().
  • If creating a new column based on the value of an existing column, use duplicate().
  • If creating a new column where the value needs to be set dynamically per row, use map().

Here’s some more info.
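
A minimal sketch of all three, using ad hoc sample data (the column names added here are purely illustrative):

import "array"

data =
    array.from(
        rows: [
            {_time: 2023-06-01T08:00:00Z, _value: 100},
            {_time: 2023-06-01T10:00:00Z, _value: 80},
        ],
    )

data
    // set(): add a column with a single static string value (no row iteration)
    |> set(key: "region", value: "us-east")
    // duplicate(): add a column that copies an existing column
    |> duplicate(column: "_value", as: "original")
    // map(): add a column whose value is computed per row (iterates every row)
    |> map(fn: (r) => ({r with doubled: r._value * 2}))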

Joins are notoriously compute-heavy operations. They have to evaluate each row on each side of the join to see what rows should actually be joined together. I’ve found that in many cases, joins can be avoided altogether by using a combination of union() and pivot(). union() just merges multiple streams of tables into a single stream. Then, when you pivot on _time, all the data associated with each unique timestamp is gathered into a single row. This is often the desired result of a join, but done in a much more performant way.
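
For comparison, a left-join version of the earlier example could look something like the sketch below. It assumes the join package (available in Flux v0.172 and later) and reuses the db1 and db2 variables defined above; note you would still need fill() or map() afterward to default the unmatched transfer values:

import "join"

// Left join quantity (db1) with transfers (db2) on _time.
// Ungroup both sides so the join only needs to match on _time.
join.left(
    left: db1 |> group(),
    right: db2 |> group(),
    on: (l, r) => l._time == r._time,
    // transfer is null for timestamps with no matching row in db2
    as: (l, r) => ({_time: l._time, quantity: l._value, transfer: r._value}),
)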


Thank you Scott! Noted on all that you’ve written above.

Hey @scott, I’ve recently come to terms with the following:

  1. InfluxQL queries being faster than Flux
  2. the lack of Flux support in InfluxDB 3

One of the primary motivations to move to InfluxDB 2 and use Flux was to allow combining data from different databases, and your solution above makes a lot of sense and works well in Flux. However, if the above can be translated into InfluxQL, my preference would be to stick with InfluxQL. Do you have any guidance on whether the above query can be written in InfluxQL, please?

cc: @Jay_Clifford , @Anaisdg , @grant1

@ajetsharwin Unfortunately, this is not possible with InfluxQL (but it may be possible with SQL in v3; I need to test).