title: LineSeries 时间序列 API description: Backtrader LineSeries 完整 API 参考文档


LineSeries 时间序列 API

LineSeries 是 Backtrader 中管理多线时间序列数据的核心类。它为数据源、指标、观察器等提供统一的时间序列数据访问接口,支持历史数据访问、切片操作、pandas 转换等功能。

类层次结构

LineRoot (所有线对象的基类)
    LineSingle (单线对象)
        LineBuffer (环形缓冲区实现)
    LineMultiple (多线对象)
        LineSeries (多线时间序列)
            Indicator (技术指标基类)
            DataSeries (数据源基类)
            Strategy (策略基类)

```bash

## 核心概念

### Line(线)

Line  Backtrader 中存储时间序列数据的基本单元。它使用环形缓冲区实现,具有以下特性:

- **索引 0 始终指向当前值**:最新的数据值总是在索引 0 位置
- **正索引访问历史数据**:`data[-1]` 获取前一个值,`data[-2]` 获取再前一个值
- **负索引访问未来数据**:`data[1]` 在重放或实时场景中可访问下一个值
- **自动内存管理**:通过 qbuffer 模式控制内存使用

### 时间索引模式

```bash
历史数据              当前            未来数据
    |                 |               |

    v                 v               v
  ...  [-3]  [-2]  [-1]   [0]   [1]   [2]  ...
                  前一根 K 线   当前 K 线

```bash

## LineSeries 类

### 类定义

```python
class backtrader.LineSeries(LineMultiple, LineSeriesMixin, ParamsMixin):
    """管理多条线的时间序列对象基类。"""

```bash

### 核心属性

| 属性 | 类型 | 描述 |

|------|------|------|

| `lines` | Lines | 线条容器对象,存储所有 LineBuffer |

| `plotinfo` | PlotInfoObj | 绘图配置对象 |

| `plotlines` | PlotLinesObj | 线条绘图配置 |

| `csv` | bool | 是否支持 CSV 导出 |

### 线条操作属性

| 属性 | 返回值 | 描述 |

|------|--------|------|

| `array` | array | 第一条线的底层数组 |

| `line` | LineBuffer | 第一条线(单线指标快捷访问) |

| `l` | Lines | lines 的别名 |

## 时间序列操作

### 数据访问

```python
class MyStrategy(bt.Strategy):
    def next(self):

# 当前值(索引 0)
        current_close = self.data.close[0]
        current_sma = self.sma[0]

# 历史值(负索引)
        prev_close = self.data.close[-1]    # 前一根 K 线
        prev_close_2 = self.data.close[-2]  # 前两根 K 线
        prev_close_5 = self.data.close[-5]  # 前五根 K 线

# 未来值(正索引,仅在重放/实时场景有效)

# next_close = self.data.close[1]

```bash

### 数据长度

```python
def next(self):

# 获取数据长度(已加载的 K 线数量)
    current_bar = len(self.data)
    total_bars = len(self)

# 检查是否有足够的历史数据
    if len(self.data) >= 20:

# 可以计算 20 周期指标
        pass

```bash

### 时间操作

```python
def next(self):

# 获取当前 K 线时间(多种方式)
    dt_num = self.data.datetime[0]              # 内部数字格式
    dt = self.data.datetime.datetime(0)         # datetime 对象
    dt_date = self.data.datetime.date(0)        # date 对象
    dt_time = self.data.datetime.time(0)        # time 对象

# 前一根 K 线时间
    prev_dt = self.data.datetime.datetime(-1)

```bash

## 数据访问模式表

| 表达式 | 含义 | 使用场景 |

|--------|------|----------|

| `data[0]` | 当前 K 线值 | 获取最新数据 |

| `data[-1]` | 前一根 K 线值 | 获取上一个历史值 |

| `data[-n]` |  n  K 线值 | 获取 n 个周期前的值 |

| `data[1]` | 下一根 K 线值 | 重放/实时场景中的未来值 |

| `len(data)` | 数据长度 | 获取已加载 K 线数 |

| `data.array` | 底层数组 | 直接访问完整数据 |

## 切片和索引

### get() 方法

获取指定位置和大小的一片数据:

```python
def next(self):

# 获取最近 3 根 K 线的收盘价

# 返回: [close[-2], close[-1], close[0]]
    recent_3 = self.data.close.get(ago=0, size=3)

# 获取从当前往前的 5 根 K 线
    last_5 = self.data.close.get(ago=-4, size=5)

# 使用方式
    avg_price = sum(recent_3) / len(recent_3)

