I am using Apache airflow for storing data in influxdb from oracle database. I need to store all data from oracle database into influxdb. But it insert last row of data only each company_id. Other data not insert. Here is oracle data set:
Here is code:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.providers.oracle.hooks.oracle import OracleHook
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime,timedelta,timezone
import influxdb_client
import traceback
bucket = "myBucket"
org = "city"
token = "LsOE3PsZjive2ENtJnM3Pv2UmxcygaLymDPpR2YJO0XBZa0eA=="
url = "http://influxdb:8086"
def get_data_from_oracle(**kwargs):
oracle_hook = OracleHook(oracle_conn_id='oracle_default')
sql = """
SELECT COMPANY_ID,
CREDIT_AMOUNT,
CREDIT_ACCOUNT_NUMBER,
DEBIT_ACCOUNT_NUMBER
FROM gtt_transaction
WHERE COMPANY_ID IN (969, 116)
AND TRUNC (INSERT_DATE) = '15-AUG-2024'
AND TRX_REFERENCE IN ('15082445361298', '15082439504163')
"""
data = oracle_hook.get_pandas_df(sql)
return data
def transform_oracle_data(ti):
data = ti.xcom_pull(task_ids='getDataFromOracle_task')
return data
current_time = datetime.now(timezone.utc)
def insert_data_into_influxdb(ti):
data = ti.xcom_pull(task_ids='transanformOracleData_task')
print("insert_data_into_influxdb--data-",data)
if data is None:
print("No data received from transform_oracle_data_task")
return # Exit gracefully if no data
try:
client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
points = []
for count, row in enumerate(data.to_records(index=False)):
timestamp = current_time + timedelta(microseconds=count)
print("timestamp--",timestamp)
point = influxdb_client.Point("oracle_transactions_3") \
.tag("company_id", row.COMPANY_ID) \
.field("credit_amount", float(row.CREDIT_AMOUNT)) \
.field("credit_account_number", str(row.CREDIT_ACCOUNT_NUMBER)) \
.field("debit_account_number", str(row.DEBIT_ACCOUNT_NUMBER)) \
.time(timestamp, influxdb_client.WritePrecision.NS)
points.append(point)
write_api.write(bucket=bucket, org=org, record=points)
except Exception as e:
print(f"Error: {traceback.format_exc()}")
# DAG definition
with DAG(
dag_id='oracle_to_influxdb_line_protocol',
start_date=datetime(2024, 8, 13),
schedule_interval='*/50 * * * *',
catchup=False,
) as dag:
get_data_from_oracle_task = PythonOperator(
task_id='getDataFromOracle_task',
python_callable=get_data_from_oracle,
)
transform_oracle_data_task = PythonOperator(
task_id='transanformOracleData_task',
python_callable=transform_oracle_data,
)
insert_data_into_influxdb_task = PythonOperator(
task_id='insertIntoFluxDb_task',
python_callable=insert_data_into_influxdb,
)
# Task dependencies
get_data_from_oracle_task >> transform_oracle_data_task >> insert_data_into_influxdb_task
What is reason for inserting only one row? I want to store all data found in query.
what is the wrong of my code?
Please help me…