Scaling Series and Queries on 16GB of RAM

influxdb

#1

I’m experiencing some scaling issues with InfluxDB and I’m trying to understand what the best solution to my problem might be. I store stock trade data in the following schema:

measurement: trades
tags: symbol (28000), exchange (17)
fields: price (float), size (int)
timestamp: ns precision (so duplicate trades get distinct timestamps without needing another tag)

This gives an estimated series cardinality of 476k (28,000 × 17). SHOW SERIES CARDINALITY gives 469k.

I have on average 26 million points per day for every market day in a year (about 250 days/year), and I have 5 years of data. I used the default shard duration of one week.
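The cardinality and volume figures above can be cross-checked with a few lines of arithmetic. This is a rough sketch using only this post's numbers; the 469k reported by SHOW SERIES CARDINALITY is presumably below the 476k bound because not every symbol trades on every exchange.

```python
# Back-of-the-envelope sizing from the numbers in this post.
SYMBOLS = 28_000
EXCHANGES = 17
POINTS_PER_DAY = 26_000_000
MARKET_DAYS_PER_YEAR = 250
YEARS = 5

# Upper bound on series: every (symbol, exchange) pair could be its own series.
max_series = SYMBOLS * EXCHANGES
# Dropping the exchange tag leaves at most one series per symbol.
series_without_exchange = SYMBOLS

total_points = POINTS_PER_DAY * MARKET_DAYS_PER_YEAR * YEARS
weekly_shards = YEARS * 52  # roughly one shard group per week

print(f"max series: {max_series:,}")
print(f"series without exchange: {series_without_exchange:,}")
print(f"total points: {total_points:,}")
print(f"weekly shard groups: ~{weekly_shards}")
```

The total of about 32.5 billion points is where the "32B points" figure later in the thread comes from.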

I tried running the following query:

SELECT first(price) as o, max(price) as h, min(price) as l, last(price) as c, sum(size) as v
FROM trades WHERE size >= 100
GROUP BY time(1m), symbol fill(none)

but InfluxDB used all my system resources and crashed with out-of-memory errors. I tried breaking it down by week using a WHERE clause, but it also crashed. Only breaking it down by day worked, but those queries were extremely slow and did not use even half my CPU or disk. Based on the hardware sizing guidelines, I hoped my 6-core, 16GB RAM workstation would be sufficient for a “moderate load”.
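Breaking the aggregate down by day, as described above, is easy to script. A minimal sketch, assuming UTC day boundaries and str.format-style `'{}'` placeholders; the helper names (`daily_windows`, `daily_queries`) are hypothetical, and non-trading days are not skipped:

```python
from datetime import date, datetime, timedelta, timezone

# The same aggregate as above, restricted to one time window.
AGG_TEMPLATE = (
    "SELECT first(price) AS o, max(price) AS h, min(price) AS l, "
    "last(price) AS c, sum(size) AS v "
    "FROM trades WHERE size >= 100 AND time >= '{}' AND time < '{}' "
    "GROUP BY time(1m), symbol fill(none)"
)

def daily_windows(start: date, end: date):
    """Yield (lower, upper) RFC3339 UTC bounds for each day in [start, end)."""
    day = start
    while day < end:
        lower = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
        upper = lower + timedelta(days=1)
        yield (lower.strftime("%Y-%m-%dT%H:%M:%SZ"),
               upper.strftime("%Y-%m-%dT%H:%M:%SZ"))
        day += timedelta(days=1)

def daily_queries(start: date, end: date):
    """Build one time-bounded aggregate query per day."""
    return [AGG_TEMPLATE.format(lo, hi) for lo, hi in daily_windows(start, end)]

for q in daily_queries(date(2018, 1, 2), date(2018, 1, 4)):
    print(q)
```

The time bound is what lets InfluxDB skip every shard outside that day; the `size >= 100` condition does not narrow what has to be read.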

I’m currently reducing my cardinality by doing away with the exchange (17) tag. I’m also switching to an infinite shard duration, since I never need to delete data. However, running this query per day to insert into my new database is taking 300+ seconds per day:

SELECT * INTO us_equities2."almostINF".trades
FROM trades WHERE time >= '{}' AND time < '{}'
GROUP BY symbol
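At roughly 26 million points per day, 300 seconds per day works out to about 87k points/s, and at that pace the full five-year copy takes on the order of 100 hours. The arithmetic, assuming every day takes about the same time:

```python
POINTS_PER_DAY = 26_000_000   # average from the first paragraph
SECONDS_PER_DAY_COPY = 300    # observed time per daily SELECT ... INTO
DAYS = 250 * 5                # market days in 5 years

rate = POINTS_PER_DAY / SECONDS_PER_DAY_COPY       # points written per second
total_hours = DAYS * SECONDS_PER_DAY_COPY / 3600   # wall-clock hours for the whole copy

print(f"{rate:,.0f} points/s")
print(f"{total_hours:.0f} hours for {DAYS} days")
```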

and hardly any of my system resources are being used. This is a pitiful write speed of 87k points/s, which is less than I achieved when inserting into the database in the first place. Also, despite setting my configuration to use the tsi1 index by default for new shards, InfluxDB seems to be using .tsm files when I peek inside my data directory. Here is my config, with not much changed:

[data]
  dir = "E:/InfluxDB/data"
  wal-dir = "C:/Program Files/influxdb-1.7.1-1/wal"
  # wal-fsync-delay = "0s"
  index-version = "tsi1"
  # trace-logging-enabled = false
  # query-log-enabled = true
  # validate-keys = false
  # cache-max-memory-size = "1g"
  cache-snapshot-memory-size = "25m"
  cache-snapshot-write-cold-duration = "10m"
  compact-full-write-cold-duration = "4h"
  # max-concurrent-compactions = 0
  # compact-throughput = "48m"
  # compact-throughput-burst = "48m"
  # max-index-log-file-size = "1m"
  # max-series-per-database = 1000000
  # max-values-per-tag = 100000
  # tsm-use-madv-willneed = false

I understand I can use the influx_inspect tool to build tsi1 index files for the existing .tsm shards, which I plan on doing once my queries complete. However, even after all this, I’m afraid my aggregate queries will crash again.
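For what it's worth, .tsm files are the TSM engine's data files and are present in every shard whichever index is configured; index-version only changes how the series index is stored, and only for shards created after the setting is applied (influx_inspect's buildtsi command converts existing ones). A minimal sketch for checking which shards already have a tsi1 index, assuming the 1.x on-disk layout `data/<database>/<retention policy>/<shard id>/` with tsi1 shards keeping their index in an `index` subdirectory; the function name is hypothetical:

```python
import os

def shard_index_report(data_dir: str, database: str):
    """Map 'rp/shard_id' -> True if the shard has a tsi1 'index' subdirectory.

    Assumes the InfluxDB 1.x layout data/<db>/<rp>/<shard_id>/; .tsm data
    files live in every shard regardless of index type.
    """
    report = {}
    db_dir = os.path.join(data_dir, database)
    for rp in sorted(os.listdir(db_dir)):
        rp_dir = os.path.join(db_dir, rp)
        # Skip plain files and internal directories such as the series file (_series).
        if rp.startswith("_") or not os.path.isdir(rp_dir):
            continue
        for shard in sorted(os.listdir(rp_dir)):
            shard_dir = os.path.join(rp_dir, shard)
            if os.path.isdir(shard_dir):
                report[f"{rp}/{shard}"] = os.path.isdir(os.path.join(shard_dir, "index"))
    return report
```

Usage would be something like `shard_index_report("E:/InfluxDB/data", "us_equities2")`, reading the data directory from the config above.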

In the meantime, with 70 hours of migration time, I’m seriously questioning InfluxDB. Will tsi1 solve my problems? Do I just need more RAM? How much more? Is InfluxDB not suited for these types of queries because of how its storage engine works? Why in the world is InfluxDB so slow at inserting back into itself?

Thanks for any help.


#2

Your issue does not seem to be related to cardinality, but rather to a misunderstanding of what your query is doing under the hood.

Because you are using InfluxQL, it is easy to get the false impression that you are using SQL and to forget that this is a time series database: apart from the index on tags, the only index is on the time column. So your WHERE clause (WHERE size >= 100) needs to read all of your data (you did not specify a time range in your initial query) and retain only the points whose size is >= 100.

This leads to a massive amount of data being read, which takes time, and a considerable amount of data (every row with size >= 100) being kept in memory.
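A toy model of the point above (not InfluxDB's actual code): a time range lets the engine skip whole shards, whereas a predicate on a field such as size can only be evaluated by reading every value in every shard the query touches. The `Shard` class and `points_scanned` function are illustrative only.

```python
from dataclasses import dataclass

@dataclass
class Shard:
    start: int    # inclusive start time (toy units, e.g. days)
    end: int      # exclusive end time
    sizes: list   # 'size' field values stored in this shard

def points_scanned(shards, t_min=None, t_max=None):
    """Count field values that must be read to evaluate `size >= 100`.

    A time range lets whole shards be skipped; the field predicate does not,
    so every value in every overlapping shard still has to be read.
    """
    scanned = 0
    for s in shards:
        if t_min is not None and s.end <= t_min:
            continue  # shard lies entirely before the range
        if t_max is not None and s.start >= t_max:
            continue  # shard lies entirely after the range
        scanned += len(s.sizes)
    return scanned

# Toy dataset: 4 weekly shards of 1,000 points each.
shards = [Shard(7 * i, 7 * (i + 1), [50, 150] * 500) for i in range(4)]

print(points_scanned(shards))         # no time range: all 4,000 points
print(points_scanned(shards, 7, 14))  # one week: 1,000 points
```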

For this type of query (spanning your entire dataset), a batch processing tool such as Spark or Flink is probably better suited, with the data stored as files (Parquet, SequenceFile or ORC).
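To make the batch alternative concrete, here is roughly the per-record work a Spark or Flink job would parallelize across files: a single pass computing the same first/max/min/last(price) and sum(size) as the InfluxQL query, keyed by symbol and 1-minute bucket. It is a plain-Python sketch, not Spark code, assuming time-sorted `(timestamp_ns, symbol, price, size)` tuples; `ohlcv` is a hypothetical helper.

```python
def ohlcv(trades, bucket_ns=60 * 10**9, min_size=100):
    """Batch OHLCV rollup keyed by (symbol, bucket start).

    trades: iterable of (timestamp_ns, symbol, price, size), sorted by time.
    Mirrors first/max/min/last(price), sum(size) with WHERE size >= min_size.
    """
    bars = {}
    for ts, symbol, price, size in trades:
        if size < min_size:
            continue
        key = (symbol, ts - ts % bucket_ns)  # align to bucket start, like GROUP BY time()
        bar = bars.get(key)
        if bar is None:
            bars[key] = [price, price, price, price, size]  # o, h, l, c, v
        else:
            bar[1] = max(bar[1], price)
            bar[2] = min(bar[2], price)
            bar[3] = price  # input is time-sorted, so the last trade seen is the close
            bar[4] += size
    return {k: tuple(v) for k, v in bars.items()}

trades = [
    (0, "AAPL", 10.0, 100),
    (1_000, "AAPL", 12.0, 200),
    (2_000, "AAPL", 9.0, 50),          # dropped: size < 100
    (30 * 10**9, "AAPL", 11.0, 300),
    (61 * 10**9, "AAPL", 13.0, 100),   # falls in the next 1-minute bucket
]
bars = ohlcv(trades)
```

Buckets with no qualifying trades are simply absent, which matches `fill(none)`.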

Inserting data back into InfluxDB makes the storage engine update shards, moving lots of data around, which takes time. Again, I don’t think the problem is related to the metadata, so adopting TSI probably won’t help.


#3

I see now, as you suggest, that it was overly optimistic to assume the storage engine could also easily compute the rollup levels on historical data, given how the query engine works. I am using the wrong tool for the job. However, my main motivation for not using HDF5 files (or something that can be map-reduced, like Parquet files) is that I want to use the same storage engine for backtesting that I use live. That way, aggregations are consistent between backfilled and live data. Also, HDF5 files do not compress my data as well as InfluxDB does.

I need to be able to serve the following query patterns:

  • 4 rollup levels (1s, 1m, 5m and 1d aggregations) on backfilled and live data, filtered on size >= 100 and grouped by symbol
  • Push new 1s aggregations grouped by symbol to external applications
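One way to keep the four rollup levels consistent between backfilled and live data is to derive each coarser level from the one below it, since OHLCV bars combine cleanly: first open, max high, min low, last close, summed volume. A hedged sketch of that merge step, assuming the bars of a single symbol are already sorted by time; `merge_bars` is hypothetical, not an InfluxDB feature, and daily bars aligned to a market session rather than UTC midnight would need an offset.

```python
def merge_bars(bars, coarse_interval):
    """Roll fine OHLCV bars up into coarser ones for a single symbol.

    bars: time-sorted list of (bucket_start, o, h, l, c, v); bucket_start and
    coarse_interval share units (e.g. seconds). Works for 1s -> 1m, 1m -> 5m, ...
    """
    out = []
    for start, o, h, l, c, v in bars:
        coarse_start = start - start % coarse_interval
        if out and out[-1][0] == coarse_start:
            _, po, ph, pl, _, pv = out[-1]
            # Open stays the first bar's open; close becomes this bar's close.
            out[-1] = (coarse_start, po, max(ph, h), min(pl, l), c, pv + v)
        else:
            out.append((coarse_start, o, h, l, c, v))
    return out

one_minute = [
    (0, 10, 12, 9, 11, 100),
    (60, 11, 13, 10, 12, 50),
    (300, 12, 12, 11, 11, 70),
]
five_minute = merge_bars(one_minute, 300)
```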

With the following constraints:

  • Write: 1k points/sec (with bursts of up to 10k), and a reasonable backfill time for 32B points (less than a week)
  • Storage: less than 2TB for 32B points
  • Hardware: 6-core, 16GB of RAM. VMs for heavy lifting are fine, assuming aggregated data can be cheaply transferred back locally
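The constraints translate into two concrete budgets: the sustained write rate needed to backfill 32B points within a week, and the average on-disk size per point allowed by 2TB. A quick check, taking 1 TB as 10^12 bytes:

```python
TOTAL_POINTS = 32e9
BACKFILL_SECONDS = 7 * 24 * 3600   # "less than a week"
STORAGE_BYTES = 2e12               # 2 TB, decimal

required_rate = TOTAL_POINTS / BACKFILL_SECONDS   # sustained points/s for a one-week backfill
bytes_per_point = STORAGE_BYTES / TOTAL_POINTS    # average on-disk budget per point

print(f"required backfill rate: {required_rate:,.0f} points/s")
print(f"storage budget: {bytes_per_point:.1f} bytes/point")
```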

InfluxDB seems to handle these requirements fine, except for aggregating backfilled data. Is there a batch processing tool that can easily use InfluxDB as the storage engine? Is there a better storage engine suited to backfilling data that also works for live data?


#4

As far as I know, there is no InputFormat for reading data out of InfluxDB. First, InfluxDB does not seem to expose the location of data (in the case of a cluster), so there is no way to create splits that would benefit from data locality. Second, there does not seem to be an endpoint in InfluxDB for reading data in a streaming way, so any fetch would first need to load the data into memory, which would kill your data nodes if you are fetching lots of data for your batch.

If someone knows differently and can point you to a Hadoop InputFormat for InfluxDB that you could leverage from Spark or Flink I’d be happy to discover it too.

DM me to discuss this further, as it is somewhat off-topic for InfluxDB.


#5

Is it possible to increase the RAM to 32GB? When I was load testing with a large data set, InfluxDB crashed even without any queries; ingestion alone was crashing for me. In my experiments, 32GB was the sweet spot, and that was confirmed when I attended one of the InfluxData roadshows, where I was told that 32GB should be my minimum RAM in production. Also, I am not sure whether you are running anything else on that instance (like CQs), or whether you are doing your rollups with Kapacitor or CQs. In my case, since Kafka is my data source, I use Kafka Streams to do the rollups right there in Kafka and then ingest the rolled-up data into InfluxDB. That way, I am not using InfluxDB resources for any rollup activity.
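The roll-up-before-ingest approach boils down to a windowed aggregation in the stream processor. As a rough illustration only (plain Python, not the Kafka Streams API), here is a tumbling-window aggregator that emits a window's trade count and volume once a symbol moves on to a later window, assuming trades arrive in time order per symbol. The class name is hypothetical. A real deployment would also close windows on a timer or watermark so quiet symbols still get flushed.

```python
class TumblingVolume:
    """Pre-aggregate a time-ordered trade stream into fixed windows before ingest.

    Emits (symbol, window_start, trade_count, volume) once a later window starts
    for that symbol, so only closed windows are written downstream.
    """

    def __init__(self, window):
        self.window = window
        self.windows = {}  # symbol -> [window_start, count, volume]

    def add(self, ts, symbol, size):
        start = ts - ts % self.window
        cur = self.windows.get(symbol)
        if cur is not None and cur[0] == start:
            cur[1] += 1
            cur[2] += size
            return []
        # A new window starts: replace the previous one and emit it if it existed.
        self.windows[symbol] = [start, 1, size]
        return [] if cur is None else [(symbol, cur[0], cur[1], cur[2])]

    def flush(self):
        """Emit every still-open window, e.g. at shutdown."""
        closed = [(s, w[0], w[1], w[2]) for s, w in self.windows.items()]
        self.windows.clear()
        return closed
```

Only the emitted tuples would be written to InfluxDB, so the database never sees raw trades.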
PS: I am not in production yet, but this is the approach I am taking for my work. Good luck! 🙂