I am trying to use the InfluxDB v3 Java client inside an Apache Flink streaming job (custom RichSinkFunction).
A simple standalone Java test works correctly (client initialization + writePoint succeeds), but the same client initialization fails when executed inside a Flink task.
Observed Behavior
Inside Flink, the client fails during initialization in open():
java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 9: grpc+tcp:
at org.apache.arrow.flight.Location.forGrpcInsecure(Location.java:122)
at com.influxdb.v3.client.internal.FlightSqlClient.createLocation(FlightSqlClient.java:207)
at com.influxdb.v3.client.internal.FlightSqlClient.createFlightClient(FlightSqlClient.java:148)
at com.influxdb.v3.client.internal.FlightSqlClient.<init>(FlightSqlClient.java:102)
at com.influxdb.v3.client.internal.InfluxDBClientImpl.<init>(InfluxDBClientImpl.java:116)
at com.influxdb.v3.client.InfluxDBClient.getInstance(InfluxDBClient.java:519)
If I pass the host without scheme:
host:8181
I get the error above.
If I pass the host with scheme:
https://host:8181
I get a different error:
java.lang.IllegalArgumentException: Address types of NameResolver 'unix' not supported by transport
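One way to check the URI-parsing theory for the first error, without Flink or the InfluxDB client on the classpath, is the JDK-only sketch below. It shows how java.net.URI treats the two host strings and what happens when a grpc+tcp URI is built from a null host and a port of -1. The class and method names are hypothetical, and it is an assumption (based only on the stack trace) that the client and Arrow Flight derive the location this way internally.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class HostUriCheck {

    // Host component as java.net.URI sees it; null when the input parses as an opaque URI.
    static String hostOf(String value) {
        return URI.create(value).getHost();
    }

    // Port component as java.net.URI sees it; -1 when undefined.
    static int portOf(String value) {
        return URI.create(value).getPort();
    }

    // Assumption: stands in for how a Flight location URI is assembled from host and port.
    // Returns the parse error message, or null if the URI is valid.
    static String locationError(String host, int port) {
        try {
            new URI("grpc+tcp", null, host, port, null, null, null);
            return null;
        } catch (URISyntaxException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        for (String value : new String[] {"host:8181", "https://host:8181"}) {
            System.out.println(value
                    + " -> scheme=" + URI.create(value).getScheme()
                    + ", host=" + hostOf(value)
                    + ", port=" + portOf(value));
        }
        // Simulates the case where no host or port could be extracted.
        System.out.println(locationError(null, -1));
    }
}
```

For "host:8181", java.net.URI reads "host" as the scheme and reports no host or port, which would be consistent with the "grpc+tcp:" string in the exception. It does not explain why the standalone test behaves differently.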
Expected Behavior
The InfluxDB v3 Java client should initialize correctly inside a Flink operator (RichSinkFunction.open()), just like it does in a standalone Java application.
Minimal Reproduction
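import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.config.ClientConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class InfluxSink extends RichSinkFunction<SensorEvent> {
    // transient: the client is created per task in open(), not serialized with the sink
    private transient InfluxDBClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        ClientConfig config = new ClientConfig.Builder()
            .host("https://host:8181")
            .token("TOKEN".toCharArray())
            .database("DB")
            .build();
        // Fails here when running inside a Flink task
        this.client = InfluxDBClient.getInstance(config);
    }

    @Override
    public void invoke(SensorEvent value, Context context) {
        // no-op: the failure occurs in open(), before any record is written
    }
}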
Executed within a Flink job:
filteredStream.addSink(new InfluxSink());
Environment
- Apache Flink: (e.g. 1.18.1)
- Java: 17 (also tested with newer versions)
- InfluxDB v3 Java client: 1.80 (mvn)
- OS: macOS / Linux (tested on both)
- Deployment: local + VM
Additional Notes
- The same configuration works outside Flink in a plain main() method.
- The error originates from Arrow Flight (org.apache.arrow.flight.Location).
- This may be related to classloader isolation, URI parsing, or gRPC/Flight initialization inside the Flink runtime.
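To narrow down the classloader theory behind the 'unix' NameResolver error, a small probe like the one below could be called from open() and from the standalone main() to compare results. It lists every gRPC name resolver provider registered through service files that a given classloader can see. The class name is hypothetical. It assumes gRPC discovers providers through META-INF/services/io.grpc.NameResolverProvider entries, and that a missing DNS provider in the job jar (for example after shading) would explain the fallback. Neither assumption is confirmed for this setup.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

final class ServiceFileProbe {

    // Service file gRPC is assumed to read when discovering name resolver providers.
    static final String GRPC_RESOLVERS = "META-INF/services/io.grpc.NameResolverProvider";

    // Returns "<service file URL> -> <provider class>" for every entry visible to the loader.
    static List<String> listProviders(ClassLoader loader, String resource) {
        List<String> found = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources(resource);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String entry = line.trim();
                        // Skip blank lines and comments in the service file.
                        if (!entry.isEmpty() && !entry.startsWith("#")) {
                            found.add(url + " -> " + entry);
                        }
                    }
                }
            }
        } catch (IOException e) {
            found.add("error reading " + resource + ": " + e.getMessage());
        }
        return found;
    }

    public static void main(String[] args) {
        ClassLoader loader = ServiceFileProbe.class.getClassLoader();
        List<String> providers = listProviders(loader, GRPC_RESOLVERS);
        if (providers.isEmpty()) {
            System.out.println("No NameResolverProvider service files visible");
        }
        providers.forEach(System.out::println);
    }
}
```

Inside a Flink task, passing Thread.currentThread().getContextClassLoader() as well as the sink class's own classloader would show whether the two see different provider lists.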
Questions
- Is the InfluxDB v3 Java client officially supported inside Flink jobs?
- What is the correct format for .host() when used with Flight/gRPC?
- Are there known issues with Arrow Flight + Flink classloader?
- Is additional configuration required for gRPC name resolution in this context?
Workarounds I tried
- Using host with and without scheme
- Removing any custom NameResolverRegistry configuration
- Running with parallelism = 1
- Testing outside Flink (works)
Any guidance would be greatly appreciated.
