I want to create a custom datetime column in the measurement that identifies when the record is inserted into InfluxDB. The time column records when the point was created in Kafka, so I need to insert a custom datetime column that shows the insertion timestamp.
I have used the date processor plugin in Telegraf, which creates the new datetime column, but it keeps the same Kafka point timestamp, so in the end time and insertion_time have the same value.
I don’t believe InfluxDB supports more than one timestamp column, but nothing stops you from creating your own field or tag (tags are indexed) holding this extra timestamp. However, I don’t know how you’d get this to work with Telegraf.
If you only have Telegraf available, what you might be able to do is put the Kafka timestamp into the custom field or tag and then let InfluxDB add its own timestamp as the records are received and written. Then you’d have the delta.
Just had another idea. You could write the data to InfluxDB twice. That isn’t very efficient, but it would let you use native InfluxDB time calculations to compare event times. The first measurement (or bucket, if you have multiple streams) has the Kafka native timestamp, and the second has the InfluxDB write timestamp. Assuming you have enough unique information to match the records, you can then compute a delta. This solution won’t scale well, but it could be helpful as a proof of concept.
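The suggestion is to compute the delta inside InfluxDB; the Python sketch below only illustrates the pairing logic once both measurements have been queried (the query step is omitted). It assumes each record carries a unique key, such as a hypothetical `event_id`, and that times are nanosecond epochs.

```python
def write_latency_ns(kafka_times: dict[str, int], write_times: dict[str, int]) -> dict[str, int]:
    """Pair records by their unique key and return write time minus Kafka time (ns).

    Keys that appear in only one of the two measurements are skipped.
    """
    return {
        key: write_times[key] - kafka_ns
        for key, kafka_ns in kafka_times.items()
        if key in write_times
    }


# Example: evt-2 has not been written yet, so it has no delta.
kafka = {"evt-1": 1_700_000_000_000_000_000, "evt-2": 1_700_000_000_500_000_000}
written = {"evt-1": 1_700_000_000_250_000_000}
print(write_latency_ns(kafka, written))  # {'evt-1': 250000000}
```

Without a reliable unique key the pairing is ambiguous, which is part of why this approach is best kept to a proof of concept.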
Thanks for your reply. We already have a timestamp that comes as part of the Kafka message, and it is loaded as the time column in InfluxDB. Now I have to create a custom column that records the timestamp at which the data is inserted into InfluxDB.
In the Kafka topic, the last column is the point creation time.
That last column is loaded as the time column in InfluxDB, so its value is the point creation time.
Telegraf’s processor plugins include a date processor, but it takes the same value as the time column in InfluxDB, not the current timestamp at which the point is written to the DB. I am creating a new column called inserted_ts, which should hold the time the point is inserted into the DB, but it does not.
Please let me know how I can put the current timestamp into the inserted_ts column when the point is inserted into the DB.
It looks like the processors.date plugin just takes the timestamp from the metric, converts it to another format and writes it back as an additional field or tag. Therefore it always comes out with the same timestamp that is already in the metric.
I just don’t know which other plugin could produce the desired result out of the box.
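A rough Python model of why the values match (this is not the plugin’s actual Go code, which is configured in telegraf.conf with its own format options; the function names are hypothetical): a date-style transform depends only on the metric’s own timestamp, whereas an insertion time has to read the clock at processing time.

```python
from datetime import datetime, timezone


def date_like_field(metric_time_ns: int, fmt: str = "%Y-%m-%dT%H:%M:%SZ") -> str:
    """Format the metric's *own* timestamp, as the date processor does conceptually.

    The result depends only on metric_time_ns, so it always reproduces the
    Kafka creation time, no matter when the metric reaches InfluxDB.
    """
    seconds = metric_time_ns // 1_000_000_000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)


def wall_clock_field(fmt: str = "%Y-%m-%dT%H:%M:%SZ") -> str:
    """What inserted_ts needs instead: the current clock when the metric is processed."""
    return datetime.now(timezone.utc).strftime(fmt)
```

Calling `date_like_field(1_700_000_000_123_456_789)` returns `2023-11-14T22:13:20Z` every time, while `wall_clock_field()` changes from call to call.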
At first I thought that it should be feasible with a processors.starlark plugin, but as far as I know, starlark does not have access to any standard libraries. This does not work:
```toml
[[processors.starlark]]
  source = '''
def apply(metric):
    metric.fields["inserted_ts"] = time.now()  # DOES NOT WORK
    return metric
'''
```
The only option I can think of at the moment is a processors.execd plugin. With it you have full flexibility and could easily insert a new field with the current timestamp.
BTW: Please do not post images of code snippets; use markdown syntax instead, so that code snippets are properly formatted in the forum:
```log
put your influx line protocol snippet here
```
```toml
put your config code snippet here
```
Here is the python code for the processors.execd plugin in the file pyaddtime.py:
```python
from datetime import datetime

from influxdb_client import Point
from line_protocol_parser import parse_line

while True:
    try:
        input_line = input()  # read one line of line protocol from stdin
    except EOFError:  # Telegraf closed stdin: stop instead of looping forever
        break
    now = datetime.now()  # current local timestamp
    date_time = now.strftime('%Y%m%d%H%M%S')  # format as string
    data = parse_line(input_line)  # parse input line into a dict
    data['fields']['inserted_ts'] = date_time  # add timestamp field to dict
    point = Point.from_dict(data)  # new metric from dict
    # write to stdout; flush so Telegraf receives each metric immediately
    print(point.to_line_protocol(), flush=True)
```
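The script above depends on the third-party influxdb_client and line_protocol_parser packages. If installing them where Telegraf runs is inconvenient, a standard-library-only variant is possible. The sketch below is an untested alternative, not a drop-in replacement: it assumes well-formed line protocol, splits each record on unescaped spaces (keeping spaces inside quoted string fields), appends `inserted_ts` as an integer nanosecond field instead of a formatted string, and leaves the original timestamp untouched. `split_line_protocol`, `add_inserted_ts` and `run` are hypothetical names.

```python
import sys
import time


def split_line_protocol(line: str) -> list[str]:
    """Split a record into [measurement+tags, field set, (timestamp)].

    Splits on unescaped spaces; inside the field set, spaces within
    double-quoted string values are kept.
    """
    parts: list[str] = []
    current: list[str] = []
    in_quotes = False
    escaped = False
    for ch in line:
        if escaped:  # character after a backslash is taken literally
            current.append(ch)
            escaped = False
            continue
        if ch == "\\":
            current.append(ch)
            escaped = True
            continue
        if ch == '"' and len(parts) == 1:  # quotes only delimit strings in the field set
            in_quotes = not in_quotes
        elif ch == " " and not in_quotes:
            parts.append("".join(current))
            current = []
            continue
        current.append(ch)
    parts.append("".join(current))
    return parts


def add_inserted_ts(line: str, inserted_ts_ns: int) -> str:
    """Append an integer inserted_ts field, keeping the original timestamp."""
    parts = split_line_protocol(line.rstrip("\n"))
    if len(parts) < 2:
        raise ValueError(f"not a line-protocol record: {line!r}")
    parts[1] += f",inserted_ts={inserted_ts_ns}i"
    return " ".join(parts)


def run(stdin=sys.stdin, stdout=sys.stdout) -> None:
    """execd loop: one metric per line in, one modified metric per line out."""
    for line in stdin:
        if not line.strip() or line.startswith("#"):
            continue  # skip blank lines and comments
        stdout.write(add_inserted_ts(line, time.time_ns()) + "\n")
        stdout.flush()  # don't let output sit in a buffer
```

To use it with processors.execd, call `run()` at the bottom of the script (for example under `if __name__ == "__main__":`); the loop ends on its own when Telegraf closes stdin.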
Here is the config snippet for the execd plugin for the telegraf.conf file:
If you’re willing or able to reverse the purposes of those columns, then what you’re trying to do can be done basically for free. Below is a snippet from the documentation, with the key section in bold. In your line protocol record, drop the timestamp at the very end, send the record to InfluxDB, and then look at the time the system assigned to it. That’s going to be the most accurate “write timestamp” that I know of.
> When writing data to InfluxDB, we recommend including a timestamp with each point. **If a data point does not include a timestamp when it is received by the database, InfluxDB uses the current system time (UTC) of its host machine.**
>
> The default precision for timestamps is in nanoseconds.
This is not going to work: as I stated, we keep the epoch in each point because it marks when the point was generated, and it is loaded as the time column in the DB. Now we need an inserted_ts column that holds the timestamp at which the point is inserted into the DB. I have given an example of the data and the processor plugin I am using to create the new column.
How many data points per second are we talking about?
I have not analyzed the performance of the script. However, this should not be a problem: the execd plugin runs the script as a long-lived daemon, so there is no process start-up cost per metric.
With a compiled execd plugin (e.g. written in Go) the performance should be a bit better.
Or you can write your own native Telegraf input or processor plugin, which is then compiled into Telegraf itself. This should give the best performance.
I also wrote the processors.execd plugin in Go just for fun:
```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/influxdata/telegraf/plugins/parsers/influx"
	"github.com/influxdata/telegraf/plugins/serializers"
)

func main() {
	parser := influx.NewStreamParser(os.Stdin)
	serializer, _ := serializers.NewInfluxSerializer()

	for {
		metric, err := parser.Next()
		if err != nil {
			if err == influx.EOF {
				return // stream ended
			}
			if parseErr, isParseError := err.(*influx.ParseError); isParseError {
				fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
				os.Exit(1)
			}
			fmt.Fprintf(os.Stderr, "ERR %v\n", err)
			os.Exit(1)
		}

		now := time.Now()         // now timestamp
		original := metric.Time() // get original timestamp
		metric.AddField("original_ts", original.UnixNano())
		metric.AddField("inserted_ts", now.UnixNano())
		metric.SetTime(now) // replace timestamp

		b, err := serializer.Serialize(metric)
		if err != nil {
			fmt.Fprintf(os.Stderr, "ERR %v\n", err)
			os.Exit(1)
		}
		fmt.Fprint(os.Stdout, string(b))
	}
}
```
Just build the Go file, copy the executable to the Telegraf folder and add the config.
This is the config snippet for the execd plugin for the telegraf.conf file: