How to implement a SQL Sub Query in Flux?

I’m trying to migrate my SQL query to Flux. there are two tables, Order_hist and Quote. the query needs to get the quote price for 3 seconds after the Order is placed. and then calculate the profit/loss.
in SQL, it’s very simple. a sub query to join the Quote table on product and _time(range) and then get the last price value. I researched many days. but I cannot figure out how to do in Flux.

order_hist

_time orderID product price qty
2020-06-01 00:00:01.000 Order-1 prod-A 100 10
2020-06-01 00:00:01.000 Order-2 prod-B 20 100
2020-06-01 00:00:02.000 Order-3 prod-B 21 100

quote

_time product price
2020-06-01 00:00:01.000 prod-A 100
2020-06-01 00:00:02.000 prod-A 101
2020-06-01 00:00:03.000 prod-A 102
2020-06-01 00:00:04.000 prod-A 103
2020-06-01 00:00:05.000 prod-A 104
2020-06-01 00:00:01.000 prod-B 20
2020-06-01 00:00:02.000 prod-B 21
2020-06-01 00:00:03.000 prod-B 22
2020-06-01 00:00:04.000 prod-B 23
2020-06-01 00:00:05.000 prod-B 24

select *
,price_3s = (select top 1 price from quote q where q.product = o.product and q._time between o._time and DATEADD(second,3,o._time) order by _time desc )
from order_hist o

_time orderID product price qty price_3s
2020-06-01 00:00:01.000 Order-1 prod-A 100 10 103
2020-06-01 00:00:01.000 Order-2 prod-B 20 100 23
2020-06-01 00:00:02.000 Order-3 prod-B 21 100 24

This should give you something similar:

import "experimental"

order_hist = // the order history query
quote = // the quote query
3s = quote
    |> map(fn: (r) => ({ r with _time: experimental.subDuration(d: 3s, from: r._time) }))

join(
    tables: {order: order_history, 3s: 3s},
    on: ["_time","product"]
  )

Thanks @scott. but it doesn’t work.
the order_hist._time and quote._time are not exactly equal. the order._time precision is second. and the Quote._time precision is ms. it needs to find the closest quote price to 3 seconds.
and for performance. I plan to run the query daily. the Order has ~10k records per day. but the Quote has 100 million records per day. I’m not sure if Flux can load the data into bucket. and we also need the quote price after 5s, 10s, 30s,60s,120s and 300s.

That is quite a bit of data to load in a single query. Currently Flux will likely have to store all of that in memory until the query finishes. If you have the hardware, it may be able to do it, but with that amount of data and the current unoptimized state of join(), I think you’re right. It likely won’t be able to complete the query.

About _time values not lining up exactly, something you can do is use truncateTimeColumn() to normalize your timestamps to a less precise unit of measure (seconds instead of nanoseconds).

After truncateTimeColumn(unit:1s), the _time value will be duplicate. how to get the last value(quote price) in the Join()? and the Quote may not be continuous. If the _time do not match exactly, it still needs to return the last quote (< 3s), not the Null value or record. how to do it?