Retrieve data in insertion order, not timestamp order

I want to store data in both InfluxDB V1 and V2 in a way that I can retrieve the data in insertion order over a series of successive calls. My timestamps are not reliably increasing, so if I query in timestamp order I will miss data from one call to the next.

I have a working system, but it is very slow and CPU-intensive and I’m hoping there is a better way to approach it.

When writing to the database, I send pointname, timestamp, value, quality and index. Pointname is a tag, and quality and index are two numeric fields. Quality is just a number, and index is a monotonically increasing number - think of it as a row identifier. I can keep track of the last read index in my program, so I can read (for example) 1000 values by filtering for index from 0 to 999, then on a subsequent read from 1000 to 1999 and so on. This works perfectly until the database gets even a little bit large. For InfluxDB V1, it starts to take several seconds when the database has more than 10 million values, and for InfluxDB V2 the same happens around 40 million values.
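To make the layout concrete, here is a hypothetical line-protocol sample of how each record might be written (the point name, field values and timestamps are made up; note that the second point has an earlier timestamp but a higher index):

# pointname is a tag; value, quality and index are fields.
# index follows insertion order even when the timestamp goes backwards.
Simulated,pointname=Pump01 value=42.7,quality=192i,index=1000i 1672531200123456789
Simulated,pointname=Pump01 value=42.9,quality=192i,index=1001i 1672531199987654321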

My InfluxDB V1 query looks like this:

SELECT count("value") AS "mean_value" FROM "DataHub"."Desktop"."Simulated" where index >= 0 and index < 1000 GROUP BY "pointname"

Performance on this is awful, unsurprisingly, since I’m filtering with an inequality on a field. Is there a better way to do this in InfluxDB V1?


For InfluxDB V2, I do not know how to make this query. I have a hack that does not handle out-of-order timestamps, where I first determine the time range for the index range, then query for the data. I did this because I could not find a way to produce output rows containing pointname, value, quality and timestamp while also filtering on the index range.

data1 = from (bucket: "Desktop")
    |> range(start: 0, stop: 2200-01-01)
    |> filter(fn: (r) => r._measurement == "Simulated")
    |> filter(fn: (r) => r._field == "index" and r._value < 1000 and r._value >= 0)
    |> keep(columns: ["_field", "_value", "_time"])

data1
    |> min(column: "_value")
    |> keep(columns: ["_time"])
    |> map(fn: (r) => ({ r with timestamp: uint(v: r._time) }))
    |> yield(name: "data2")

data1
    |> max(column: "_value")
    |> keep(columns: ["_time"])
    |> map(fn: (r) => ({ r with timestamp: uint(v: r._time) }))
    |> yield(name: "data3")

then, plugging the two timestamps yielded as data2 and data3 into a second query:

from (bucket: "Desktop")
    |> range(start: data2, stop: data3)
    |> filter(fn: (r) => r._measurement == "Simulated" and r._field =~ /index|quality|value/)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({ r with time: uint(v: r._time) }))
    |> keep(columns: ["time", "index", "quality", "value", "pointname"])

The queries to find the start and end timestamps take most of the time (about 80%), and most of that time is spent in the filter for the index range. To top it off, this isn’t even correct. Ideally I want what I get with V1, where each row contains pointname, value, quality and timestamp for any record whose index is within the range (0 to 1000 in this example). My query as it stands will fail for data that arrives out of time order, potentially reading the same values more than once.

Is there a way to make the query a) correct in that it returns the data in insertion order, and b) more efficient?

I can freely modify how the data is inserted into InfluxDB - is there a better way to represent it?

Hello @asthomas,
If you can’t rely on your timestamps, why not generate new timestamps instead of a field with a timestamp index?

To translate this query,

SELECT count("value") AS "mean_value" FROM "DataHub"."Desktop"."Simulated" where index >= 0 and index < 1000 GROUP BY "pointname"

I would do:

import "influxdata/influxdb/schema"

from (bucket: "Desktop")
    |> range(start: 0, stop: 2200-01-01)
    |> filter(fn: (r) => r._measurement == "Simulated")
    |> filter(fn: (r) => r._field == "index")
    |> schema.fieldsAsCols()
    |> filter(fn: (r) => r.index < 1000 and r.index >= 0)

Tags are automatically part of the group key.
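For completeness, a sketch of the full translation including the count per pointname (same bucket, measurement and field names as above; a sketch only, not tested at scale):

import "influxdata/influxdb/schema"

from(bucket: "Desktop")
    |> range(start: 0, stop: 2200-01-01)
    |> filter(fn: (r) => r._measurement == "Simulated")
    |> filter(fn: (r) => r._field == "index" or r._field == "value")
    |> schema.fieldsAsCols()
    |> filter(fn: (r) => r.index >= 0 and r.index < 1000)
    |> group(columns: ["pointname"])
    |> count(column: "value")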

Hi @Anaisdg. Thank you for the suggestions. It is not that I cannot rely on the timestamps - the timestamps are correct. It is just that the timestamps are not monotonically increasing, so if a value is inserted with a timestamp in the past, it will be missed if I use the final timestamp of the previous read as the starting point of the next read. I need to preserve the out-of-order timestamps, yet read the data in insertion order. If it helps, my application is a historical playback mechanism for process data, and the out-of-order timestamps are important. The only way I could think of to preserve insertion order was with a monotonically increasing record ID. If there is a better way, I am open to suggestions.

I tried the query you suggested on my test database with 40 million records. InfluxDB consumed all available memory and then crashed. I think that schema.fieldsAsCols is creating a complete table of all 40 million records before it extracts the 1000 records matching the index filter. I guess the index filter needs to be first. However, if I filter with

filter(fn: (r) => r._field == "index"  and r._value < 1000 and r._value >= 0)

then I do not know how to include the quality field in the eventual output.

My end goal is to efficiently produce a tuple of (pointname, value, quality, timestamp) for every record, regardless of timestamp, inserted since the last query, up to some maximum number of records.

The best I have come up with so far is this:

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

indices = from (bucket: "Desktop")
    |> range(start: 0)
    |> filter(fn: (r) => r._measurement == "Simulated" and r._field == "index" and r._value >= 0 and r._value < 1000)
    |> keep(columns: ["fullname", "_time", "_value"])
    |> group()

// Returns an array with the first time as the first element and the
// last time as the second element.  Based on advice from @scott
times = union(tables:[
        indices |> min(column: "_time"),
        indices |> max(column: "_time"),
    ]) |> findColumn(fn: (key) => true, column: "_time")

// Use array indexing to extract the two timestamps
rstart = times[0]
rstop = times[1]

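// Fetch the value and quality fields within the narrowed time window.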
values = from (bucket: "Desktop")
    |> range(start: rstart, stop: rstop)
    |> filter(fn: (r) => r._measurement == "Simulated" and r._field == "value")
    |> group()

qualities = from (bucket: "Desktop")
    |> range(start: rstart, stop: rstop)
    |> filter(fn: (r) => r._measurement == "Simulated" and r._field == "quality")
    |> group()

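// Join the index rows with the value rows, then with the quality rows, matching on time and point name.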
firstjoin = join(
    tables: {i:indices, v:values}, 
    on: ["_time", "fullname"]
)

join(
    tables: {f:firstjoin,q:qualities},
    on: ["_time", "fullname"]
)
    |> group()
    |> map(fn: (r) => ({r with value: r._value_v}))
    |> map(fn: (r) => ({r with index: r._value_i}))
    |> map(fn: (r) => ({r with time: uint(v: r._time)}))
    |> map(fn: (r) => ({r with quality: r._value}))
    |> keep(columns: ["fullname", "time", "quality", "value", "index"])

It computes the time range of the records matching the index range, then queries and performs two joins, once for the value and once for the quality. When I profile this, I see two strange things:

  1. Both findRecord calls take about as long as the first filter, which scans the entire database, even though findRecord is applied to a table containing only one record.
  2. The first join takes about as long as scanning the entire database, even though it is applied to a table 1/40000 the size of the database.

It turns out that this query does not scale. If I change it to retrieve 10000 values instead of 1000 then the two join calls at the end swamp all other processing. On my system the query for 1000 values takes about 8 seconds, but the same query for 10000 values takes 56 seconds, with all the additional time in the join calls. Plotting the time for the joins relative to the number of values, it appears that join is not linear, with CPU load rising faster than the count.

Is there a way to eliminate the join calls and make this query efficient? InfluxDB V1 is less performant than InfluxDB V2 at searching for the matching records based on index, but overwhelmingly more performant when viewing the solution as a whole.

Is there another way to formulate this problem so that InfluxDB V2 performs more like V1?
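One possible direction, offered here only as an untested sketch, would be to keep the time-window trick above but replace the two joins with a single pivot across the three fields, filtering on the index again after the pivot (tag name fullname as in the query above):

// Find the time window that contains the wanted index range.
indices = from(bucket: "Desktop")
    |> range(start: 0)
    |> filter(fn: (r) => r._measurement == "Simulated" and r._field == "index" and r._value >= 0 and r._value < 1000)
    |> group()

times = union(tables: [
        indices |> min(column: "_time"),
        indices |> max(column: "_time"),
    ])
    |> findColumn(fn: (key) => true, column: "_time")

// range() treats stop as exclusive, so widen it by one nanosecond
// to keep any record at the latest matching timestamp.
rstart = times[0]
rstop = time(v: int(v: times[1]) + 1)

from(bucket: "Desktop")
    |> range(start: rstart, stop: rstop)
    |> filter(fn: (r) => r._measurement == "Simulated" and (r._field == "index" or r._field == "quality" or r._field == "value"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => r.index >= 0 and r.index < 1000)
    |> group()
    |> map(fn: (r) => ({r with time: uint(v: r._time)}))
    |> keep(columns: ["fullname", "time", "index", "quality", "value"])
    |> sort(columns: ["index"])

This still pays for the initial scan that finds the time window, but it avoids join entirely, and the final sort by index returns the rows in insertion order.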


Please, I need some help here. I’m writing a book in V2 to try to reproduce a one-liner in V1, and my best effort is unusable due to scaling issues. How am I supposed to do this with V2?

Anybody? Is this query just not expressible in Flux?

This problem has not gone away. More and more customers are asking for InfluxDB V2, but unless we can find a solution to this question we cannot do it. Summarizing the above, how do I express this InfluxDB V1 query efficiently in InfluxDB V2?

SELECT pointname, quality, value, index FROM "DataHub"."Desktop"."Simulated" where index >= startindex and index < stopindex

Or, if that is not possible, then something like this (I know this will not work since ‘order by’ will not order by a field, but you get the point):

SELECT pointname, quality, value, index FROM "DataHub"."Desktop"."Simulated" where index >= startindex order by index asc limit 1000

where pointname is a tag, quality and value are non-unique fields, and index is a unique integer field.
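For reference, the second form can at least be expressed in Flux along these lines (a sketch only, with startindex as a placeholder; it still has to pivot every record with index >= startindex before sorting and limiting, so it does not by itself answer the efficiency question):

startindex = 0  // placeholder: the next index after the last one already read

from(bucket: "Desktop")
    |> range(start: 0)
    |> filter(fn: (r) => r._measurement == "Simulated" and (r._field == "index" or r._field == "quality" or r._field == "value"))
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => r.index >= startindex)
    |> group()
    |> sort(columns: ["index"])
    |> limit(n: 1000)
    |> keep(columns: ["pointname", "_time", "index", "quality", "value"])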