title: 数据源 API description: Backtrader 完整数据源 API 参考
数据源 API¶
数据源是 Backtrader 中回测和实盘交易的行情数据来源。它们提供带有时间索引的 OHLCV(开盘、最高、最低、收盘、成交量、持仓量)数据。
类层次结构¶
AbstractDataBase (所有数据源的基类)
DataBase (功能完整的数据源)
CSVDataBase (CSV 文件解析)
GenericCSVData
YahooFinanceCSVData
BacktraderCSVData
PandasData (DataFrame 集成)
CCXTFeed (加密货币交易所)
... 以及更多
```bash
## 核心类
### `backtrader.AbstractDataBase`
所有数据源实现的基础类。
```python
class backtrader.AbstractDataBase:
"""所有数据源实现的基础类。"""
```bash
#### 参数
| 参数 | 类型 | 默认值 | 描述 |
|-----|------|--------|------|
| `dataname` | Any | None | 数据源标识符(文件名、URL、DataFrame 等) |
| `name` | str | "" | 数据源的显示名称 |
| `compression` | int | 1 | 时间周期压缩因子 |
| `timeframe` | TimeFrame | TimeFrame.Days | 时间周期类型 |
| `fromdate` | datetime | None | 数据过滤的起始日期 |
| `todate` | datetime | None | 数据过滤的结束日期 |
| `sessionstart` | time | time.min | 交易时段开始时间 |
| `sessionend` | time | time(23, 59, 59, 999990) | 交易时段结束时间 |
| `tz` | str | None | 输出时区 |
| `tzinput` | str | None | 输入时区 |
| `calendar` | str/Calendar | None | 使用的交易日历 |
#### 数据状态
| 状态 | 描述 |
|-----|------|
| `CONNECTED` | 数据源已连接 |
| `DISCONNECTED` | 数据源已断开 |
| `CONNBROKEN` | 连接中断 |
| `DELAYED` | 数据延迟 |
| `LIVE` | 实时数据流 |
| `NOTSUBSCRIBED` | 未订阅数据 |
| `NOTSUPPORTED_TF` | 不支持的时间周期 |
| `UNKNOWN` | 未知状态 |
### `backtrader.DataBase`
功能完整的数据源类。继承 `AbstractDataBase` 的所有功能。
```python
class backtrader.DataBase(backtrader.AbstractDataBase):
"""功能完整的数据源类。"""
```bash
## Line 系统
数据源使用 "Line" 系统进行时间序列数据访问。每个数据源提供以下 line:
### 标准 Line
| Line | 描述 |
|------|------|
| `datetime` | K 线时间戳 |
| `open` | 开盘价 |
| `high` | 最高价 |
| `low` | 最低价 |
| `close` | 收盘价 |
| `volume` | 成交量 |
| `openinterest` | 持仓量(用于衍生品) |
### 访问 Line 数据
```python
class MyStrategy(bt.Strategy):
def next(self):
# 当前 K 线值(索引 0)
current_close = self.data.close[0]
current_datetime = self.data.datetime.datetime(0)
# 前一根 K 线值(负索引)
prev_close = self.data.close[-1]
prev_high = self.data.high[-2]
# 数据长度
current_len = len(self.data)
```bash
### 数据索引
| 索引 | 含义 |
|-----|------|
| `0` | 当前 K 线(最新) |
| `-1` | 前一根 K 线 |
| `-2`, `-3`, ... | 历史 K 线 |
| `1` | 下一根 K 线(仅在重放/实盘场景中) |
## TimeFrame(时间周期)
`TimeFrame` 类定义了金融数据的时间周期。
### TimeFrame 常量
| 常量 | 值 | 描述 |
|-----|-----|------|
| `TimeFrame.Ticks` | 1 | Tick 级别数据 |
| `TimeFrame.MicroSeconds` | 2 | 微秒 |
| `TimeFrame.Seconds` | 3 | 秒 |
| `TimeFrame.Minutes` | 4 | 分钟 |
| `TimeFrame.Days` | 5 | 天 |
| `TimeFrame.Weeks` | 6 | 周 |
| `TimeFrame.Months` | 7 | 月 |
| `TimeFrame.Years` | 8 | 年 |
| `TimeFrame.NoTimeFrame` | 9 | 无时间周期 |
### TimeFrame 方法
```python
# 获取时间周期名称
name = bt.TimeFrame.getname(bt.TimeFrame.Days) # 返回 'Day'
name = bt.TimeFrame.getname(bt.TimeFrame.Minutes, 5) # 返回 'Minutes'
# 从名称获取常量
tf = bt.TimeFrame.TFrame('Days') # 返回 TimeFrame.Days
# 从常量获取名称
name = bt.TimeFrame.TName(bt.TimeFrame.Days) # 返回 'Days'
```bash
## 内置数据源
### CSV 数据源
#### GenericCSVData
解析可配置列映射的 CSV 文件。
```python
data = bt.feeds.GenericCSVData(
dataname='data.csv',
datetime=0, # 时间戳列索引
time=-1, # 时间列索引(无则为-1)
open=1, # 开盘价列索引
high=2, # 最高价列索引
low=3, # 最低价列索引
close=4, # 收盘价列索引
volume=5, # 成交量列索引
openinterest=6, # 持仓量列索引
dtformat='%Y-%m-%d %H:%M:%S', # 时间格式
tmformat='%H:%M:%S', # 时间格式
nullvalue=float('NaN'), # 缺失字段值
separator=',', # CSV 分隔符
headers=True, # 如果为 True 则跳过首行
)
```bash
- *CSV 参数:**
| 参数 | 类型 | 默认值 | 描述 |
|-----|------|--------|------|
| `dataname` | str/file | 必填 | CSV 文件名或类文件对象 |
| `datetime` | int | 0 | 时间戳列索引 |
| `time` | int | -1 | 时间列索引(无则为-1) |
| `open` | int | 1 | 开盘价列索引 |
| `high` | int | 2 | 最高价列索引 |
| `low` | int | 3 | 最低价列索引 |
| `close` | int | 4 | 收盘价列索引 |
| `volume` | int | 5 | 成交量列索引 |
| `openinterest` | int | 6 | 持仓量列索引 |
| `dtformat` | str/int/callable | "%Y-%m-%d %H:%M:%S" | 时间格式或 1/2 表示 Unix 时间戳 |
| `tmformat` | str | "%H:%M:%S" | 时间格式 |
| `nullvalue` | float | NaN | 缺失字段的填充值 |
| `separator` | str | "," | CSV 分隔符 |
| `headers` | bool | True | 是否跳过首行 |
- *dtformat 值:**
| 值 | 含义 |
|-----|------|
| `"%Y-%m-%d"` | strptime 使用的字符串格式 |
| `1` | Unix 时间戳(int,自纪元以来的秒数) |
| `2` | Unix 时间戳(float,自纪元以来的秒数) |
| `callable` | 将字符串转换为 datetime 的函数 |
#### YahooFinanceCSVData
解析 Yahoo Finance 格式的 CSV 文件。
```python
data = bt.feeds.YahooFinanceCSVData(
dataname='yahoo.csv',
reverse=False, # 数据按时间顺序排列
adjclose=True, # 使用复权收盘价
adjvolume=True, # 使用复权时调整成交量
round=True, # 对价格进行四舍五入
decimals=2, # 四舍五入的小数位数
)
```bash
- *Yahoo CSV 参数:**
| 参数 | 类型 | 默认值 | 描述 |
|-----|------|--------|------|
| `reverse` | bool | False | 如果数据按逆时间顺序排列则设为 True |
| `adjclose` | bool | True | 使用分红/拆股调整后的收盘价 |
| `adjvolume` | bool | True | 根据调整因子调整成交量 |
| `round` | bool | True | 将价格四舍五入 |
| `decimals` | int | 2 | 四舍五入的小数位数 |
| `roundvolume` | int/bool | False | 将成交量四舍五入到 N 位小数 |
#### BacktraderCSVData
解析 backtrader 测试 CSV 格式。
```python
data = bt.feeds.BacktraderCSVData(dataname='test.csv')
```bash
格式:`YYYY-MM-DD [HH:MM:SS] open high low close volume openinterest`
### Pandas 数据源
#### PandasData
使用 pandas DataFrame 作为数据源,通过列名匹配。
```python
import pandas as pd
# 创建标准列名的 DataFrame
df = pd.DataFrame({
'datetime': pd.date_range('2020-01-01', periods=100),
'open': np.random.randn(100).cumsum() + 100,
'high': np.random.randn(100).cumsum() + 102,
'low': np.random.randn(100).cumsum() + 98,
'close': np.random.randn(100).cumsum() + 100,
'volume': np.random.randint(1000, 10000, 100),
})
# 将 DataFrame 作为数据源
data = bt.feeds.PandasData(dataname=df)
# 或使用自定义列名
data = bt.feeds.PandasData(
dataname=df,
datetime=None, # None 表示使用索引
open='open_price',
high='high_price',
low='low_price',
close='close_price',
volume='vol',
openinterest=None, # None 表示列不存在
nocase=True, # 不区分大小写的列匹配
)
```bash
- *PandasData 参数:**
| 参数 | 类型 | 默认值 | 描述 |
|-----|------|--------|------|
| `dataname` | DataFrame | 必填 | Pandas DataFrame |
| `datetime` | None/int/str | None | datetime 列(None = 使用索引) |
| `open` | None/-1/int/str | -1 | 开盘价列(-1 = 自动检测) |
| `high` | None/-1/int/str | -1 | 最高价列 |
| `low` | None/-1/int/str | -1 | 最低价列 |
| `close` | None/-1/int/str | -1 | 收盘价列 |
| `volume` | None/-1/int/str | -1 | 成交量列 |
| `openinterest` | None/-1/int/str | -1 | 持仓量列 |
| `nocase` | bool | True | 不区分大小写的列匹配 |
- *列值含义:**
| 值 | 含义 |
|-----|------|
| `None` | DataFrame 中不存在该列 |
| `-1` | 按名称自动检测列 |
| `0, 1, 2...` | 数字列索引 |
| `"column_name"` | 字符串列名 |
#### PandasDirectData
直接使用 DataFrame 元组进行更快迭代。
```python
data = bt.feeds.PandasDirectData(
dataname=df, # 带列索引位置的 DataFrame
datetime=0,
open=1,
high=2,
low=3,
close=4,
volume=5,
openinterest=6,
)
```bash
### 实时/在线数据源
#### CCXTFeed(加密货币)
通过 CCXT 库连接加密货币交易所。
```python
# REST API 轮询
data = bt.feeds.CCXTFeed(
exchange='binance',
symbol='BTC/USDT',
timeframe=bt.TimeFrame.Minutes,
compression=1,
historical=False, # 仅下载历史数据后停止
backfill_start=True, # 启动时回填历史数据
ohlcv_limit=100, # 每次 REST 请求的 K 线数
drop_newest=False, # 删除最新(可能不完整)的 K 线
)
# 使用 WebSocket(需要 ccxt.pro)
data = bt.feeds.CCXTFeed(
exchange='binance',
symbol='BTC/USDT',
timeframe=bt.TimeFrame.Minutes,
use_websocket=True, # 启用 WebSocket
ws_reconnect_delay=5.0, # 重连延迟(秒)
ws_max_reconnect_delay=60.0, # 最大重连延迟
)
```bash
- *CCXTFeed 参数:**
| 参数 | 类型 | 默认值 | 描述 |
|-----|------|--------|------|
| `exchange` | str | 必填 | 交易所名称(binance、kraken 等) |
| `symbol` | str | 必填 | 交易对(BTC/USDT、ETH/USD 等) |
| `historical` | bool | False | 下载历史数据后停止 |
| `backfill_start` | bool | True | 启动时回填历史数据 |
| `ohlcv_limit` | int | 100 | 每次 REST 请求的最大 K 线数 |
| `drop_newest` | bool | False | 删除最新(可能不完整)的 K 线 |
| `use_websocket` | bool | False | 使用 WebSocket 获取实时数据 |
| `ws_reconnect_delay` | float | 5.0 | WebSocket 重连延迟 |
| `ws_max_reconnect_delay` | float | 60.0 | 最大重连延迟 |
#### YahooFinanceData
直接从 Yahoo Finance 下载数据(需要 `requests` 库)。
```python
data = bt.feeds.YahooFinanceData(
dataname='AAPL', # 股票代码
fromdate=datetime(2020, 1, 1),
todate=datetime(2023, 12, 31),
timeframe=bt.TimeFrame.Days,
period='d', # 'd'=日, 'w'=周, 'm'=月
adjclose=True, # 使用复权价格
proxies={}, # 代理配置
retries=3, # 下载重试次数
)
```bash
### 其他数据源
#### Quandl
```python
data = bt.feeds.Quandl(
dataname='WIKI/AAPL',
apikey='YOUR_API_KEY',
fromdate=datetime(2020, 1, 1),
)
```bash
#### Interactive Brokers
```python
data = bt.feeds.IBData(
dataname='AAPL',
host='127.0.0.1',
port=7496,
clientId=1,
)
```bash
#### OANDA
```python
data = bt.feeds.OandaData(
dataname='EUR_USD',
account='YOUR_ACCOUNT',
access_token='YOUR_TOKEN',
)
```bash
## 数据源方法
### 生命周期方法
#### `start(self)`
回测开始时调用。打开文件、连接数据源。
```python
# 在自定义数据源中重写
class MyFeed(bt.CSVDataBase):
def start(self):
super().start()
# 自定义初始化
```bash
#### `stop(self)`
回测结束时调用。关闭文件、断开连接。
```python
def stop(self):
# 自定义清理
super().stop()
```bash
#### `preload(self)`
在回测前将所有数据加载到内存。
```python
# 使用 cerebro.run(preload=True) 自动预加载
data = bt.feeds.PandasData(dataname=df)
data.preload() # 手动预加载
```bash
### 数据访问方法
#### `date2num(self, dt)`
将 datetime 转换为内部数字格式。
```python
dt_num = data.date2num(datetime(2023, 1, 1))
```bash
#### `num2date(self, dt=None, tz=None, naive=True)`
将内部数字格式转换为 datetime。
```python
dt = data.num2date() # 当前 K 线的 datetime
dt = data.num2date(data.lines.datetime[-1]) # 前一根 K 线
```bash
### 克隆方法
#### `clone(self, **kwargs)`
创建此数据源的克隆。
```python
data_clone = data.clone() # 完全复制
data_clone = data.clone(timeframe=bt.TimeFrame.Weeks) # 不同时间周期
```bash
#### `copyas(self, _dataname, **kwargs)`
以不同名称复制。
```python
data_copy = data.copyas('AAPL_Copy')
```bash
### 状态方法
#### `islive(self)`
如果这是实时数据源则返回 True。
```python
if data.islive():
print("这是实时数据源")
```bash
#### `haslivedata(self)`
如果实时数据可用则返回 True。
#### `get_notifications(self)`
获取待处理的状态通知。
```python
notifs = data.get_notifications()
for status, args, kwargs in notifs:
print(f"状态: {status}")
```bash
## 数据过滤器
### 添加过滤器
过滤器在数据加载时修改或删除 K 线。
```python
# 添加简单过滤器
data.addfilter(lambda x: x.close[0] > x.open[0]) # 仅保留阳线
# 添加过滤器类
data.addfilter(bt.filters.SessionData, session_end=time(15, 0))
```bash
### 内置过滤器
#### SessionFilter
按特定交易时段过滤 K 线。
```python
data.addfilter(bt.filters.SessionFilter)
```bash
#### SessionData
填充缺失的时段数据。
```python
data.addfilter(bt.filters.SessionData)
```bash
#### CalendarFilter
基于交易日历过滤。
```python
data.addfilter(bt.filters.CalendarFilter)
```bash
## 重采样和回放
### 重采样
将数据转换为不同时间周期。
```python
# 将分钟数据重采样为小时数据
data = bt.feeds.GenericCSVData(dataname='minute_data.csv')
data.resample(
timeframe=bt.TimeFrame.Minutes,
compression=60, # 60 分钟 = 1 小时
)
# 在 Cerebro 中
cerebro.resampledata(data, timeframe=bt.TimeFrame.Weeks, compression=1)
```bash
### 回放
精确控制地将 tick 数据处理为 K 线。
```python
data.replay(
timeframe=bt.TimeFrame.Minutes,
compression=5,
bar2edge=True, # 将 K 线对齐到时间周期边界
rightedge=True, # 使用右边缘作为时间戳
boundoff=0, # 边界偏移量
)
# 在 Cerebro 中
cerebro.replaydata(data, timeframe=bt.TimeFrame.Days)
```bash
## 自定义数据源
### 创建自定义 CSV 数据源
```python
import backtrader as bt
from datetime import datetime
class MyCSVData(bt.CSVDataBase):
"""自定义 CSV 解析器,用于我的数据格式。"""
params = (
('dtformat', '%d/%m/%Y'),
('separator', ';'),
)
def _loadline(self, linetokens):
# 解析 datetime
dt_str = linetokens[0]
dt = datetime.strptime(dt_str, self.p.dtformat)
# 转换为内部格式
self.lines.datetime[0] = self.date2num(dt)
# 解析 OHLCV
self.lines.open[0] = float(linetokens[1])
self.lines.high[0] = float(linetokens[2])
self.lines.low[0] = float(linetokens[3])
self.lines.close[0] = float(linetokens[4])
self.lines.volume[0] = float(linetokens[5])
self.lines.openinterest[0] = 0.0
return True
```bash
### 创建自定义实时数据源
```python
class MyLiveData(bt.DataBase):
"""自定义实时数据源。"""
params = (('api_url', '<https://api.example.com'),)>
def __init__(self):
super().__init__()
self.api_url = self.p.api_url
def start(self):
super().start()
# 连接到数据源
self.connected = True
self.put_notification(self.CONNECTED)
def stop(self):
# 断开连接
self.connected = False
super().stop()
def _load(self):
# 从 API 获取下一根 K 线
try:
import requests
response = requests.get(self.api_url)
bar_data = response.json()
# 解析并设置数据
dt = datetime.fromtimestamp(bar_data['timestamp'])
self.lines.datetime[0] = self.date2num(dt)
self.lines.open[0] = bar_data['open']
self.lines.high[0] = bar_data['high']
self.lines.low[0] = bar_data['low']
self.lines.close[0] = bar_data['close']
self.lines.volume[0] = bar_data['volume']
return True
except Exception:
self.put_notification(self.DISCONNECTED)
return False
def islive(self):
return True
```bash
### 创建带自定义 Line 的数据源
```python
class ExtendedData(bt.feeds.PandasData):
"""带有额外 line 的数据源。"""
# 添加自定义 lines
lines = ('adj_close', 'dividend',)
# 映射到 DataFrame 列
params = (
('adj_close', -1),
('dividend', -1),
)
```bash
## 使用多个数据源
### 添加多个数据源
```python
# 添加多个数据源
cerebro.adddata(bt.feeds.PandasData(dataname=df_aapl), name='AAPL')
cerebro.adddata(bt.feeds.PandasData(dataname=df_msft), name='MSFT')
cerebro.adddata(bt.feeds.PandasData(dataname=df_goog), name='GOOGL')
class MyStrategy(bt.Strategy):
def __init__(self):
self.aapl = self.getdatabyname('AAPL')
self.msft = self.getdatabyname('MSFT')
self.goog = self.getdatabyname('GOOGL')
def next(self):
# 访问不同的数据源
if self.aapl.close[0] > self.msft.close[0]:
self.buy(data=self.aapl)
```bash
### 数据源同步
```python
# 主从关系
data_daily = bt.feeds.PandasData(dataname=daily_df, name='daily')
data_hourly = bt.feeds.PandasData(dataname=hourly_df, name='hourly')
# 小时数据将同步到日边界
cerebro.adddata(data_daily)
cerebro.adddata(data_hourly)
```bash
## 性能优化
### 预加载
```python
# 对于使用历史数据的快速回测
cerebro.run(preload=True)
```bash
### 内存管理
```python
# 限制大数据集的内存使用
data = bt.feeds.PandasData(dataname=large_df)
cerebro.adddata(data)
data.qbuffer(savemem=1000) # 仅在内存中保留 1000 根 K 线
```bash
### 禁用缓存
```python
# 对小数据集禁用优化
cerebro.run preload=True, runonce=True, exactbars=False
```bash
## 完整示例
### 示例 1:CSV 回测
```python
import backtrader as bt
from datetime import datetime
class SmaCross(bt.Strategy):
def __init__(self):
self.sma_fast = bt.indicators.SMA(self.data.close, period=10)
self.sma_slow = bt.indicators.SMA(self.data.close, period=30)
self.crossover = bt.indicators.CrossOver(self.sma_fast, self.sma_slow)
def next(self):
if self.crossover > 0:
self.buy()
elif self.crossover < 0:
self.sell()
# 创建 cerebro
cerebro = bt.Cerebro()
# 添加 CSV 数据
data = bt.feeds.GenericCSVData(
dataname='stock_data.csv',
datetime=0,
time=-1,
open=1,
high=2,
low=3,
close=4,
volume=5,
dtformat='%Y-%m-%d',
fromdate=datetime(2020, 1, 1),
todate=datetime(2023, 12, 31),
)
cerebro.adddata(data)
# 添加策略
cerebro.addstrategy(SmaCross)
# 运行
results = cerebro.run()
```bash
### 示例 2:Pandas DataFrame
```python
import backtrader as bt
import pandas as pd
# 加载数据
df = pd.read_csv('data.csv', parse_dates=['date'], index_col='date')
# 创建数据源
data = bt.feeds.PandasData(
dataname=df,
datetime=None, # 使用索引
open='open',
high='high',
low='low',
close='close',
volume='volume',
)
# 创建 cerebro 并添加数据
cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.run()
```bash
### 示例 3:重采样
```python
import backtrader as bt
# 加载分钟数据
data = bt.feeds.GenericCSVData(dataname='minute_data.csv')
# 创建 cerebro
cerebro = bt.Cerebro()
# 添加重采样数据(小时)
cerebro.resampledata(
data,
timeframe=bt.TimeFrame.Minutes,
compression=60,
bar2edge=True,
rightedge=True,
)
# 添加原始数据以供比较
cerebro.adddata(data, name='minutes')
# 运行
cerebro.run()
```bash
### 示例 4:多数据源
```python
import backtrader as bt
class MultiAssetStrategy(bt.Strategy):
def __init__(self):
# 存储数据源
self.data1 = self.datas[0]
self.data2 = self.datas[1]
# 为每个数据源创建指标
self.sma1 = bt.indicators.SMA(self.data1.close, period=20)
self.sma2 = bt.indicators.SMA(self.data2.close, period=20)
def next(self):
# 基于两个数据源进行交易
if self.sma1[0] > self.sma2[0]:
if not self.getposition(self.data1):
self.buy(data=self.data1)
cerebro = bt.Cerebro()
# 添加多个数据源
cerebro.adddata(bt.feeds.PandasData(dataname=df1), name='Asset1')
cerebro.adddata(bt.feeds.PandasData(dataname=df2), name='Asset2')
cerebro.addstrategy(MultiAssetStrategy)
cerebro.run()
```bash
## 相关文档
- [策略 API](strategy_zh.md) - 策略开发
- [指标 API](indicator_zh.md) - 技术指标
- [Cerebro API](cerebro_zh.md) - 回测引擎