Replication of infrequent data changes

Dear community,

I am actively using the OPC UA input listener plugin (subscription) to ingest data into InfluxDB. Subscription means I only ingest data when a value changes. The problem I encountered is that some points change at millisecond rate, like speed or current, whereas other points are almost static, like the PLC firmware version. What would be the best way to replicate all measurements at, for example, 12:00 am, so I could reduce the time range of my queries and also avoid losing data when the retention policy of the bucket is reached? I was thinking of a couple of options: restarting Telegraf every midnight (which is not the best way, because of a possible gap in the data), or some kind of task running in the database every midnight at 00:00 that looks up the last value of each measurement and writes a new point with the same value at 00:00. What do you think?

Thanks in advance

I’m facing a similar issue and decided to stick with the basic interval-read plugin for now. I don’t know how feasible it is from a configuration standpoint, but could you split the nodes into static and dynamic categories? Put all fast-changing nodes (sensors, etc.) in the subscription plugin and the mostly static nodes in the basic read plugin with a long interval like 1h.

Hello. Thanks for your reply.
The solution you propose could be an option, but because some signals may remain inactive for days and then start to change frequently during certain operations, it is very difficult to decide which variables are static and which are dynamic and manage them with different input plugins.
I would prefer the auto-generated measurements every midnight, but to be honest I don’t know exactly how that could be implemented when we are talking about thousands of measurements…

In any case thanks for your comment.

I hope someone can shed some light on this.

So I still think this should be possible using only Telegraf. You are allowed to add the same node to both the inputs.opcua and inputs.opcua_listener plugins: the metric at a fixed interval would come from inputs.opcua, while fast changes are captured by inputs.opcua_listener. I should mention that “every day at midnight” is not trivial to configure, but 24h itself is a valid interval. Would this work for you?
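
For example, something along these lines. The endpoint and node details are placeholders, so adjust them to whatever your server exposes:

[[inputs.opcua]]
  ## slow "heartbeat" read of the node, once per day (placeholder endpoint and node)
  endpoint = "opc.tcp://plc.example.local:4840"
  interval = "24h"

  [[inputs.opcua.nodes]]
    name = "motor_speed"
    namespace = "2"
    identifier_type = "s"
    identifier = "Motor.Speed"

[[inputs.opcua_listener]]
  ## subscription on the same node to capture the fast changes
  endpoint = "opc.tcp://plc.example.local:4840"
  subscription_interval = "100ms"

  [[inputs.opcua_listener.nodes]]
    name = "motor_speed"
    namespace = "2"
    identifier_type = "s"
    identifier = "Motor.Speed"

That way every node gets at least one point per day from the interval read, even if it never changes, while the listener still records every fast change in between.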


Hello,

I have encountered the same issue. For the last few months my configuration has been split into inputs.opcua_listener and inputs.opcua, as this thread suggests.

I just discovered the Starlark aggregator thanks to @R290 in another topic.
This aggregator is called periodically and has the ability to store data.
I still need to write up the functions, but my first tests of how the aggregator works look promising. The add function lets you refresh the stored metric you want to replicate, the push function publishes all stored metrics every period, and by not resetting the state I am able to keep all the data for the next period.

One thing I will take into account is the metric quality: if the connection is down, it should not replicate the last known data. Writing this makes me realize it might not matter, as long as it replicates a bad state until it is good again.

Any other edge cases anyone can think of?
Thoughts on using this aggregator?

[[aggregators.starlark]]
  period = "30s"
  source = '''
state = {}

def add(metric):
  # storage function: keep the latest copy of each metric
  state[metric.name] = deepcopy(metric)

def push():
  # publish all stored metrics every period
  return state.values()

def reset():
  # do nothing, so the stored state is kept for the next period
  pass
'''

This script seems to do the job.

Just add it to the configuration for the metrics you want to replicate.

EDIT: added the fields to the groupID function to create a unique entry per field (subscription-based plugins update one field at a time). For OPC UA, exclude the field “Quality”; it is always included with the other fields but also shows up on its own.

[[aggregators.starlark]]
  period = "1m"
  script = "./replication.star"

replication.star:

load("logging.star", "log")
load("time.star", "time")

# stored metrics, keyed by groupID; kept between periods because reset() does not clear it
state = {}

def add(metric):
  # store or refresh the latest copy of this metric
  gId = groupID(metric)
  state[gId] = deepcopy(metric)
  log.debug("add/update groupID: " + str(gId))

def push():
  # publish all stored metrics with a fresh timestamp
  log.debug("push: " + str(len(state)) + " metrics")
  metrics = []
  for v in state.values():
    storedmetric = deepcopy(v)
    storedmetric.time = time.now().unix_nano
    metrics.append(storedmetric)
  return metrics

def reset():
  # intentionally left empty so the state survives into the next period
  log.debug("reset replication function")

def groupID(metric):
  # build a unique key from the measurement name, its tags and its field names
  key = metric.name
  for k, v in metric.tags.items():
    key = key + "|" + k + "-" + v
  for k, v in metric.fields.items():
    key = key + "|" + k
  return hash(key)
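
The “Quality” exclusion mentioned in the edit above is not visible in this version of the script. One possible way to do it, purely as a sketch, is to ignore Quality-only updates in add and leave Quality out of the key in groupID:

def add(metric):
  # sketch: skip updates that only carry the OPC UA "Quality" field
  fieldnames = [k for k, v in metric.fields.items()]
  if fieldnames == ["Quality"]:
    return
  gId = groupID(metric)
  state[gId] = deepcopy(metric)
  log.debug("add/update groupID: " + str(gId))

def groupID(metric):
  # same as above, but "Quality" is not part of the key
  key = metric.name
  for k, v in metric.tags.items():
    key = key + "|" + k + "-" + v
  for k, v in metric.fields.items():
    if k != "Quality":
      key = key + "|" + k
  return hash(key)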

When the plugin fails to connect or disconnects, there is no way of knowing that based on the quality.
And you don’t want to replicate data when the connection fails.

The only thing I can think of is creating a variable that has a fixed change interval and implementing a timeout in the aggregator based on it. That way you can stop the replication on communication loss.

This brings me back to an old issue of mine: [inputs.opcua] Quality should be a tag and not a field · Issue #9405 · influxdata/telegraf · GitHub

Updated the script today. I can now link each metric to a watchdog metric that is updated at a constant rate. If the watchdog has not been updated, all other replication is halted, but the values are still kept in memory. When the watchdog resumes, the old values are replicated again, or they are updated immediately because of the reconnection.
This way you cover both communication interruptions and the PLC running state.
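
The updated script is not posted here, but the watchdog check could look roughly like this. It extends the replication.star above (state, groupID, log and time are as defined there), and the measurement name “plc_watchdog” and the 2-minute timeout are my own assumptions:

# assumption: a node that changes at a fixed rate arrives as measurement "plc_watchdog"
last_watchdog = {"time": 0}

def add(metric):
  if metric.name == "plc_watchdog":
    # remember when the watchdog was last updated
    last_watchdog["time"] = time.now().unix_nano
  gId = groupID(metric)
  state[gId] = deepcopy(metric)

def push():
  # halt replication when the watchdog is older than the assumed 2-minute timeout
  timeout_ns = 2 * 60 * 1000 * 1000 * 1000
  if time.now().unix_nano - last_watchdog["time"] > timeout_ns:
    log.debug("watchdog timeout, replication halted")
    return []
  metrics = []
  for v in state.values():
    storedmetric = deepcopy(v)
    storedmetric.time = time.now().unix_nano
    metrics.append(storedmetric)
  return metrics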

Quality can indeed be a tag. I still wouldn’t send it to InfluxDB, though, because tags whose values change over time are a nightmare on their own (they mess up all grouping).

@JeroenVH thanks for this! I appreciate your help here.

Update:

I am in the process of creating a PR for my example (replication.star); however, today I had a new idea that reduces the complexity by a lot. You can already give it a read or a try.

So far I have monitored the connection with a watchdog metric, but any metric will suffice to indicate the connection state; you just need one tag to create some kind of group and keep track of the last update in that group.
To be extra sure, you can add a metric with a known update interval to the data source, or add a metric to the group that is polled at a fixed interval over a second connection. But as I type the words “second connection”, I already think this is less reliable, because it says nothing about the first connection being alive. But hey, the options are there.

Will update the script in the next week.


@JeroenVH
Thanks for your help here and for sharing your solution for data continuity, loss issues and quality preservation. What are you using InfluxDB for?

@Anaisdg
We use Telegraf, InfluxDB and Grafana to visualize the analog measurements and valve/pump states of our process PLCs. This makes the data accessible to people who do not use the SCADA system.

We are also working on integrating the Grafana webpage into our SCADA to replace the ancient historian.