Cross-series aggregation of data from multiple series within a measurement

Similar questions have been asked before in the context of InfluxQL, but the answers always seem to have missed the point. InfluxQL doesn't seem able to do this, and from reading the documentation for JOIN, neither does Flux. It actually feels like a pretty big ask too.

–The Scenario–
Let's say I have 3 virtual machines in my environment, and I want to use Influx to get a view of memory usage in order to manage capacity, i.e. to project when my environment will need more RAM.

Each VM regularly sends
memory_measurement,host_tag=<host_name> ram_total_field=<value>,ram_used_field=<value>,ram_free_field=<value>
resulting in 3 series of data in memory_measurement.

–Available results–
I DON'T want to view
Time | ram_free_field | host_tag
1 | 10 | host_1
1 | 54 | host_2
1 | 78 | host_3
2 | 12 | host_1
2 | 56 | host_2
2 | 73 | host_3
That will be a very noisy graph!

I DON'T want to group by host_tag and view 3 result sets
Time | ram_free_field | host_tag
1 | 10 | host_1
2 | 12 | host_1

1 | 54 | host_2
2 | 56 | host_2

1 | 78 | host_3
2 | 73 | host_3

–The Ask–
I DO want to see the total across all hosts
Time | total_ram_free_field
1 | 142
2 | 141

I MIGHT then even want to work out the rate of change of free memory, now that there are fewer series to crunch through.
Time | derivative_total_ram_free_field
2 | -1
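
One way this might be written with stock Flux functions is sketched below. It is untested against real data and assumes float field values, a 1m window, and the bucket name my_database used elsewhere in this post. The idea is to put every host onto the same window boundaries first, then regroup by _time so that sum() runs across hosts rather than per host.

```flux
from(bucket: "my_database")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "memory_measurement" and r._field == "ram_free_field")
  // One value per host per window, stamped with the window's end time
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  // Regroup so each table holds every host's value for a single timestamp
  |> group(columns: ["_time"])
  |> sum()
  // Merge the per-timestamp tables back into one time-ordered table
  |> group()
  |> sort(columns: ["_time"])
  // Rate of change of the total, per minute
  |> derivative(unit: 1m)
```

Dropping the last line gives the plain total per window; the derivative step only makes sense once the data has been collapsed into a single table.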

There are naturally significant coding and performance issues when there are large numbers of hosts (cardinality), and when data is missing (fill) or misaligned (window).
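
Missing and misaligned points could be handled per host before anything is summed. The following is only a sketch under the same assumptions as the rest of this post (bucket my_database, float values, an arbitrary 1m window): createEmpty: true makes aggregateWindow emit a null row for windows with no data, and fill(usePrevious: true) carries each host's last known value forward into those rows.

```flux
from(bucket: "my_database")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "memory_measurement" and r._field == "ram_free_field")
  // Align every host to the same 1m boundaries; empty windows become null rows
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: true)
  // Still grouped per host here, so each host is filled from its own history
  |> fill(usePrevious: true)
  // Now collapse across hosts, one table per timestamp
  |> group(columns: ["_time"])
  |> sum()
  |> group()
  |> sort(columns: ["_time"])
```

Windows that precede a host's first point have nothing to carry forward and stay null, so the earliest totals may still be low. Grouping by _time also creates one table per window, which is worth keeping in mind for long ranges.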

I would like to have a query something like the following:

|> filter(fn: (r) =>
  r._measurement == "memory_measurement" and
  r._field == "ram_free_field")
|> group(columns: ["host_tag"], mode: "by")
|> group_window(every: <#>, fill: "previous")             // hypothetical function
|> group_join(fields: ["ram_free_field"], mode: "total")  // hypothetical function
|> derivative(unit: <#>)

Attempting to answer my own question: I have come up with one solution (untested), however it would be critical to get the correct aggregateWindow period to avoid summing data from the same host twice:

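// Custom aggregate: running total and mean of _value for each input table
total = (tables=<-) => tables
  |> reduce(
    fn: (r, accumulator) => ({
      index: accumulator.index + 1,
      total: if accumulator.index == 0 then r._value else r._value + accumulator.total,
      // Use the updated total; Flux will not divide a float by an int, hence float()
      mean: (r._value + accumulator.total) / float(v: accumulator.index + 1)
    }),
    identity: { index: 0, total: 0.0, mean: 0.0 }
  )
  |> drop(columns: ["index"])

// --- Adjust below as necessary ---
from(bucket: "my_database")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "memory_measurement" and r._field == "ram_free_field")
  // from() returns one table per series (one per host_tag); merge them so each window spans all hosts
  |> group()
  |> aggregateWindow(
    every: 1m,
    fn: (tables=<-, column) => tables |> total()
  )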

This is based on the staff answer in "Flux group by time".
Can anyone else come up with something better?

I've included the mean in the custom aggregation function because I feel that multiplying the mean by the number of different host_tags in the data set would at least give a reasonable value to display, one that is tolerant of misaligned data, but I have no idea how to do that.
Really I want all tags available to me in the reduce function.
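
A possible way to get "mean times number of hosts" is to compute the host count as a scalar first and then use it in map(). This is an untested sketch: the names data and hosts are made up for illustration, the field is assumed to be a float, and the count covers every host_tag seen anywhere in the queried range.

```flux
data = from(bucket: "my_database")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "memory_measurement" and r._field == "ram_free_field")

// Scalar: number of distinct host_tag values in the whole range
hosts = data
  |> group()
  |> distinct(column: "host_tag")
  |> count()
  |> findRecord(fn: (key) => true, idx: 0)

data
  // Merge all hosts so that mean() is taken across hosts within each window
  |> group()
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  // Scale the per-window mean up to an estimated total
  |> map(fn: (r) => ({r with _value: r._value * float(v: hosts._value)}))
```

If a host misses a window, the mean is taken over the hosts that did report, so the result is an estimate of the total and not an exact sum.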