UDF examples for Python3

Hi, I am experimenting with UDF.

I followed the “Custom Anomaly Detection” example (Custom Anomaly Detection | InfluxData Documentation Archive) and got it working. However, I noticed that the examples are written for the old Python2. I’d prefer to use a more recent Python version, so I tried converting them to Python3. However, I ran into several problems (dependency hell, path problems, deprecated code). So far I have not succeeded in getting things running properly under Python3, which is probably due to my lack of experience with Python. I have wasted quite a bit of time on something I assumed would be trivial. Before I give up on Python in frustration and move to Go, I’d like to ask:

Did anyone successfully convert the example(s) to run under Python3?

This got me curious so I tried it out locally since I primarily use python3 as well.

Here is a quick diff for the agent.py file that worked for me.

--- a/agent.py
+++ b/agent.py
@@ -1,12 +1,17 @@
 # Kapacitor UDF Agent implementation in Python
 #
 # Requires protobuf v3
-#   pip install protobuf==3.0.0b2
+#   pip install protobuf==3.0.0
 
 import sys
-import udf_pb2
+from . import udf_pb2
 from threading import Lock, Thread
-from Queue import Queue
+
+try:
+    from queue import Queue
+except ImportError:
+    from Queue import Queue
+
 import io
 import traceback
 import socket

I haven’t tested this beyond ensuring that it could start the UDF process.

Give it a try and let me know on this PR how it goes. Thanks

Hi Nathaniel,

Thanks for your support. I tested with the changes you described, including installing protobuf 3.0.0. I used the Custom Anomaly Detection example as my base, following it as closely as possible. I had to change the Python code in there a bit as well (as print >> sys.stderr is not valid in Python3). The following errors occur when trying to start kapacitor:

[task_master:main] 2017/05/04 09:51:23 I! opened
[udf] 2017/05/04 09:51:24 I!P Traceback (most recent call last):
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1082, in MergeFromString
[udf] 2017/05/04 09:51:24 I!P if self._InternalParse(serialized, 0, length) != length:
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1104, in InternalParse
[udf] 2017/05/04 09:51:24 I!P (tag_bytes, new_pos) = local_ReadTag(buffer, pos)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/decoder.py”, line 181, in ReadTag
[udf] 2017/05/04 09:51:24 I!P while six.indexbytes(buffer, pos) & 0x80:
[udf] 2017/05/04 09:51:24 I!P TypeError: unsupported operand type(s) for &: ‘str’ and ‘int’
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P During handling of the above exception, another exception occurred:
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P Traceback (most recent call last):
[udf] 2017/05/04 09:51:24 I!P File “/Volumes/UDF/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 106, in _read_loop
[udf] 2017/05/04 09:51:24 I!P request.ParseFromString(data)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/message.py”, line 185, in ParseFromString
[udf] 2017/05/04 09:51:24 I!P self.MergeFromString(serialized)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1088, in MergeFromString
[udf] 2017/05/04 09:51:24 I!P raise message_mod.DecodeError(‘Truncated message.’)
[udf] 2017/05/04 09:51:24 I!P google.protobuf.message.DecodeError: Truncated message.
[udf] 2017/05/04 09:51:24 I!P error processing request of type unknown: Truncated message.
[udf] 2017/05/04 09:51:24 I!P Exception in thread Thread-1:
[udf] 2017/05/04 09:51:24 I!P Traceback (most recent call last):
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1082, in MergeFromString
[udf] 2017/05/04 09:51:24 I!P if self._InternalParse(serialized, 0, length) != length:
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1104, in InternalParse
[udf] 2017/05/04 09:51:24 I!P (tag_bytes, new_pos) = local_ReadTag(buffer, pos)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/decoder.py”, line 181, in ReadTag
[udf] 2017/05/04 09:51:24 I!P while six.indexbytes(buffer, pos) & 0x80:
[udf] 2017/05/04 09:51:24 I!P TypeError: unsupported operand type(s) for &: ‘str’ and ‘int’
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P During handling of the above exception, another exception occurred:
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P Traceback (most recent call last):
[udf] 2017/05/04 09:51:24 I!P File “/Volumes/UDF/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 106, in _read_loop
[udf] 2017/05/04 09:51:24 I!P request.ParseFromString(data)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/message.py”, line 185, in ParseFromString
[udf] 2017/05/04 09:51:24 I!P self.MergeFromString(serialized)
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/google/protobuf/internal/python_message.py”, line 1088, in MergeFromString
[udf] 2017/05/04 09:51:24 I!P raise message_mod.DecodeError(‘Truncated message.’)
[udf] 2017/05/04 09:51:24 I!P google.protobuf.message.DecodeError: Truncated message.
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P During handling of the above exception, another exception occurred:
[udf] 2017/05/04 09:51:24 I!P
[udf] 2017/05/04 09:51:24 I!P Traceback (most recent call last):
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py”, line 914, in _bootstrap_inner
[udf] 2017/05/04 09:51:24 I!P self.run()
[udf] 2017/05/04 09:51:24 I!P File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py”, line 862, in run
[udf] 2017/05/04 09:51:24 I!P self._target(*self._args, **self._kwargs)
[udf] 2017/05/04 09:51:24 I!P File “/Volumes/UDF/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 142, in _read_loop
[udf] 2017/05/04 09:51:24 I!P self.write_response(response)
[udf] 2017/05/04 09:51:24 I!P File “/Volumes/UDF/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 91, in write_response
[udf] 2017/05/04 09:51:24 I!P self._out.write(data)
[udf] 2017/05/04 09:51:24 I!P TypeError: write() argument must be str, not bytes
[udf] 2017/05/04 09:51:24 I!P
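
The two TypeErrors in this log are consistent with binary protobuf data passing through the text-mode sys.stdin and sys.stdout that Python3 provides by default. The following is a minimal sketch, not code from the Kapacitor agent: it uses an in-memory stand-in for sys.stdout, and the helper names are hypothetical. It reproduces the write-side error and shows that the underlying binary buffer accepts bytes.

```python
import io


def make_text_stream():
    # Stand-in for Python 3's sys.stdout: a text wrapper over a binary buffer.
    return io.TextIOWrapper(io.BytesIO(), encoding="utf-8")


def write_to_text_layer(payload):
    """Try to write bytes to the text layer; return the error message, if any."""
    stream = make_text_stream()
    try:
        stream.write(payload)
    except TypeError as exc:
        return str(exc)
    return None


def write_to_binary_layer(payload):
    """Write bytes to the underlying binary buffer and return what was stored."""
    stream = make_text_stream()
    stream.buffer.write(payload)
    return stream.buffer.getvalue()


if __name__ == "__main__":
    # Arbitrary bytes standing in for a serialized message.
    frame = b"\x03\x08\x96\x01"
    print(write_to_text_layer(frame))
    print(write_to_binary_layer(frame) == frame)
```

On a real process the equivalent binary streams are sys.stdin.buffer and sys.stdout.buffer.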

I am already moving my attention to Go, as I dislike wasting time on Python (dependency) hell. I do wish the kapacitor examples were (also) capable of running under Python3, though. If you have any more suggestions, I will try to find the time to check them out. If you were to get the Custom Anomaly example running under Python3, that would be fantastic. Thanks again for helping out!
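
For readers making the same print >> sys.stderr change mentioned in this post, here is a small sketch of the Python3 spelling. The log helper is a hypothetical name used only for illustration and is not part of the example UDFs.

```python
import io
import sys


def log(message, stream=None):
    # Python 2 spelling was: print >> sys.stderr, message
    # In Python 3, print is a function and takes the target stream as file=.
    target = sys.stderr if stream is None else stream
    print(message, file=target)


if __name__ == "__main__":
    log("starting agent")    # goes to stderr
    capture = io.StringIO()  # any writable text stream works as a target
    log("captured", capture)
    print(repr(capture.getvalue()))
```

The same call also works on Python 2 if the file starts with from __future__ import print_function.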

Thanks for the testing! I’ll check out the Custom Anomaly example. FWIW, these examples (kapacitor/udf/agent/examples at master · influxdata/kapacitor · GitHub) all work for Python 2 and 3.

Hi Nathaniel,

I tried one of the examples you pointed at, but I get the same (protobuf-related) errors when starting kapacitor. I made the changes to agent.py that you provided earlier. I also tried reinstalling protobuf and Python, but I keep getting the same results (see the dump in my previous post). If it is working on your machine (did you try?), then it must be something related to the Python installation on my local machine. :frowning:

@HansKe, I can totally reproduce your issue. I also used the examples from the repo referenced by @nathaniel, and I run into the same protobuf problems and TypeErrors.

I am using the docker-compose influx stack from influxdata and just installed python3 along with protobuf 3.0.0 into the kapacitor-image.

For what it is worth: I was never able to solve the Python issues. I gave up (quite quickly) and moved on to Go, which was more successful (and a lot more fun than Python).

@nathaniel
Hi, Nathaniel! Thank you all for the incredible InfluxData products!

I am new to the Influx stack.
I faced a problem similar to @HansKe's.
My environment is similar to @theo's: a docker-compose setup consisting of influxdb:latest, telegraf:latest, and my own kapacitor build with python3.
I have installed protobuf 3.0.0 in the kapacitor image.

[srv] 2018/06/06 09:48:14 D! opening service: *udf.Service
[udf] 2018/06/06 09:48:14 I!P 2018-06-06 09:48:14,921 INFO:root: Starting Agent
[udf] 2018/06/06 09:48:14 I!P Traceback (most recent call last):
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/src/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 107, in _read_loop
[udf] 2018/06/06 09:48:14 I!P request.ParseFromString(data)
[udf] 2018/06/06 09:48:14 I!P TypeError: a bytes-like object is required, not ‘str’
[udf] 2018/06/06 09:48:14 I!P 2018-06-06 09:48:14,922 ERROR:root: error processing request of type unknown: a bytes-like object is required, not ‘str’
[udf] 2018/06/06 09:48:14 I!P Exception in thread Thread-1:
[udf] 2018/06/06 09:48:14 I!P Traceback (most recent call last):
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/src/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 107, in _read_loop
[udf] 2018/06/06 09:48:14 I!P request.ParseFromString(data)
[udf] 2018/06/06 09:48:14 I!P TypeError: a bytes-like object is required, not ‘str’
[udf] 2018/06/06 09:48:14 I!P
[udf] 2018/06/06 09:48:14 I!P During handling of the above exception, another exception occurred:
[udf] 2018/06/06 09:48:14 I!P
[udf] 2018/06/06 09:48:14 I!P Traceback (most recent call last):
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/lib/python3.6/threading.py”, line 916, in _bootstrap_inner
[udf] 2018/06/06 09:48:14 I!P self.run()
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/lib/python3.6/threading.py”, line 864, in run
[udf] 2018/06/06 09:48:14 I!P self._target(*self._args, **self._kwargs)
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/src/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 143, in _read_loop
[udf] 2018/06/06 09:48:14 I!P self.write_response(response)
[udf] 2018/06/06 09:48:14 I!P File “/usr/local/src/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 92, in write_response
[udf] 2018/06/06 09:48:14 I!P self._out.write(data)
[udf] 2018/06/06 09:48:14 I!P TypeError: write() argument must be str, not bytes
[udf] 2018/06/06 09:48:14 I!P
[udf] 2018/06/06 09:48:14 I!P 2018-06-06 09:48:14,923 INFO:root: Agent finished
[httpd] 2018/06/06 09:48:14 I! Closed HTTP service
[edge:task_master:main|write_points->stream] 2018/06/06 09:48:14 D! closing c: 0 e: 0
[srv] 2018/06/06 09:48:14 D! closing service: *httpd.Service

I finally worked around this error by modifying udf/agent.py as follows, which allowed the kapacitor process to start.

udf/agent.py
class Agent(object):
- def __init__(self, _in=sys.stdin, out=sys.stdout, handler=None):
+ def __init__(self, _in=sys.stdin.buffer, out=sys.stdout, handler=None):
# Write message
- # self._out.write(data.decode('utf-8'))
+ self._out.write(data.decode('utf-8'))
- self._out.write(data)
+ # self._out.write(data)

However, after defining and enabling the UDF task, another error occurred.

[edge:UDF_sample_CPU_mvavg|movingAverage3->alert4] 2018/06/07 02:19:20 D! closing c: 0 e: 0
[edge:UDF_sample_CPU_mvavg|eval2->movingAverage3] 2018/06/07 02:19:20 I! aborting c: 18 e: 10
[UDF_sample_CPU_mvavg:movingAverage3] 2018/06/07 02:19:20 E! read error: bad wiretype for oneof field in *agent.Response
[edge:UDF_sample_CPU_mvavg|alert4->influxdb_out5] 2018/06/07 02:19:20 D! closing c: 0 e: 0
[task_store] 2018/06/07 02:19:20 D! task UDF_sample_CPU_mvavg finished
[edge:UDF_sample_CPU_mvavg|stream->stream0] 2018/06/07 02:19:20 D! closing c: 18 e: 18
[edge:UDF_sample_CPU_mvavg|stream0->from1] 2018/06/07 02:19:20 D! closing c: 18 e: 18
[edge:UDF_sample_CPU_mvavg|from1->eval2] 2018/06/07 02:19:20 D! closing c: 18 e: 18
[edge:UDF_sample_CPU_mvavg|eval2->movingAverage3] 2018/06/07 02:19:20 D! closing c: 18 e: 10
[task_master:main] 2018/06/07 02:19:20 E! Stopped task: UDF_sample_CPU_mvavg movingAverage3: read error: bad wiretype for oneof field in *agent.Response
[task_store] 2018/06/07 02:19:20 E! task UDF_sample_CPU_mvavg finished with error: movingAverage3: read error: bad wiretype for oneof field in *agent.Response
[httpd] 127.0.0.1 - - [07/Jun/2018:02:19:22 +0000] “GET /kapacitor/v1/tasks/UDF_sample_CPU_mvavg?dot-view=attributes&replay-id=&script-format=formatted HTTP/1.1” 200 672 “-” “KapacitorClient” 313668c3-69f9-11e8-8057-000000000000 3233
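
One possible explanation for the bad wiretype error, offered as an assumption rather than a confirmed diagnosis: serialized protobuf is arbitrary binary data, so passing it through a text decode and a text-mode sys.stdout can change the bytes that Kapacitor receives. The sketch below assumes each message is framed with a varint length prefix, and encode_uvarint and through_text_layer are hypothetical helpers written for this illustration. It shows how a text round trip can alter such a prefix.

```python
def encode_uvarint(value):
    """Encode a non-negative integer as a protobuf-style base-128 varint."""
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)


def through_text_layer(data, decode_as, encode_as):
    """Simulate decoding bytes to str and letting a text stream re-encode them."""
    return data.decode(decode_as).encode(encode_as)


if __name__ == "__main__":
    prefix = encode_uvarint(150)  # two bytes: 0x96 0x01
    print(prefix)
    # latin1 maps each byte to one code point, so this round trip is lossless.
    print(through_text_layer(prefix, "latin1", "latin1") == prefix)
    # Re-encoding as UTF-8 turns 0x96 into two bytes, which shifts the frame.
    print(through_text_layer(prefix, "latin1", "utf-8"))
    # Decoding as UTF-8 can fail outright: 0x96 is not valid UTF-8 on its own.
    try:
        prefix.decode("utf-8")
    except UnicodeDecodeError as exc:
        print("decode failed:", exc.reason)
```

If the bytes on the wire gain or lose even one byte this way, the reader on the other side would start parsing at the wrong offset, which is the kind of situation in which a wire type error could appear.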

I have tried both versions 1.3.0 and 1.5.0.
I think there is something wrong with udf_pb2.py, but I don’t know much about Protocol Buffers.
We want to run ML-based anomaly detection in Python3, using science/math packages that are only available in Python.

I would appreciate any advice.

Hello buddies!

I recently had the same problem. After a lot of debugging and documentation reading, what worked for me was:

kapacitor.conf
[udf.functions.myudf]
prog = "/usr/bin/python3"
args = ["-u", "./udf/agent/examples/moving_avg.py"]
timeout = "10s"
[udf.functions.myudf.env]
PYTHONPATH = "/usr/bin/python3"
PYTHONIOENCODING = "latin1"

(I installed the kapacitor udf package using sudo pip3 install .)

The additional PYTHONIOENCODING environment variable sets the encoding of Python's sys.stdin/sys.stdout to “latin1”. I saw that this is the encoding udf_pb2.py uses when the Python version is 3 or later. Maybe the data that arrives from protobuf comes in this encoding …

There are also a few modifications needed in agent.py. I don’t know exactly where, but you have to add some encodes/decodes to convert strings to bytes and bytes to strings. Everywhere you need to do this, use the “latin1” encoding.
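
The encode/decode changes described above might look roughly like the helpers below. This is a sketch only: to_text and to_bytes are hypothetical names, not functions in agent.py, and where to call them depends on the agent version in use. latin1 is a workable choice because it maps each of the 256 byte values to exactly one code point, so the conversion is reversible.

```python
ENCODING = "latin1"  # one code point per byte value, so round trips are lossless


def to_text(data):
    """Convert serialized bytes to a str that a text-mode stream can write."""
    return data.decode(ENCODING) if isinstance(data, bytes) else data


def to_bytes(text):
    """Convert a str read from a text-mode stream back to the original bytes."""
    return text.encode(ENCODING) if isinstance(text, str) else text


if __name__ == "__main__":
    every_byte = bytes(range(256))
    # Every possible byte value survives the bytes -> str -> bytes round trip.
    assert to_bytes(to_text(every_byte)) == every_byte
    print("round trip ok")
```

An alternative that avoids text encoding altogether is to read from sys.stdin.buffer and write to sys.stdout.buffer, which are the binary streams underneath the text wrappers in Python3. Text-mode streams may also translate newline characters, which is another reason to prefer the binary streams for protobuf data.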

Big hug!
