Memory leak after writing to database

version: 1.8

I write to the database in a for loop, but it seems the memory is not released after writing to the DB.
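
A minimal sketch of one way to watch the process's resident memory while the loop runs, assuming psutil is installed (the rss_mb helper is only for illustration):

import os
import psutil  # assumed to be installed (pip install psutil)

_process = psutil.Process(os.getpid())

def rss_mb() -> float:
    # Resident set size of the current process, in MiB.
    return _process.memory_info().rss / (1024 * 1024)

rss_mb() can be printed before and after each save call to check whether the resident size drops back once the client is closed.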

Here is my code:

import pandas as pd
from influxdb_client import InfluxDBClient, WriteOptions
from datetime import datetime


class StockDataseed:
    def __init__(self, db_url: str, db_org: str, bucket: str, token: str, timeout=30000):
        self.db_url = db_url
        self.db_org = db_org
        self.bucket = bucket
        self.token = token
        self.timeout = timeout

    def save_stock_to_database(
        self, symbol, dataframe, interval="daily", market="cn", fuquan=""
    ):
        if symbol is None or dataframe is None:
            raise RuntimeError("Failed to save to the database; please check that the arguments are complete")

        # Use context managers so the client and write_api are closed automatically
        with InfluxDBClient(
            url=self.db_url,
            token=self.token,
            org=self.db_org,
            timeout=self.timeout,
            enable_gzip=True,  # enable gzip compression to save bandwidth and time
        ) as client:
            with client.write_api(
                write_options=WriteOptions(batch_size=5000, flush_interval=1000)
            ) as write_api:
                print("*** Write To InfluxDB ***")

                df_k_line = dataframe.copy()
                df_k_line["日期"] = pd.to_datetime(df_k_line["日期"])
                df_k_line.set_index("日期", inplace=True)
                df_k_line["symbol"] = symbol

                measurement = f"{market}_{interval}_kline{fuquan}"

                write_api.write(
                    bucket=self.bucket,
                    org="-",
                    record=df_k_line,
                    data_frame_measurement_name=measurement,
                    data_frame_tag_columns=["symbol"],
                )

    def query_stock_from_database(
        self,
        symbol: str,
        startDate: str,
        endDate: str,
        interval: str = "daily",
        market: str = "cn",
        fuquan: str = "",
    ):
        date_start_obj = datetime.strptime(startDate, "%Y-%m-%d")
        start_time = date_start_obj.strftime("%Y-%m-%dT00:00:00Z")

        date_end_obj = datetime.strptime(endDate, "%Y-%m-%d")
        end_time = date_end_obj.strftime("%Y-%m-%dT23:59:59.999999Z")

        measurement = f"{market}_{interval}_kline{fuquan}"

        # Use a context manager so the connection is released promptly
        with InfluxDBClient(
            url=self.db_url,
            token=self.token,
            org=self.db_org,
            timeout=self.timeout,
            enable_gzip=True,
        ) as client:
            query_api = client.query_api()

            query = (
                'from(bucket:"{}")'
                ' |> range(start: {}, stop: {})'
                ' |> filter(fn: (r) => r._measurement == "{}")'
                ' |> filter(fn: (r) => r.symbol == "{}")'
                ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
                ' |> keep(columns: ["_time", "股票代码", "开盘", "收盘", "最高", "最低", "成交量", "成交额", "振幅", "涨跌幅", "涨跌额", "换手率"])'
            ).format(self.bucket, start_time, end_time, measurement, symbol)

            # Read the whole query result into a DataFrame (a chunked/paged read to reduce memory pressure was intended but is not enabled here)
            tables = query_api.query_data_frame(query)

            # query_data_frame() may return a list of DataFrames; if so, they
            # could be concatenated into a single DataFrame:
            # if isinstance(tables, list):
            #     result_df = pd.concat(tables, ignore_index=True)
            # else:
            result_df = tables

            return result_df
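
# Sketch, untested: if the query path also contributes to the memory pressure,
# QueryApi exposes query_data_frame_stream(), which yields the result as a
# stream of smaller DataFrames instead of one large frame.
# (iter_query_chunks is just an illustrative name, not part of the class above.)
def iter_query_chunks(query_api, query):
    # Yield DataFrame chunks one at a time so each can be garbage collected.
    for chunk in query_api.query_data_frame_stream(query):
        yield chunk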




stockSeed = StockDataseed(
    db_url=db_url,
    db_org=db_org,
    bucket=bucket,
    token=token,
)



import akshare as ak
import time
import pandas as pd
from datetime import datetime



# --- 1. Fetch the index constituents ---
print(f"Fetching the constituent list for index {INDEX_SYMBOL}...")
try:
    # Constituent data provided by the CSI index site
    cons_df = index_stock_cons_df_11_16  # constituents DataFrame obtained beforehand
    # The returned DataFrame is assumed to contain a stock-code column, usually named '品种代码' or similar
    # Note: the column names returned by akshare can change; adjust them to match the actual output
    print(cons_df.head())  # print the first few rows to confirm the column names
    stock_symbols = cons_df['品种代码'].tolist()  # change '品种代码' to match the actual column name
    print(f"Got {len(stock_symbols)} constituent stocks.")
except Exception as e:
    print(f"Failed to fetch the constituent list: {e}")
    stock_symbols = []  # if fetching fails, the loop below simply does not run

# --- 2. Loop over the constituents and fetch their historical data ---


for i, symbol in enumerate(stock_symbols):
    print(f"({i+1}/{len(stock_symbols)}) 正在获取股票 {symbol} 的历史数据...")
    try:
        # Fetch historical quotes for Shanghai/Shenzhen A shares
        # Note: akshare's interface parameters may change between versions; check the latest docs
        # stock_zh_a_hist is used here, with the adjust parameter selecting the price adjustment
        stock_data = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=START_DATE, end_date=END_DATE, adjust=ADJUST_TYPE)
        stockSeed.save_stock_to_database(
            symbol=symbol,
            dataframe=stock_data,
            fuquan=ADJUST_TYPE,
        )
        
        if not stock_data.empty:
            print(f"  -> Got {len(stock_data)} rows.")
        else:
            print(f"  -> Stock {symbol} returned no data.")

    except Exception as e:
        print(f"  -> Error while fetching data for stock {symbol}: {e}")

    # --- 3. Throttle the request rate ---
    # Pause after every iteration except the last one
    if i < len(stock_symbols) - 1:  # no need to wait after the final iteration
        print(f"  -> Waiting {SLEEP_INTERVAL} seconds...")
        time.sleep(SLEEP_INTERVAL)

print("所有成分股数据获取完成。")




Although I use a 'with' statement to close the client and avoid the memory leak, it doesn't seem to work.
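
In case it is useful, here is a minimal sketch of how the growth could be narrowed down with tracemalloc (the slice of five symbols is arbitrary, and tracemalloc only sees Python-level allocations, not native buffers):

import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# A handful of iterations of the same write path as above (arbitrary slice).
for symbol in stock_symbols[:5]:
    stock_data = ak.stock_zh_a_hist(
        symbol=symbol, period="daily",
        start_date=START_DATE, end_date=END_DATE, adjust=ADJUST_TYPE,
    )
    stockSeed.save_stock_to_database(symbol=symbol, dataframe=stock_data, fuquan=ADJUST_TYPE)

after = tracemalloc.take_snapshot()

# Top allocation sites that grew across the five writes.
for stat in after.compare_to(before, "lineno")[:10]:
    print(stat)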

Do you have any advice?

Thanks