version: 1.8
I write to the database in a for loop, but it seems that the memory is not released after writing to the db.
Here is my code.
import pandas as pd
from influxdb_client import InfluxDBClient, WriteOptions
from datetime import datetime
class StockDataseed:
    """Persist and query stock K-line data in InfluxDB.

    Each public method opens a short-lived client inside a context
    manager so the HTTP connection and the write API's internal
    batching buffers are flushed and released as soon as the call ends.
    """

    def __init__(self, db_url: str, db_org: str, bucket: str, token: str, timeout: int = 30000):
        # Only connection settings are stored; no client object is kept
        # alive between calls.
        self.db_url = db_url
        self.db_org = db_org
        self.bucket = bucket
        self.token = token
        self.timeout = timeout  # request timeout in milliseconds

    def save_stock_to_database(
        self, symbol, dataframe, interval="daily", market="cn", fuquan=""
    ):
        """Write one symbol's K-line DataFrame to InfluxDB.

        Parameters
        ----------
        symbol : str
            Stock code; stored as an InfluxDB tag.
        dataframe : pandas.DataFrame
            Must contain a "日期" (date) column; it becomes the time index.
        interval, market, fuquan : str
            Combined into the measurement name ``{market}_{interval}_kline{fuquan}``.

        Raises
        ------
        RuntimeError
            If ``symbol`` or ``dataframe`` is missing.
        """
        if symbol is None or dataframe is None:
            raise RuntimeError("保存到数据库失败,请检查入参的完整性")
        # Context managers close both the client and the write API,
        # flushing any buffered points and freeing their memory.
        with InfluxDBClient(
            url=self.db_url,
            token=self.token,
            org=self.db_org,
            timeout=self.timeout,
            enable_gzip=True,  # compress transfers to save bandwidth and time
        ) as client:
            with client.write_api(
                write_options=WriteOptions(batch_size=5000, flush_interval=1000)
            ) as write_api:
                print("*** Write To InfluxDB ***")
                # Work on a copy so the caller's DataFrame is not mutated.
                df_k_line = dataframe.copy()
                df_k_line["日期"] = pd.to_datetime(df_k_line["日期"])
                df_k_line.set_index("日期", inplace=True)
                df_k_line["symbol"] = symbol
                measurement = f"{market}_{interval}_kline{fuquan}"
                write_api.write(
                    bucket=self.bucket,
                    # BUG FIX: was org="-", which contradicted the org the
                    # client was configured with; use the same organization.
                    org=self.db_org,
                    record=df_k_line,
                    data_frame_measurement_name=measurement,
                    data_frame_tag_columns=["symbol"],
                )
                # Drop the temporary copy eagerly so repeated calls in a
                # loop do not keep the previous frame alive until the next
                # copy is allocated (keeps peak RSS down).
                del df_k_line

    def query_stock_from_database(
        self,
        symbol: str,
        startDate: str,
        endDate: str,
        interval: str = "daily",
        market: str = "cn",
        fuquan: str = "",
    ):
        """Query one symbol's K-line rows between two dates (inclusive).

        ``startDate``/``endDate`` are ``YYYY-MM-DD`` strings; the range is
        expanded to cover the full start and end days in UTC.

        Returns the DataFrame produced by ``query_api.query_data_frame``.

        Raises
        ------
        ValueError
            If either date string does not match ``%Y-%m-%d``.
        """
        date_start_obj = datetime.strptime(startDate, "%Y-%m-%d")
        start_time = date_start_obj.strftime("%Y-%m-%dT00:00:00Z")
        date_end_obj = datetime.strptime(endDate, "%Y-%m-%d")
        end_time = date_end_obj.strftime("%Y-%m-%dT23:59:59.999999Z")
        measurement = f"{market}_{interval}_kline{fuquan}"
        # Context manager ensures the connection is released promptly.
        with InfluxDBClient(
            url=self.db_url,
            token=self.token,
            org=self.db_org,
            timeout=self.timeout,
            enable_gzip=True,
        ) as client:
            query_api = client.query_api()
            query = (
                'from(bucket:"{}")'
                " |> range(start: {}, stop: {})"
                ' |> filter(fn: (r) => r._measurement == "{}")'
                ' |> filter(fn: (r) => r.symbol == "{}")'
                '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                '|> keep(columns: ["_time", "股票代码", "开盘", "收盘", "最高", "最低", "成交量", "成交额", "振幅", "涨跌幅", "涨跌额", "换手率" ])'
            ).format(self.bucket, start_time, end_time, measurement, symbol)
            result_df = query_api.query_data_frame(query)
            return result_df
# Shared seeder instance used by the download loop below.
# NOTE(review): db_url / db_org / bucket / token are not defined in this
# snippet -- presumably set in an earlier cell; confirm they are in scope.
stockSeed = StockDataseed(
    db_url=db_url,
    db_org=db_org,
    bucket=bucket,
    token=token,
)
import akshare as ak
import time
import pandas as pd
from datetime import datetime
# --- 1. Fetch the index constituent stocks ---
print(f"正在获取指数 {INDEX_SYMBOL} 的成分股列表...")
try:
    # NOTE(review): index_stock_cons_df_11_16 looks like a pre-fetched
    # constituents DataFrame defined elsewhere -- confirm it is in scope.
    cons_df = index_stock_cons_df_11_16
    # akshare column names can change between versions; print the head so
    # the '品种代码' (symbol) column name can be verified against real output.
    print(cons_df.head())
    stock_symbols = cons_df['品种代码'].tolist()  # adjust the column name if needed
    print(f"获取到 {len(stock_symbols)} 只成分股。")
except Exception as e:
    print(f"获取成分股列表失败: {e}")
    stock_symbols = []  # empty list: the loop below simply does not run

# --- 2. Iterate over constituents and fetch historical data ---
for i, symbol in enumerate(stock_symbols):
    print(f"({i+1}/{len(stock_symbols)}) 正在获取股票 {symbol} 的历史数据...")
    try:
        # Daily A-share history; akshare interface parameters may change
        # across versions -- check the latest docs if this call fails.
        stock_data = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=START_DATE, end_date=END_DATE, adjust=ADJUST_TYPE)
        if not stock_data.empty:
            # Only persist non-empty frames; writing an empty frame is
            # wasted work and a needless DB round trip.
            stockSeed.save_stock_to_database(
                symbol=symbol,
                dataframe=stock_data,
                fuquan=ADJUST_TYPE,
            )
            print(f" -> 成功获取 {len(stock_data)} 条数据。")
        else:
            print(f" -> 股票 {symbol} 返回空数据。")
        # Release this symbol's frame before fetching the next one so the
        # loop's peak memory stays at roughly one symbol's worth of data.
        del stock_data
    except Exception as e:
        print(f" -> 获取股票 {symbol} 数据时出错: {e}")
    # --- 3. Throttle the request rate ---
    if i < len(stock_symbols) - 1:  # no need to wait after the last symbol
        print(f" -> 等待 {SLEEP_INTERVAL} 秒...")
        time.sleep(SLEEP_INTERVAL)

print("所有成分股数据获取完成。")
Although I use the `with` statement to close the db connection and avoid memory leaks, it doesn't seem to work.
Do you have any advice?
Thanks