```bash

### 切片操作

```python
def next(self):

# 获取数组切片(基于内部数组索引)

# 注意:这是直接操作底层数组,需要理解内部结构
    array_data = self.data.close.array

# 常用模式:获取最近 N 个值
    recent_values = array_data[-self.p.period:]

```bash

## 对齐和同步

### 多数据源同步

当使用多个数据源时,Backtrader 会自动对齐它们:

```python
cerebro = bt.Cerebro()

# 添加多个数据源

cerebro.adddata(daily_data, name='daily')
cerebro.adddata(weekly_data, name='weekly')

class MyStrategy(bt.Strategy):
    def next(self):

# 数据源自动按日期对齐

# 当周线有新 K 线时,两个数据源的 next() 都会被调用
        daily_len = len(self.data0)  # 日线数据长度
        weekly_len = len(self.data1)  # 周线数据长度

# 访问不同数据源
        if self.data0.close[0] > self.data1.close[0]:
            self.buy(data=self.data0)

```bash

### 数据源访问方式

```python
class MyStrategy(bt.Strategy):
    def __init__(self):

# 方式 1:通过索引访问
        self.data0 = self.datas[0]
        self.data1 = self.datas[1]

# 方式 2:通过名称访问(需要先设置 name)
        self.daily = self.getdatabyname('daily')
        self.weekly = self.getdatabyname('weekly')

```bash

## 周期和时间框架处理

### TimeFrame 常量

```python

# TimeFrame 定义

TimeFrame.Ticks        # 1 - Tick 数据

TimeFrame.MicroSeconds # 2 - 微秒

TimeFrame.Seconds      # 3 - 秒

TimeFrame.Minutes      # 4 - 分钟

TimeFrame.Days         # 5 - 天

TimeFrame.Weeks        # 6 - 周

TimeFrame.Months       # 7 - 月

TimeFrame.Years        # 8 - 年

TimeFrame.NoTimeFrame  # 9 - 无时间周期

```bash

### 获取数据源时间框架

```python
class MyStrategy(bt.Strategy):
    def next(self):

# 获取时间框架类型
        tf = self.data._timeframe
        comp = self.data._compression

# 判断数据类型
        if tf == bt.TimeFrame.Days:
            if comp == 1:
                print("日线数据")
            elif comp == 7:
                print("周线数据(7 天压缩)")

```bash

### TimeFrame 方法

```python

# 获取时间框架名称

name = bt.TimeFrame.getname(bt.TimeFrame.Days)

# 返回: 'Day'

name = bt.TimeFrame.getname(bt.TimeFrame.Minutes, 5)

# 返回: 'Minutes'

# 从常量获取名称

name = bt.TimeFrame.TName(bt.TimeFrame.Days)

# 返回: 'Days'

# 从名称获取常量

tf = bt.TimeFrame.TFrame('Days')

# 返回: TimeFrame.Days (5)

```bash

## 与 pandas 的关系

### 转换为 pandas Series

```python
import backtrader as bt
import pandas as pd

class MyStrategy(bt.Strategy):
    def stop(self):

# 获取完整数据数组
        close_array = self.data.close.array

# 创建 pandas Series

# 注意:需要手动构建日期索引
        dates = []
        for i in range(len(self.data)):
            dt = self.data.datetime.date(i)
            dates.append(dt)

        df = pd.DataFrame({
            'close': close_array[:len(dates)],
        }, index=dates)
        df.index.name = 'date'

```bash

### 从 pandas 创建数据源

```python
import pandas as pd
import backtrader as bt

# 创建 DataFrame

df = pd.DataFrame({
    'datetime': pd.date_range('2020-01-01', periods=100),
    'open': np.random.randn(100).cumsum() + 100,
    'high': np.random.randn(100).cumsum() + 102,
    'low': np.random.randn(100).cumsum() + 98,
    'close': np.random.randn(100).cumsum() + 100,
    'volume': np.random.randint(1000, 10000, 100),
})

# 设置索引

df.set_index('datetime', inplace=True)

# 创建数据源

data = bt.feeds.PandasData(dataname=df)

```bash

### PandasData 参数映射

```python
class CustomPandasData(bt.feeds.PandasData):

# 自定义列名映射
    lines = ('close', 'volume', 'custom_line')

# 参数映射
    params = (
        ('datetime', None),      # None = 使用索引
        ('open', 'open_price'),
        ('high', 'high_price'),
        ('low', 'low_price'),
        ('close', 'close_price'),
        ('volume', 'vol'),
        ('openinterest', None),  # None = 列不存在
    )

```bash

## 常见使用模式

### 模式 1:访问历史数据计算指标

```python
class CustomIndicator(bt.Indicator):
    lines = ('value',)
    params = (('period', 20),)

    def __init__(self):
        super().__init__()
        self.addminperiod(self.p.period)

    def next(self):

# 计算最近 N 个周期的平均值
        total = 0.0
        for i in range(self.p.period):
            total += self.data.close[-i]

        self.lines.value[0] = total / self.p.period

```bash

### 模式 2:比较当前和之前值

```python
def next(self):

# 检查收盘价是否连续上涨
    if (self.data.close[0] > self.data.close[-1] and
        self.data.close[-1] > self.data.close[-2] and
        self.data.close[-2] > self.data.close[-3]):

# 连续 3 根 K 线上涨
        self.buy()

```bash

### 模式 3:条件访问避免越界

```python
def next(self):

# 确保有足够的历史数据
    if len(self.data) < self.p.period:
        return

# 安全访问历史数据
    prev_close = self.data.close[-self.p.period]

# 或者使用 minperiod
    if len(self.data) > self.p.period:

# 此处数据足够
        pass

```bash

### 模式 4:获取完整历史数据

```python
def next(self):

# 方式 1:使用 array 属性
    all_closes = self.data.close.array

# 方式 2:循环获取
    closes = []
    for i in range(len(self.data)):
        closes.append(self.data.close[i - len(self.data) + 1])

# 方式 3:使用 getzero
    all_data = self.data.close.getzero(0, len(self.data))

```bash

### 模式 5:多线指标访问

```python
class BollingerBands(bt.Indicator):
    lines = ('mid', 'top', 'bot')
    params = (('period', 20), ('devfactor', 2.0))

    def next(self):

# 访问不同输出线
        mid = self.lines.mid[0]
        top = self.lines.top[0]
        bot = self.lines.bot[0]

# 或通过名称访问
        mid = self.mid[0]
        top = self.top[0]
        bot = self.bot[0]

```bash

## LineSeries 方法

### 长度操作

#### `len(self)`

返回 LineSeries 的长度(已处理的数据点数量)。

```python
current_length = len(self.indicator)

```bash

#### `size(self)`

返回线条数量(不包括额外线条)。

```python
num_lines = self.indicator.size()

```bash

### 索引操作

#### `__getitem__(self, key)`

获取主线条的值。

```python
value = self.indicator[0]      # 当前值

value = self.indicator[-1]     # 前一个值

```bash

#### `__call__(self, ago=None, line=-1)`

返回延迟的线或指定索引/名称的值。

```python

# 返回当前线

current = self.indicator()

# 返回延迟 3 个周期的线

delayed = self.indicator(ago=3)

# 返回指定线的当前值

value = self.indicator(line='close')

```bash

### 缓冲区操作

#### `qbuffer(self, savemem=0)`

启用队列缓冲模式以节省内存。

```python

# 仅保留最近的数据

self.data.qbuffer(savemem=1000)

# 对于指标

self.sma.qbuffer()

```bash

#### `minbuffer(self, size)`

设置最小缓冲区大小。

```python

# 确保至少有 100 个数据点

self.indicator.minbuffer(100)

```bash

### 导航操作

#### `home(self)`

将所有线条重置到起始位置。

```python
self.indicator.home()

```bash

#### `rewind(self, size=1)`

回退指定数量的位置。

```python
self.indicator.rewind(5)  # 回退 5 个位置

```bash

#### `advance(self, size=1)`

前进指定数量的位置。

```python
self.indicator.advance(1)  # 前进 1 个位置

```bash

#### `forward(self, value=0.0, size=1)`

前进所有线条并填充值。

```python
self.indicator.forward(size=1)

```bash

#### `backwards(self, size=1, force=False)`

向后移动所有线条。

```python
self.indicator.backwards(size=1)

```bash

#### `reset(self)`

重置所有线条到初始状态。

```python
self.indicator.reset()

```bash

#### `extend(self, value=0.0, size=0)`

扩展所有线条。

```python
self.indicator.extend(size=10)

```bash

### 线条操作

#### `_getline(self, line, minusall=False)`

通过名称或索引获取线条。

```python

# 通过索引

line = self.indicator._getline(0)

# 通过名称

line = self.indicator._getline('close')

# 使用 minusall 参数

line = self.indicator._getline(-1, minusall=True)  # 最后一条线

