Poor write performance under python

Hello,
I am using InfluxDB v2.7.6 and a Python script to ingest files into it. However, the write operations are painfully slow: by profiling the program while using the .write method of a WriteApi object, I get the following call stack and times:


For more context, the data is gathered as signals from CAN messages and transformed into a pandas dataframe that is then sent to InfluxDB for writing. A small part of the timing is, in fact, dedicated to the serialization of this dataframe, but that is only around 10% of the total time. Most of the time is spent inside the WriteApi.write(…) call.
I can also provide a very stripped down version of the code that is being used:

from influxdb_client.client.influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions, WriteType
from influxdb_client.domain import WritePrecision  # type:ignore  # noqa: F401

import inputs as inp
from utils import (
    ProcessData,
    list_log_files,
    load_dbc_files,
    setup_fs,
)


class SetupInflux:
    """Thin wrapper around the InfluxDB v2 client that batches dataframe writes.

    :param influx_url:    base URL of the InfluxDB instance
    :param token:         API token used for authentication
    :param org_id:        organization ID
    :param influx_bucket: target bucket for all writes
    :param debug:         stored for caller use; client-level debug stays off
    """

    def __init__(self, influx_url, token, org_id, influx_bucket, debug=False):
        self.influx_url = influx_url
        self.token = token
        self.org_id = org_id
        self.influx_bucket = influx_bucket
        self.debug = debug
        self.client = InfluxDBClient(
            url=self.influx_url, token=self.token, org=self.org_id, debug=False
        )

        # Batching mode queues points in a background thread and flushes when
        # batch_size is reached or flush_interval (ms) elapses.
        self._write_client = self.client.write_api(
            write_options=WriteOptions(
                write_type=WriteType.batching,
                batch_size=20_000,
                flush_interval=3_000,
                jitter_interval=1000,
                retry_interval=5_000,
            )
        )

    def __del__(self):
        # BUG FIX: the original called self.client.__del__() directly; dunder
        # methods should not be invoked by hand — use the public close() API.
        # Also guard against a partially-run __init__ (e.g. InfluxDBClient()
        # raised) so that __del__ itself never raises.
        write_client = getattr(self, "_write_client", None)
        if write_client is not None:
            write_client.close()  # flushes any still-queued batches
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def write_signals(self, device_id, df_phys):
        """Given a device ID and a dataframe of physical values,
        resample and write each signal to a time series database

        :param device_id:   ID of device (used as the 'measurement name')
        :param df_phys:     Dataframe of physical values (e.g. as per output of can_decoder)
        """
        tag_columns = []

        if df_phys.empty:
            print("Warning: Dataframe is empty, no data written")
            return

        # One write call per signal: each single-column frame is renamed after
        # its signal so it becomes a distinct field in the measurement.
        # NOTE(review): issuing a separate write() per signal fragments the
        # client-side batching — if profiling shows this is the hot spot,
        # consider pivoting to one wide dataframe and writing once.
        for signal, group in df_phys.groupby("Signal")["Physical Value"]:
            df_signal = group.to_frame().rename(columns={"Physical Value": signal})

            self.write_influx(device_id, df_signal, tag_columns)

    def write_influx(self, name, df, tag_columns):
        """Helper function to write signal dataframes to InfluxDB"""

        self._write_client.write(
            self.influx_bucket,
            record=df,
            data_frame_measurement_name=name,
            data_frame_tag_columns=tag_columns,
            # write_precision=WritePrecision.US,  # type: ignore
        )


# initialize connection to InfluxDB + get latest data entries per device
influx = SetupInflux(inp.influx_url, inp.token, inp.org_id, inp.influx_bucket)

# setup filesystem (local/S3), load DBC files and list log files for processing
fs = setup_fs(inp.s3, inp.key, inp.secret, inp.endpoint, inp.region, passwords=inp.pw)

db_list = load_dbc_files(inp.dbc_paths)
log_files = list_log_files(fs, inp.devices, True, inp.pw)

# Custom class to process log files, method returns a Pandas dataframe
proc = ProcessData(fs, db_list, inp.signals, inp.days_offset, verbose=False)


# Main ingest loop: decode each log file into physical signal values and
# push them to InfluxDB. NOTE(review): write_signals issues one write per
# signal — presumably the profiled hot spot; verify with cProfile output.
for log_file in log_files:
    # raw CAN frames plus the originating device ID (used as measurement name)
    df_raw, device_id = proc.get_raw_data(log_file, inp.pw)

    # decode raw frames into physical values via the loaded DBC files
    df_phys = proc.extract_phys(df_raw)

    influx.write_signals(device_id, df_phys)

Is there any way to speed up this write operation? It takes roughly 100 seconds even for very small log files, which is unacceptable; CPU usage stays very low during processing, so the bottleneck does not appear to be computation. I tried to upload cProfile's profiling output file, but as a new user I can't attach files; I'll figure something out if it is needed.

I also tried increasing the batch size to 50_000 and the flush_interval to 10_000, but the results are underwhelming: