Having a stream input with Telegraf

Hi,

I have something writing data on serial port (/dev/serial0) every second.
I have a shell script able to read serial port line by line and convert data to telegraf/influxdb input format.

What do I do from now ? Can I use the “tail” input plugin with this ? how ? (I don’t want to write all datas in a file to be able to read them with the tail plugin, I’d like a real stream).

Extremely easy way is to install Node-Red and use the node-red-serialport and node-red-contrib-influxdb additions to wire it all up. No coding required.

Does it mean it wouldn’t be easy with Telegraf ? or not even possible ?

I’m on a constraint device where I’d like to install the least amount of tools possible, so Telegraf + InfluxDB for local storage is ok, but no more would be nice.

Btw, even if I used node-red, I’m not writing only to local InfluxDB, I’m also writing to some remote MQTT broker, I know it’s possible to do this with Node-RED but I know Telegraf has some good MQTT output plugin too, so if all was possible with Telegraf, it would be nice.

I suggest having your script send to a socket_listener input.

2 Likes

Ok, thanks !

I didn’t know much about sockets, but it looks totally feasible in pure shell.

So I can remember tomorrow how to do it :

Hi,
I know it’s a little bit old topic, but as I ended up here, some other might.
Another (easier?) solution might also be to use the inputs.tail plugin.
Let your script write its output to a file, and configure Telegraf so it reads its content.

Here is a MWE, where Telegraf reads a file called /tmp/metrics.input as text value, and sends it to the standard output in Influx Line Protocol format.

The curated Telegraf configuration file telegraf.conf:

# Send telegraf metrics to file(s)
[[outputs.file]]
  ## Files to write to, "stdout" is a specially handled file.
  files = ["stdout", "/tmp/metrics.out"]

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "influx"


# Parse the new lines appended to a file
[[inputs.tail]]
  ## File names or a pattern to tail.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk". ie:
  ##   "/var/log/**.log"  -> recursively find all .log files in /var/log
  ##   "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
  ##   "/var/log/apache.log" -> just tail the apache log file
  ##   "/var/log/log[!1-2]*  -> tail files without 1-2
  ##   "/var/log/log[^1-2]*  -> identical behavior as above
  ## See https://github.com/gobwas/glob for more examples
  ##
  # files = ["/var/mymetrics.out"]
  files = ["/tmp/metrics.input"]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  # data_format = "influx"
  data_format = "value"
  data_type = "string"

To run telegraf:

telegraf --config telegraf.conf

To write into the file manually:

echo "Hello, World!" >> /tmp/metrics.input

And here is a Bash script demo:

FILE_OUTPUT=/tmp/metrics.input

for fileNbr in {1..5}
do
    # Erase the content of the file
    echo -n "" > ${FILE_OUTPUT}

    # Write a message every second for 10 seconds
    for messageNbr in {1..10}
    do
        echo "message_${fileNbr}_${messageNbr}" >> ${FILE_OUTPUT}
        sleep 1
    done
done

The output should look like this:

2022-06-15T16:43:48Z I! Starting Telegraf 1.22.4
2022-06-15T16:43:48Z I! Loaded inputs: tail
2022-06-15T16:43:48Z I! Loaded aggregators:
2022-06-15T16:43:48Z I! Loaded processors: 
2022-06-15T16:43:48Z I! Loaded outputs: file
2022-06-15T16:43:48Z I! Tags enabled: host=MyHost
2022-06-15T16:43:48Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"MyHost", Flush Interval:10s
tail,host=MyHost,path=/tmp/metrics.input value="Hello, World!" 1655311450007543601
tail,host=MyHost,path=/tmp/metrics.input value="message_1_1" 1655311469560168382
tail,host=MyHost,path=/tmp/metrics.input value="message_1_2" 1655311470563713078
tail,host=MyHost,path=/tmp/metrics.input value="message_1_3" 1655311471564995974
tail,host=MyHost,path=/tmp/metrics.input value="message_1_4" 1655311472566494242
tail,host=MyHost,path=/tmp/metrics.input value="message_1_5" 1655311473567657108
tail,host=MyHost,path=/tmp/metrics.input value="message_1_6" 1655311474568881514
tail,host=MyHost,path=/tmp/metrics.input value="message_1_7" 1655311475570046833
tail,host=MyHost,path=/tmp/metrics.input value="message_1_8" 1655311476572052497
tail,host=MyHost,path=/tmp/metrics.input value="message_1_9" 1655311477579071920
tail,host=MyHost,path=/tmp/metrics.input value="message_1_10" 1655311478583538569
tail,host=MyHost,path=/tmp/metrics.input value="message_2_1" 1655311479587399531
tail,host=MyHost,path=/tmp/metrics.input value="message_2_2" 1655311480592907369
tail,host=MyHost,path=/tmp/metrics.input value="message_2_3" 1655311481599178301
tail,host=MyHost,path=/tmp/metrics.input value="message_2_4" 1655311482603748573
tail,host=MyHost,path=/tmp/metrics.input value="message_2_5" 1655311483608296926
tail,host=MyHost,path=/tmp/metrics.input value="message_2_6" 1655311484614526181
tail,host=MyHost,path=/tmp/metrics.input value="message_2_7" 1655311485619593348
tail,host=MyHost,path=/tmp/metrics.input value="message_2_8" 1655311486623221357
tail,host=MyHost,path=/tmp/metrics.input value="message_2_9" 1655311487626948353
[...]

