Hi,
I have been observing a very strange behaviour with async requests: the response time increases linearly with the number of concurrent HTTP requests I send from my client.
I set up my InfluxDB using docker-compose with the following entries in my docker-compose.yml.
services:
  influxdb:
    image: influxdb:latest
    container_name: influxdb
    restart: always
    ports:
      - 8086:8086
    volumes:
      - shared_volumes/influxdb:/home/influxdb/shared_volume
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: password
      DOCKER_INFLUXDB_INIT_ORG: MyOrg
      DOCKER_INFLUXDB_INIT_BUCKET: test-bucket
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: test-token
      INFLUXD_QUERY_CONCURRENCY: 200
      INFLUXD_QUERY_QUEUE_SIZE: 1
To finish setting up my DB and insert the data (I can provide the data if necessary; as a new user I cannot post attachments), I use the following script:
import os

path_to_file = 'shared_volume/cac.lp'
# Extract the id of test-bucket from the CLI's tabular output.
bucket_id = os.popen('sudo docker exec influxdb influx bucket list').read().split('\t')[25].split('\n')[1]
print(f'bucket has id {bucket_id}')
# Write the line-protocol data, then create a v1 DBRP mapping and a v1
# authorization so the 1.x-compatibility /query endpoint can reach the bucket.
os.system(f'sudo docker exec influxdb influx write -b test-bucket -f {path_to_file} --format=lp')
os.system(f'sudo docker exec influxdb influx v1 dbrp create --bucket-id {bucket_id} --db test-bucket --rp test-retention')
os.system(f'sudo docker exec influxdb influx v1 auth create --read-bucket {bucket_id} --write-bucket {bucket_id} --username admin --no-password --token test-token')
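(As an aside: the bucket-id extraction above is brittle, since it depends on the exact column layout of `influx bucket list`. If it helps anyone reproduce, here is a sketch of a more robust variant that parses the CLI's JSON output instead. It assumes the `--json` flag of the `influx` CLI; the sample string below is illustrative, not real output from my setup.)

```python
import json

def bucket_id_from_json(raw: str, name: str) -> str:
    """Pick the id of the bucket with the given name from
    `influx bucket list --json` output (a JSON array of buckets)."""
    for bucket in json.loads(raw):
        if bucket.get("name") == name:
            return bucket["id"]
    raise KeyError(f"no bucket named {name!r}")

# Illustrative sample of the JSON output (the id is made up):
sample = '[{"id": "a1b2c3d4e5f60708", "name": "test-bucket"}]'
print(bucket_id_from_json(sample, "test-bucket"))
```

The raw string would come from `os.popen('sudo docker exec influxdb influx bucket list --json').read()`.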
Finally, to send my concurrent HTTP requests I use this script:
# Source (modified version of this):
# https://stackoverflow.com/questions/57126286/fastest-parallel-requests-in-python
import asyncio
import time

import aiohttp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

VENUES = 2
BOOKS = 2
HEADERS = {
    'Host': '127.0.0.1:8086',
    'User-Agent': 'python-requests/2.26.0',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept': 'application/x-msgpack',
    'Connection': 'keep-alive',
    'Content-Type': 'application/json',
    'Authorization': 'Token test-token'
}
BASE_URL = 'http://127.0.0.1:8086'

async def get(n, url, session):
    start = time.time()
    try:
        async with session.get(url=url) as response:
            resp = await response.read()
        duration = time.time() - start
        print(f"Success -- URL {url} -- resp length {len(resp)} -- duration {duration} seconds -- job {n}.")
    except Exception as e:
        resp = f"Failure -- URL {url} -- except {e.__class__} -- job {n}"
        duration = time.time() - start
        print(resp)
    return len(resp), n, duration

async def retrieve_queries(urls, base_url=BASE_URL, headers=HEADERS):
    async with aiohttp.ClientSession(base_url=base_url, headers=headers) as session:
        ret = await asyncio.gather(*[get(n, url, session) for n, url in enumerate(urls)])
    print(f"Finalized all. Return is a list of {len(ret)} outputs.")
    return ret

def make_query(N_venues, N_books):
    fields = ['open', 'high', 'low', 'close', 'adjustedclose', 'volume']
    filters = [
        f'("venue"=\'{np.random.randint(0, i)}\'+and+"book"=\'{np.random.randint(0, j)}\')'
        for i in range(1, N_venues + 1)
        for j in range(1, N_books + 1)
    ]
    where_clause = f'+where+{"+or+".join(filters)}' if filters else ''
    query = '/query?q=Select+' + '%2C+'.join(fields) + '+from+daily'
    query += where_clause
    query += '%3B'
    query += '&db=test-bucket'
    return query

async def compute_query_stats(urls, n):
    ret = await retrieve_queries(urls[:n])
    durations = np.array([duration for _, _, duration in ret])
    lengths = np.array([length for length, _, _ in ret])
    return (n, durations.mean(), durations.std(), lengths.mean(), lengths.std())

n_max = 100
urls = n_max * [make_query(0, 0)]
data = ()
for n in (10, 20, 30, 40, 50, 60, 70, 80, 90, 100):
    data += (await compute_query_stats(urls, n), )  # top-level await: I run this in a notebook
data = pd.DataFrame(data, columns=['Number_of_queries', 'Avg_duration', 'Std_duration', 'Avg_length', 'Std_length'])
data = data.set_index('Number_of_queries')
plt.plot(data.index, data.Avg_duration, 'k-')
plt.plot(data.index, data.Avg_duration + data.Std_duration, 'k--')
plt.plot(data.index, data.Avg_duration - data.Std_duration, 'k--')
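For what it's worth, the shape of my curve matches what a fully serialized server would produce. This toy sketch (no InfluxDB involved, just a lock that forces one "query" at a time) reproduces the same linear growth of per-batch wall time:

```python
import asyncio
import time

SERVICE_TIME = 0.01  # pretend each query costs 10 ms of server time

async def serialized_server(lock):
    # All requests contend for one lock, i.e. the "server"
    # handles them strictly one after another.
    async with lock:
        await asyncio.sleep(SERVICE_TIME)

async def batch_wall_time(n):
    lock = asyncio.Lock()
    start = time.perf_counter()
    await asyncio.gather(*[serialized_server(lock) for _ in range(n)])
    return time.perf_counter() - start

async def main():
    for n in (5, 10, 20):
        t = await batch_wall_time(n)
        print(f"{n} concurrent requests -> {t:.3f} s total")

asyncio.run(main())
```

The total wall time is roughly n × SERVICE_TIME, i.e. linear in the batch size, exactly like the plot above.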
The result I get is the following:
I would not have expected the total response time of my async query batches to grow linearly with the batch size. In layman's terms: if I send 10 queries, it takes about 3 seconds until all responses arrive; if I send 100, it takes about thirty seconds. (Note that this is consistent with the slope of the above curve, roughly 0.3 seconds per query.)
What I would have expected instead is for each query to take about 0.3 seconds and for responses to arrive as soon as each one is ready.
Could you please shed some light on this? Is there a way to avoid this batching of responses?
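(In case the answer involves capping client-side concurrency: I assume the pattern would be a semaphore around the request coroutine, something like the sketch below. I show it with a placeholder coroutine in place of the actual aiohttp call, since I have not verified that it changes anything in my setup.)

```python
import asyncio

MAX_IN_FLIGHT = 10

async def get_limited(sem, n):
    # At most MAX_IN_FLIGHT of these run concurrently.
    async with sem:
        await asyncio.sleep(0)  # placeholder for the real session.get(...)
        return n

async def run_all(n_jobs):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    return await asyncio.gather(*[get_limited(sem, n) for n in range(n_jobs)])

print(asyncio.run(run_all(25)))
```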
Many thanks in advance and best wishes,
M0nk