Field values from one measurement mixed into another measurement

I'm sending data into InfluxDB 1.8 OSS, which has 2 measurements. One measurement ended up with field values mixed in from the other measurement.

E.g.:

Measurement 1

fields: Apple, Banana, Grapes, Orange

Measurement 2

fields: Potato, Cucumber, Onion, Carrot

When I checked measurement 1, I got this result:

Apple, Banana, Potato, Grapes, Onion, Orange.

Why did this happen? I'm using Python to push data to InfluxDB, with a separate function for measurement 1 and measurement 2, and multiprocessing to push both to InfluxDB at the same time.

Please suggest why this is happening.
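For context, the setup described above can be sketched roughly like this (the function names, tag/field values, and timestamp are placeholders, not my actual code):

```python
import multiprocessing as mp

def build_point(measurement, tags, fields, ts_ms):
    # Shape one row into the dict format influxdb-python's write_points expects
    return {'measurement': measurement, 'tags': tags, 'fields': fields, 'time': ts_ms}

def push(measurement, fields):
    # Placeholder worker: the real functions read a CSV and call
    # CLIENT.write_points(points, ...) instead of returning the points
    points = [build_point(measurement, {'Tag1': 'tag'}, fields, 1653029005621)]
    return points

if __name__ == '__main__':
    # One process per measurement, started at the same time
    p1 = mp.Process(target=push, args=('measurement1', {'Apple': 11}))
    p2 = mp.Process(target=push, args=('measurement2', {'Potato': 11}))
    p1.start(); p2.start()
    p1.join(); p2.join()
```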

Hello @Sreevalsan_Jeyakrish,
I’m not sure. Can you please share your code?

Tagging @bednar as well because he’s the expert.

Hi @Sreevalsan_Jeyakrish,

can you share what your code for inserting and querying data looks like?

Regards

Sure, I can share the bit of code that I use to push the data.

def sensor_influx():
    '''
    Function to take the Sensor data stored in the CSV file
    and then push that data into the InfluxDB.
    '''
    try:
        dataframe = pd.read_csv('File name')
        data = dataframe.to_json(orient='records')
        sensor_data = json.loads(data)
        sensor_list = []
        for item in sensor_data:
            sensor_list.append(
                {
                    'measurement': item['measurement'],
                    'tags': eval(item['tags']),
                    'fields': eval(item['fields']),
                    'time': item['time']
                }
            )

        CLIENT.write_points(sensor_list, database='databaseName', time_precision='ms', batch_size=5000)
    except excep.InfluxDBServerError as err:
        print("Sensor Influx Connection Error ----!!!", err)


if __name__ == '__main__':

    TIME1 = time()
    PROCESS1 = mp.Process(target=sensor_influx)

    PROCESS1.start()
    TIME2 = time()
    print(TIME2 - TIME1)

My CSV file contains 3 measurements:
one row contains one measurement's data,
the 2nd row contains another measurement's data,
and the third row contains the third measurement's data.
This series continues through the CSV file, around 1 lakh (100,000) rows across these 3 measurements.

I have shared the code in @bednar 's reply. Kindly help me out.

That is a slightly complicated transformation. Can you share your CSV file?

"measurement","tags","fields","time"
"measurement1","{Tag1 : tag, Tag2 : tag2 }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005621
"measurement2","{Tag1 : tag, Tag2 : tag2 }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005622
"measurement1","{Tag1 : tag, Tag2 : tag2 }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005623
"measurement2","{Tag1 : tag, Tag2 : tag2 }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005624
"measurement1","{Tag1 : tag, Tag2 : tag2 }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005625
"measurement2","{Tag1 : tag, Tag2 : tag2 }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005626

This is the CSV file; it's a demo.


Your code for pushing data is fine. The produced data are correct:

"measurement","tags","fields","time"
"measurement1","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005621
"measurement2","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005622
"measurement1","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005623
"measurement2","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005624
"measurement1","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Apple"": 11, ""Banana"": 12,""Grapes"": 13, ""Orange"": 14}",1653029005625
"measurement2","{'Tag1' : ""tag"", 'Tag2' : ""tag2"" }","{""Potato"": 11, ""Cucumber"": 12,""Onion"": 13, ""Carrot"": 14}",1653029005626
import json

import pandas as pd

dataframe = pd.read_csv('demo.csv')
data = dataframe.to_json(orient='records')
sensor_data = json.loads(data)
sensor_list = []
for item in sensor_data:
    sensor_list.append(
        {
            'measurement': item['measurement'],
            'tags': eval(item['tags']),
            'fields': eval(item['fields']),
            'time': item['time']
        }
    )
print(sensor_list)

[
{'measurement': 'measurement1', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Apple': 11, 'Banana': 12, 'Grapes': 13, 'Orange': 14}, 'time': 1653029005621}, 
{'measurement': 'measurement2', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Potato': 11, 'Cucumber': 12, 'Onion': 13, 'Carrot': 14}, 'time': 1653029005622}, 
{'measurement': 'measurement1', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Apple': 11, 'Banana': 12, 'Grapes': 13, 'Orange': 14}, 'time': 1653029005623}, 
{'measurement': 'measurement2', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Potato': 11, 'Cucumber': 12, 'Onion': 13, 'Carrot': 14}, 'time': 1653029005624}, 
{'measurement': 'measurement1', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Apple': 11, 'Banana': 12, 'Grapes': 13, 'Orange': 14}, 'time': 1653029005625}, 
{'measurement': 'measurement2', 'tags': {'Tag1': 'tag', 'Tag2': 'tag2'}, 'fields': {'Potato': 11, 'Cucumber': 12, 'Onion': 13, 'Carrot': 14}, 'time': 1653029005626}
]
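As a side note (not the cause of the mixing), `ast.literal_eval` is a safer stand-in for `eval` when parsing these dict-as-string columns, since it only accepts Python literals. A minimal sketch, with sample cell values in the shape stored in the CSV:

```python
import ast

# Sample cell values in the shape stored in the CSV's tags/fields columns
tags_raw = "{'Tag1': 'tag', 'Tag2': 'tag2'}"
fields_raw = '{"Apple": 11, "Banana": 12, "Grapes": 13, "Orange": 14}'

# literal_eval parses literals (dicts, lists, strings, numbers) and raises
# ValueError on anything else, instead of executing arbitrary code like eval
tags = ast.literal_eval(tags_raw)
fields = ast.literal_eval(fields_raw)

print(tags['Tag1'])     # tag
print(fields['Apple'])  # 11
```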

How do you check your data?

This doesn't happen every time, only at some point. I have lakhs of rows, so I don't know where it happened. And out of that lakh of rows, only a small amount of data is mixing up.

Actually, I got into InfluxDB 1.8.2 OSS and typed the command `select count(*) from measurement1`, which returns a count for every field. That's how I came to know about this. I'm also using Grafana to visualize the data, and the field values got mixed up there too.

It is correct.

Maybe you are using Python globals in your script? The current piece looks good.

Are you sure that your data in the CSV are consistent?

Yes, it is. Every measurement contains data, but at some point an error comes up: unable to write, partial write. But I don't think that can cause the mixing up of field values!

Actually, it's getting mixed up with another CSV file's measurement. That's what made me so confused. For those two CSV files I'm using two multiprocessing processes, so that might also be a cause of the issue.


Glad to hear it’s working for you.

No, it didn't.
I was wondering: is multiprocessing the issue, i.e., is it causing this mixing up of field values in InfluxDB?

That was a question, by the way.

No. The client is able to work in a multiprocessing environment.

The complete process is:
data is being collected in the CSV in the format shown above.
Another script has two functions for the 2 CSV files; they read those CSVs and push the data into InfluxDB with multiprocessing.

I don't see any mismatch in the CSV files. If multiprocessing is not the issue, my only remaining assumption is internet slowness or disconnection in the middle of the InfluxDB processing.
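If transient network problems are the suspicion, one way to rule them out is to retry failed writes and log every failure instead of dropping them. A minimal sketch (here `write_fn` stands in for `CLIENT.write_points`, and the retry counts are arbitrary):

```python
import time

def write_with_retry(write_fn, points, retries=3, backoff=1.0):
    # Calls write_fn(points), retrying with a growing delay on failure;
    # re-raises the last error so nothing is silently dropped
    for attempt in range(retries):
        try:
            return write_fn(points)
        except Exception as err:
            print(f"write failed (attempt {attempt + 1}/{retries}): {err}")
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (attempt + 1))
```

Logging each failure this way would at least show whether the mix-ups correlate with the partial-write errors.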