From there, you just have to play with Telegraf’s configuration to change its tags, format, etc. to your convenience.

Hope it helps!

Hey there

I am working on an embedded sensor prototype where I would like to get data into InfluxDB over a Serial (USB) connection.
There was a feature request almost 3 years ago, but unfortunately Telegraf still can’t read from a Serial device until now.

I came up with a workaround (including a custom script) and would like to share my solution here:

Background

A sensor system which communicates via Serial connection over USB (Arduino based). Each time interval (5 seconds) the data is written to the Serial connection on a single line. The data format happens to be InfluxDB Line Protocol.

I would like to get this data into an InfluxDB V2 via Telegraf and I do have a Linux OS (Raspberry Pi) dedicated for this task.

Solution

As proposed by @daniel I have used Telegrafs socket_listener input plugin which will open a TCP server and listen for any incoming data. The corresponding Telegraf configuration for the socket listening on port 7654 is:

[[inputs.socket_listener]]
  service_address = "tcp://:7654"
  data_format = "influx"

Next, I wrote a python script that basically establishes a TCP connection to Telegraf and opens the Serial port. Then all data from the Serial port is read line by line and sent to Telegraf.
Notes:

  • Use the /dev/serial/by-id/ path to get a fixed path for the device
  • The package PySerial is required, install it via $ pip3 install pyserial
  • Interrupt handlers are added for the sake of completeness
#!/usr/bin/env python3
import serial, socket
import time, signal, sys

# Constants
serial_port = '/dev/serial/by-id/usb-SparkFun_SparkFun_Pro_Micro-if00'
server_addr = ("localhost", 7654)

# Initilize objects
tcp = socket.socket()
ser = serial.Serial()

#==================
# Interrupt handler (SIGINT & SIGTERM)
#==================
def clean_up_and_exit(exit_code):
  tcp.close()
  ser.close()
  sys.exit(exit_code)

def handler_stop_signals(signum, frame):
  print("Signal handler called with signal {} ... exiting now".format(signum))
  clean_up_and_exit(2)

signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)

#==================
# TCP Socket - setup
#==================
print("Create TCP connection to: {}".format(server_addr))
while True:
  try:
    tcp.connect(server_addr)
  except:
    time.sleep(5)
  else: # connection established successfully
    break
print("  ... connection established")

#==================
# Serial - setup
#==================
print("Try to open serial port: " + serial_port)
ser.port = serial_port
while not ser.is_open:
  try:
    ser.open()
  except:
    time.sleep(5)
print("  ... opened successfully")

#==================
# "Main" loop - shovel data from Serial to TCP Socket
#==================
while True:
  try:
    line = ser.readline()
    tcp.sendall(line)
  except:
    print("Encountered Serial or TCP error ... exiting now")
    clean_up_and_exit(1)

Save it somewhere (e.g. /usr/local/bin/my-sensor.py) and make it executable.

To run the script automatically after system boot and to restart the script when something goes wrong, systemd is the way to go. So I wrote a unit file for this script which has to go to a specific path. The file-name can be chosen but must end with .service e.g: /lib/systemd/system/my-sensor.service
Notes:

  • As I have developed the script as “user”, I let systemd execute it as “user” instead of root - make sure your user is in group dialout and has access to the serial port
  • Instead of executing the script directly, I call python3 with the -u flag for unbuffered STDOUT. Otherwise single output lines are not shown in the log - only when the STDOUT buffer is full.
[Unit]
Description=Read from Serial (USB) and send data via TCP Socket to Telegraf (InfluxDB)
After=network.target

[Service]
Type=simple
User=<your-user>
Restart=always
RestartSec=3s
ExecStart=python3 -u /usr/local/bin/my-sensor.py

[Install]
WantedBy=multi-user.target

Finally reload the systemd daemon, start this unit and check it’s status:

$ sudo systemctl daemon-reload
$ sudo systemctl start my-sensor.service
$ sudo systemctl status my-sensor.service

Once everything works as expected, do not forget to enable this unit, that it gets started automatically upon boot: $ sudo systemctl enable my-sensor.service

Final thoughts

The data sent by my sensor is already in Line Protocol format. It should be straight forward to use any other input data format in the Telegraf configuration. This script is only a “gateway” for any sort of data into Telegraf.

If Telegraf is running inside a docker container, don’t forget to expose the used port (e.g. 7654).

I have Telegraf running on the same machine, so I use “localhost” to connect from the python script to Telegraf, but you can of course also specify the IP address of another machine if Telegraf is running there.

1 Like