Event duration calculation

Dear experts,

I am quite new in InfluxDB. I need your expertise for the following.

At present, I am in process of designing an Industry 4.0 solution.

We store some data in InfluxDB v2.7 database by using a gateway. The data we store in the measurements, got the following structure:
measurement_name, tag1=”Machine Name”, tag2=”Item Name”, tag3=”Type Name” field1=2 field2=32 unix_timestamp_1
measurement_name, tag1=”Machine Name”, tag2=”Item Name”, tag3=”Type Name” field1=2 field2=32 unix_timestamp_2
measurement_name, tag1=”Machine Name”, tag2=”Item Name”, tag3=”Type Name” field1=2 field2=32 unix_timestamp_3
measurement_name, tag1=”Machine Name”, tag2=”Item Name”, tag3=”Type Name” field1=2 field2=32 unix_timestamp_4
, etc.

These records represent states changing of certain devices – motors, valves etc.
The Unix timestamp is the moment of status change. The tag2 – Items, is a device identifier.
Let say that into the measurement, I have got additional field – “duration”.
Is there any trick, where the InfluxDB, automatically calculates the status duration, by subtracting from the current timestamp, the timestamp of the previous status change, of the same item?

If it was MS SQL, I would do it into the query itself! I would use subqueries or Common Table Expression. First I would select the data by ”Item Name” and timestamp – start and end time, next I would order then by timestamp and at the end I would use “window” function to access the previous row in order to subtract the time.

Probably any functionality as triggers is available?

Any advices regarding the design of such functionality will be highly appreciated!

Best Regards,
Svetozar

Welcome @yolovs to the InfluxDB forum.

Which version of InfluxDB are you using? If using InfluxDB Cloud v3, you can use SQL (if that is what you are comfortable with).

Here are some examples for calculating the elapsed time:

InfluxDB does not have native “triggers” like relational databases, but you can achieve your goal by leveraging Flux, InfluxDB’s query language, or by preprocessing data before insertion.

Calculate Duration with Flux
Flux provides the ability to perform operations similar to window functions. Here’s how you can compute the duration:

Query to Calculate Duration:
from(bucket: “your_bucket”)
|> range(start: -7d) // Adjust the range as needed
|> filter(fn: (r) => r._measurement == “measurement_name”)
|> filter(fn: (r) => r.tag2 == “Your_Item_Name”) // Filter by the specific Item Name
|> sort(columns: [“_time”], desc: false) // Ensure the data is ordered by timestamp
|> difference(column: “_time”, unit: 1s) // Calculate the difference between timestamps

The difference() function calculates the difference between successive rows based on the _time column, returning the duration in seconds or another unit if specified.
This query computes the durations dynamically without storing them in the database.

Storing Duration in the Database
If you want to store the duration directly in the database:

Preprocess Data Before Writing: Use a script (e.g., Python, Node.js, or a similar tool) to calculate the duration before writing the data to InfluxDB. Here’s an example using Python:
from influxdb_client import InfluxDBClient, Point
import datetime

Initialize client

client = InfluxDBClient(url=“http:// localhost:8086”, token=“your_token”, org=“your_org”)
write_api = client.write_api()

Sample data

data = [
{“tag2”: “Item1”, “timestamp”: 1640995200},
{“tag2”: “Item1”, “timestamp”: 1640998800},
{“tag2”: “Item1”, “timestamp”: 1641002400},
]

previous_timestamp = None
for record in data:
if previous_timestamp:
duration = record[“timestamp”] - previous_timestamp
point = (
Point(“measurement_name”)
.tag(“tag2”, record[“tag2”])
.field(“duration”, duration)
.time(datetime.datetime.fromtimestamp(record[“timestamp”]))
)
write_api.write(bucket=“your_bucket”, record=point)
previous_timestamp = record[“timestamp”]

Post-Processing with a Task in InfluxDB: InfluxDB tasks can periodically run a query to calculate and store the duration back into the database:
option task = {name: “Calculate Duration”, every: 1h}

data = from(bucket: “your_bucket”)
|> range(start: -1h) // Adjust the range for the task
|> filter(fn: (r) => r._measurement == “measurement_name”)
|> sort(columns: [“_time”], desc: false)
|> difference(column: “_time”, unit: 1s)

data
|> to(bucket: “your_bucket”, org: “your_org”) // Write back the calculated duration