I’m following this manual on how to migrate data between organizations, because InfluxDB no longer supports European regions on GCP. We need to migrate our organization to a US-based region by the end of the month.
When running the migration tasks, we encounter the following error:
{"code":"internal error","message":"error calling function \"metadata\" @97:1-97:11: error calling function \"findRecord\" @67:32-67:69: runtime error @59:12-59:19: count: wrong number of fields"}
Some tasks finish successfully, while others keep failing for an hour or even longer. Retrying does not change the outcome.
It’s hard to tell whether the data for the failed runs has actually been migrated.
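The closest we can get is a manual spot check per batch window: count the rows in the destination bucket for one window and compare the result against the "rows" value the task writes to the metadata bucket. A sketch of that check (the 2h window below is just an example batch, not a fixed part of our workflow):

// Sketch: total rows in the destination bucket for one example batch window,
// to compare with the "rows" value recorded in xyz_migration_metadata_2
from(bucket: "xyz")
    |> range(start: 2023-06-01T00:00:00Z, stop: 2023-06-01T02:00:00Z)
    |> count()
    |> group()
    |> sum()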
Task definition:
import "array"
import "experimental"
import "influxdata/influxdb/secrets"
option task = {name: "xyz_migration", every: 5m}
// Migration configuration
migration = {
    start: 2023-06-01T00:00:00Z,
    stop: 2023-07-01T00:00:00Z,
    batchInterval: 2h,
    batchBucket: "xyz_migration_metadata_2",
    sourceHost: "https://europe-west1-1.gcp.cloud2.influxdata.com",
    sourceOrg: "Org Name",
    sourceToken: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
    sourceBucket: "xyz",
    destinationBucket: "xyz",
}
// batchRange derives the current batch window: it reads the last recorded
// batch_stop from the metadata bucket and uses it as the new batch start,
// falling back to migration.start for the first batch.
batchRange = () => {
    _lastBatchStop =
        (from(bucket: migration.batchBucket)
            |> range(start: migration.start)
            |> filter(fn: (r) => r._field == "batch_stop")
            |> filter(fn: (r) => r.srcOrg == migration.sourceOrg)
            |> filter(fn: (r) => r.srcBucket == migration.sourceBucket)
            |> last()
            |> findRecord(fn: (key) => true, idx: 0))._value
    _batchStart =
        if exists _lastBatchStop then
            time(v: _lastBatchStop)
        else
            migration.start

    return {
        start: _batchStart,
        stop: experimental.addDuration(d: migration.batchInterval, to: _batchStart),
    }
}
// Static record with the current batch start and stop times
batch = {start: batchRange().start, stop: batchRange().stop}

// Stop the task once the batch start passes the end of the migration range
finished =
    if batch.start >= migration.stop then
        die(msg: "Batch range is beyond the migration range. Migration is complete.")
    else
        "Migration in progress"
// Query all data from the source bucket within the current batch window
data = () =>
    from(
        host: migration.sourceHost,
        org: migration.sourceOrg,
        token: migration.sourceToken,
        bucket: migration.sourceBucket,
    )
        |> range(start: batch.start, stop: batch.stop)
// rowCount is the number of rows in the current batch, used for the batch metadata
// (the "count: wrong number of fields" runtime error seems to come from the count() here)
rowCount =
    data()
        |> count()
        |> group(columns: ["_start", "_stop"])
        |> sum()

// emptyRange provides a filler row so metadata is still written when a batch is empty
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])
// metadata builds the batch metadata record written to the batch bucket.
// If the batch returned no rows, the emptyRange filler is used instead so the
// next run can still advance to the following batch window.
metadata = () => {
    _input =
        if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
            rowCount
        else
            emptyRange

    return
        _input
            |> map(
                fn: (r) =>
                    ({
                        _time: now(),
                        _measurement: "batches",
                        srcOrg: migration.sourceOrg,
                        srcBucket: migration.sourceBucket,
                        dstBucket: migration.destinationBucket,
                        batch_start: string(v: batch.start),
                        batch_stop: string(v: batch.stop),
                        rows: r._value,
                        percent_complete:
                            float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                v: int(v: migration.stop) - int(v: migration.start),
                            ) * 100.0,
                    }),
            )
            |> group(columns: ["_measurement", "srcOrg", "srcBucket", "dstBucket"])
}
// Write the batch to the destination bucket
data()
    |> to(bucket: migration.destinationBucket)

// Record the batch metadata
metadata()
    |> experimental.to(bucket: migration.batchBucket)
Is there anything we can do about this issue? What other options do we have to migrate the data?
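For example, would running the copy manually, window by window and outside of the task, be a reasonable fallback? Something like this sketch, reusing the same host/org/bucket values as the task config above for a single 2h window (we have not validated this):

import "influxdata/influxdb/secrets"

// Sketch of a manual one-off copy of one 2h window from the EU source org to
// the destination bucket (same values as the task config above)
from(
    host: "https://europe-west1-1.gcp.cloud2.influxdata.com",
    org: "Org Name",
    token: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
    bucket: "xyz",
)
    |> range(start: 2023-06-01T00:00:00Z, stop: 2023-06-01T02:00:00Z)
    |> to(bucket: "xyz")

That obviously doesn’t scale well to a full month of data in 2h windows, hence the question.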