```bash

## 性能优化

### 使用 array 属性

对于需要访问全部数据的场景,直接使用 array 属性:

```python
def next(self):

# 快速访问全部数据
    data_array = self.data.close.array

# 使用 NumPy 操作(如果已导入)
    import numpy as np
    mean = np.mean(data_array[-20:])

```bash

### 启用缓存模式

对于长时间运行的回测:

```python

# 在 Cerebro 中设置

cerebro = bt.Cerebro()

# 对数据源启用缓存

data = bt.feeds.PandasData(dataname=df)
cerebro.adddata(data)
data.qbuffer(savemem=1000)  # 仅保留最近 1000 根 K 线

```bash

### 使用 runonce 模式

```python

# 批处理模式,更快

cerebro.run(runonce=True)

```bash

## 完整示例

### 示例 1:自定义多线指标

```python
import backtrader as bt

class MultiOutputIndicator(bt.Indicator):
    """
    自定义指标:价格通道
    返回三条线:上轨、中轨、下轨
    """
    lines = ('upper', 'middle', 'lower')
    params = (('period', 20),)

    plotinfo = dict(
        subplot=False,  # 在主图上绘制
    )

    def __init__(self):
        super().__init__()
        self.addminperiod(self.p.period)

    def next(self):

# 计算中轨(SMA)
        total = 0.0
        high_max = self.data.high[-self.p.period]
        low_min = self.data.low[-self.p.period]

        for i in range(self.p.period):
            price = (self.data.high[-i] + self.data.low[-i] + self.data.close[-i]) / 3
            total += price
            if self.data.high[-i] > high_max:
                high_max = self.data.high[-i]
            if self.data.low[-i] < low_min:
                low_min = self.data.low[-i]

        self.lines.middle[0] = total / self.p.period
        self.lines.upper[0] = high_max
        self.lines.lower[0] = low_min


class MyStrategy(bt.Strategy):
    def __init__(self):

# 创建自定义指标
        self.channel = MultiOutputIndicator(self.data, period=20)

# 内置指标
        self.sma = bt.indicators.SMA(self.data.close, period=20)

    def next(self):

# 访问自定义指标的多条线
        if self.data.close[0] > self.channel.upper[0]:

# 价格突破上轨
            self.buy()
        elif self.data.close[0] < self.channel.lower[0]:

# 价格跌破下轨
            self.sell()

```bash

### 示例 2:历史数据分析

```python
class AnalysisStrategy(bt.Strategy):
    def __init__(self):
        self.sma20 = bt.indicators.SMA(self.data.close, period=20)
        self.sma50 = bt.indicators.SMA(self.data.close, period=50)

    def next(self):

# 获取最近 N 个值的列表
        recent_20 = self.data.close.get(ago=0, size=20)

# 计算自定义统计
        avg = sum(recent_20) / len(recent_20)

# 检查趋势
        if self.sma20[0] > self.sma20[-1] > self.sma20[-2]:

# 均线上升
            pass

# 时间相关分析
        current_dt = self.data.datetime.datetime(0)
        if current_dt.hour >= 9 and current_dt.hour < 15:

# 交易时段
            pass

```bash

### 示例 3:多时间框架分析

```python
class MultiTimeFrameStrategy(bt.Strategy):
    def __init__(self):

# 获取不同时间框架的数据
        self.daily = self.data0  # 日线
        self.weekly = self.data1  # 周线

# 为每个时间框架创建指标
        self.daily_sma = bt.indicators.SMA(self.daily.close, period=20)
        self.weekly_sma = bt.indicators.SMA(self.weekly.close, period=20)

    def next(self):

# 检查多时间框架对齐
        if len(self.weekly) > len(self.weekly_sma):

# 周线指标有效
            if self.daily.close[0] > self.daily_sma[0]:
                if self.weekly.close[0] > self.weekly_sma[0]:

# 双时间框架趋势一致
                    self.buy(data=self.daily)

```bash

## 常见陷阱

1. **索引越界**:在访问历史数据前检查 `len(data)`
2. **最小周期**:使用 `addminperiod()` 确保指标有足够数据
3. **未来数据泄露**:避免在回测中使用 `data[1]` 等未来数据
4. **数据对齐**:多数据源时注意数据可能不对齐
5. **时间处理**:datetime 是内部数字格式,需要转换

## 相关文档

- [Data Feeds API](data-feeds_zh.md) - 数据源配置
- [Indicator API](indicator_zh.md) - 指标开发
- [Strategy API](strategy_zh.md) - 策略开发
- [Cerebro API](cerebro_zh.md) - 回测引擎