#!/usr/bin/env python
"""Strategy module - Base class for user-defined trading strategies.
This module provides the Strategy class which serves as the foundation for
all user-defined trading strategies in Backtrader. It handles order management,
position tracking, indicator integration, and the event-driven execution model.
Key Features:
- Order creation and management (buy, sell, close, cancel)
- Position tracking per data feed
- Integration with indicators and analyzers
- Event notifications (order, trade, data, timer)
- Support for multiple data feeds and timeframes
- Signal-based trading via SignalStrategy
Example:
Basic strategy implementation::
import backtrader as bt
class MyStrategy(bt.Strategy):
params = (('period', 20),)
def __init__(self):
self.sma = bt.indicators.SMA(period=self.p.period)
def next(self):
if self.data.close[0] > self.sma[0]:
self.buy()
elif self.data.close[0] < self.sma[0]:
self.sell()
Classes:
Strategy: Main base class for trading strategies.
SignalStrategy: Strategy subclass that responds to signal indicators.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import collections
import copy
import datetime
import itertools
import math
from typing import Optional
from .lineiterator import LineIterator, StrategyBase
from .lineroot import LineRoot, LineSingle
from .lineseries import LineSeriesStub
from .metabase import ItemCollection, OwnerContext, findowner
from .order import Order
from .position_modes import (
POSITION_MODE_DUAL_SIDE,
POSITION_OFFSET_CLOSE,
POSITION_SIDE_LONG,
POSITION_SIDE_SHORT,
normalize_position_mode,
normalize_position_side,
trade_key_from_order,
)
from .signal import (
SIGNAL_LONG,
SIGNAL_LONG_ANY,
SIGNAL_LONG_INV,
SIGNAL_LONGEXIT,
SIGNAL_LONGEXIT_ANY,
SIGNAL_LONGEXIT_INV,
SIGNAL_LONGSHORT,
SIGNAL_SHORT,
SIGNAL_SHORT_ANY,
SIGNAL_SHORT_INV,
SIGNAL_SHORTEXIT,
SIGNAL_SHORTEXIT_ANY,
SIGNAL_SHORTEXIT_INV,
)
from .sizers.fixedsize import FixedSize
from .trade import Trade
from .utils import AutoDictList, AutoOrderedDict
from .utils.log_message import SpdLogManager, get_logger
from .utils.py3 import MAXINT, filter, integer_types, iteritems, keys, map, string_types
logger = get_logger(__name__)
[docs]
class Strategy(StrategyBase):
"""Base class for user-defined trading strategies.
This class provides the core functionality for implementing trading
strategies including order management, position tracking, and event
handling. Users should subclass this to create custom strategies.
Attributes:
env: Reference to the Cerebro environment.
cerebro: Alias for env.
broker: Reference to the broker for order execution.
datas: List of data feeds available to the strategy.
data: Shortcut to the first data feed (datas[0]).
position: Current position for the main data feed.
stats: Collection of observer instances.
analyzers: Collection of analyzer instances.
Methods to Override:
__init__: Initialize indicators and strategy state.
start: Called when the strategy starts running.
prenext: Called before minimum period is reached.
nextstart: Called once when minimum period is first reached.
next: Main strategy logic, called on each bar.
stop: Called when the strategy stops running.
notify_order: Receive order status notifications.
notify_trade: Receive trade notifications.
notify_data: Receive data feed notifications.
notify_timer: Receive timer notifications.
Example:
class MyStrategy(bt.Strategy):
params = (('period', 20),)
def __init__(self):
self.sma = bt.indicators.SMA(period=self.p.period)
def next(self):
if not self.position:
if self.data.close[0] > self.sma[0]:
self.buy()
else:
if self.data.close[0] < self.sma[0]:
self.close()
"""
# Class-level storage for strategies
_indcol: dict = {}
@classmethod
def _create_strategy_safely(cls, *args, **kwargs):
"""Safely create a strategy instance with proper parameter filtering.
Separates __new__ (parameter setup) from __init__ (data/indicator setup)
to ensure parameters are fully processed before __init__ runs.
"""
# __new__ processes all kwargs into _params_instance
instance = cls.__new__(cls, *args, **kwargs)
# __init__ handles data setup, clock, and user subclass init.
# Pass original kwargs so __init__ can filter out param kwargs itself.
if instance is not None:
Strategy.__init__(instance, *args, **kwargs)
return instance
[docs]
def __new__(cls, *args, **kwargs):
"""Override __new__ to handle method renaming that was done in MetaStrategy"""
# CRITICAL: First call StrategyBase.__new__ to properly set up data arguments and lines
# This ensures strategies get their data arguments processed correctly
instance = super().__new__(cls, *args, **kwargs)
# Store the original kwargs for parameter processing
instance._strategy_init_kwargs = kwargs
# CRITICAL FIX: Manually set up parameters here since Strategy inherits from ParamsMixin
# But we need to ensure the kwargs from cerebro.addstrategy are properly processed
if hasattr(cls, "_params") and cls._params is not None:
params_cls = cls._params
param_names = set()
# Get all parameter names from the class
if hasattr(params_cls, "_getpairs"):
param_names.update(params_cls._getpairs().keys())
elif hasattr(params_cls, "_gettuple"):
param_names.update(key for key, value in params_cls._gettuple())
# Filter parameter kwargs
param_kwargs = {k: v for k, v in kwargs.items() if k in param_names}
# Create parameter instance
try:
instance._params_instance = params_cls()
except Exception as exc:
raise TypeError(
f"Failed to create params instance for {cls.__name__}: {exc}"
) from exc
# Set all parameter values - first defaults, then custom values
if hasattr(params_cls, "_getpairs"):
for key, value in params_cls._getpairs().items():
# Use custom value if provided, otherwise use default
final_value = param_kwargs.get(key, value)
setattr(instance._params_instance, key, final_value)
elif hasattr(params_cls, "_gettuple"):
for key, value in params_cls._gettuple():
# Use custom value if provided, otherwise use default
final_value = param_kwargs.get(key, value)
setattr(instance._params_instance, key, final_value)
# Set any extra parameters that were passed but not in the params definition
for key, value in param_kwargs.items():
if not hasattr(instance._params_instance, key):
setattr(instance._params_instance, key, value)
else:
# No parameters defined, create parameter instance from kwargs
instance._params_instance = type("ParamsInstance", (), {})()
# Set all kwargs as parameters
for key, value in kwargs.items():
setattr(instance._params_instance, key, value)
# Create p property for parameter access
instance.p = instance._params_instance
# Handle method renaming like the old MetaStrategy.__new__ did
if hasattr(cls, "notify") and not hasattr(cls, "notify_order"):
cls.notify_order = cls.notify
delattr(cls, "notify")
if hasattr(cls, "notify_operation") and not hasattr(cls, "notify_trade"):
cls.notify_trade = cls.notify_operation
delattr(cls, "notify_operation")
# Register subclasses (from MetaStrategy.__init__)
if (
not getattr(cls, "aliased", False)
and cls.__name__ != "Strategy"
and not cls.__name__.startswith("_")
):
cls._indcol[cls.__name__] = cls
# Initialize critical attributes early (from MetaStrategy.donew and dopreinit)
# These need to be available before __init__ completes since methods might be called
from .cerebro import Cerebro
instance.env = instance.cerebro = cerebro = findowner(instance, Cerebro)
instance._id = cerebro._next_stid()
instance.broker = instance.env.broker
from .sizers import FixedSize
instance._sizer = FixedSize()
instance.stats = instance.observers = ItemCollection()
instance.analyzers = ItemCollection()
instance._alnames = collections.defaultdict(itertools.count)
instance.writers = []
instance._slave_analyzers = []
instance._tradehistoryon = False
instance._orders = []
instance._orderspending = []
instance._trades = collections.defaultdict(AutoDictList)
instance._tradespending = []
return instance
def __init__(self, *args, **kwargs):
"""Initialize with functionality from MetaStrategy methods"""
# Critical attributes already initialized in __new__
# Handle the functionality that was in MetaStrategy.dopostinit
self._sizer.set(self, self.broker)
# OPTIMIZED: Simple and fast data extraction from args
# Cerebro passes datas at the beginning of args (cerebro.py:1433)
if not hasattr(self, "datas") or not self.datas:
self.datas = []
# Quick method: Extract datas directly from args
# Cerebro prepends all datas to args, so we just need to identify them
if args:
for arg in args:
# Fast check: data feeds have 'lines' and 'datetime' attributes
if hasattr(arg, "lines") and hasattr(arg, "datetime"):
self.datas.append(arg)
# No need for nested loops or complex checks
# Fallback: Try cerebro.datas directly (fast)
if not self.datas and hasattr(self, "cerebro") and self.cerebro is not None:
if hasattr(self.cerebro, "datas") and self.cerebro.datas:
self.datas = list(self.cerebro.datas)
# Set up primary data reference and data0/data1 aliases
if self.datas:
self.data = self.datas[0]
for d, data in enumerate(self.datas):
setattr(self, f"data{d}", data)
else:
self.data = None
# Set up clock - this is critical for strategy execution
if not hasattr(self, "_clock") or self._clock is None:
if self.datas:
self._clock = self.datas[0]
# CRITICAL FIX: Don't create MinimalClock fallback
# It causes problems with indicator clock detection in _periodset()
# If no datas, leave _clock as None and let it be set later
# Call user subclass __init__ if this is a Strategy subclass
if self.__class__ != Strategy:
from backtrader.lineiterator import StrategyBase
# Guard against recursive calls when user's __init__ calls super().__init__()
if not getattr(self, "_user_init_called", False):
self._user_init_called = True
for cls in self.__class__.__mro__:
if (
cls not in (Strategy, StrategyBase)
and hasattr(cls, "__init__")
and "__init__" in cls.__dict__
):
# Use _original_init if available to avoid calling patched_init
# (prevents infinite recursion when ParamsMixin patches __init__)
if hasattr(cls, "_original_init"):
user_init = cls._original_init
else:
user_init = cls.__dict__["__init__"]
# Use OwnerContext so indicators find this strategy as owner
with OwnerContext.set_owner(self):
user_init(self)
break
# Initialize tick/channel callback state (auto, no manual init needed)
if not hasattr(self, "_tick_count"):
self._tick_count = 0
self._event_count = 0
self._last_tick = {}
self._last_ob = {}
self._last_funding = {}
# Initialize critical attributes that are expected by strategy execution
# These should be available before any user code runs
if not hasattr(self, "_dlens"):
self._dlens = [len(data) for data in self.datas]
# CRITICAL FIX: DO NOT call super().__init__() here!
# StrategyBase.__init__ already calls super().__init__() which eventually
# calls Strategy.__init__. Calling super() again would create infinite recursion.
# The parent initialization is already done by StrategyBase.
# Clean up the temporary attribute
if hasattr(self, "_strategy_init_kwargs"):
delattr(self, "_strategy_init_kwargs")
# Line type is strategy type
_ltype = LineIterator.StratType
# CSV default is True
csv = True
# Old clock update methodology, default is False
_oldsync = False # update the clock using old methodology: data 0
# Keep the latest delivered data date in the line
lines = ("datetime",)
[docs]
def log(self, txt, dt=None, level="info"):
"""Log a message with optional datetime.
This method provides basic logging functionality. For comprehensive
logging including orders, trades, positions, indicators, and signals,
use the TradeLogger observer.
Args:
txt: The message text to log.
dt: Optional datetime. If None, uses current bar datetime.
level: Log level ('info', 'warning', 'error', 'debug').
Example:
>>> self.log(f'Close price: {self.data.close[0]:.2f}')
>>> self.log('Warning message', level='warning')
"""
dt = dt or self.datetime.datetime()
print(f"[{dt}] {txt}")
def _notify_signal_to_observers(self, action, size, price, data=None, reason=None):
"""Notify all TradeLogger observers about a trading signal.
This is called automatically by buy() and sell() methods to record
signals without requiring user intervention.
Args:
action: 'buy' or 'sell'
size: Order size
price: Signal price
data: Data feed (optional)
reason: Signal reason (optional)
"""
# Notify all observers that have log_signal method (TradeLogger)
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "log_signal"):
data_name = getattr(data, "_name", None) if data else None
observer.log_signal(action, size, price, data_name, reason)
def _notify_order_to_observers(self, order):
"""Notify all observers about an order status change.
Args:
order: The order object with status change
"""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_order"):
observer.notify_order(order)
def _notify_trade_to_observers(self, trade):
"""Notify all observers about a trade.
Args:
trade: The trade object
"""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_trade"):
observer.notify_trade(trade)
def _notify_store_to_observers(self, msg, *args, **kwargs):
"""Forward store notifications to observers that support runtime events."""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_store_event"):
observer.notify_store_event(msg, *args, **kwargs)
def _notify_data_to_observers(self, data, status, *args, **kwargs):
"""Forward data-feed notifications to observers that support runtime events."""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_data_event"):
observer.notify_data_event(data, status, *args, **kwargs)
def _notify_tick_to_observers(self, tick):
"""Forward tick events to observers that support tick logging."""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_tick_event"):
observer.notify_tick_event(tick)
def _notify_bar_to_observers(self, bar):
"""Forward bar events to observers that support bar logging."""
if hasattr(self, "stats") and self.stats:
for observer in self.stats:
if hasattr(observer, "notify_bar_event"):
observer.notify_bar_event(bar)
[docs]
def qbuffer(self, savemem=0, replaying=False):
"""Enable the memory saving schemes. Possible values for ``savemem``:
0: No savings. Each line object keeps in memory all values
1: All lines objects save memory, using the strict minimum needed
Negative values are meant to be used when plotting is required:
-1: Indicators at Strategy Level and Observers do not enable memory
savings (but anything declared below it does)
-2: Same as -1 plus activation of memory saving for any indicators
which has declared *plotinfo.plot* as False (will not be plotted)
"""
# If savemem < 0
if savemem < 0:
# Get any attribute that labels itself as Indicator
for ind in self._lineiterators[self.IndType]:
# Check if this ind is a single line
subsave = isinstance(ind, (LineSingle,))
# If not a single line and savemem == -2, check plotinfo.plot
if not subsave and savemem < -1:
subsave = not ind.plotinfo.plot
# Apply memory saving based on subsave flag
ind.qbuffer(savemem=subsave)
# If savemem > 0
elif savemem > 0:
# Apply memory saving to all data feeds
for data in self.datas:
data.qbuffer(replaying=replaying)
# Apply memory saving to all lines
for line in self.lines:
line.qbuffer(savemem=1)
# Apply memory saving to all lineiterators based on the strategy
for itcls in self._lineiterators:
for it in self._lineiterators[itcls]:
it.qbuffer(savemem=1)
# If savemem == 0, no action needed
else:
pass
def _iter_strategy_lineactions(self):
"""Yield LineActions stored as strategy attributes."""
from .linebuffer import LineActions
try:
cache = object.__getattribute__(self, "_strategy_lineactions_cache")
except AttributeError:
cache = None
if cache is not None:
yield from cache
return
seen = set()
registered = set()
def mark_registered(lineiter):
lineiter_id = id(lineiter)
if lineiter_id in registered:
return
registered.add(lineiter_id)
try:
child_lists = lineiter._lineiterators.values()
except AttributeError:
return
for children in child_lists:
for child in children:
mark_registered(child)
try:
for children in self._lineiterators.values():
for child in children:
mark_registered(child)
except AttributeError:
# No sub-iterator registry yet; nothing to pre-mark as registered.
pass
def visit(value):
value_id = id(value)
if value_id in seen:
return
seen.add(value_id)
if isinstance(value, LineActions):
for attr_name in ("_parent_a", "_parent_b", "a", "b", "cond"):
try:
dependency = getattr(value, attr_name)
except AttributeError:
continue
yield from visit(dependency)
try:
args = value.args
except AttributeError:
args = ()
for dependency in args:
yield from visit(dependency)
if value_id not in registered:
yield value
return
if isinstance(value, dict):
for item in value.values():
yield from visit(item)
elif isinstance(value, (list, tuple, set, frozenset)):
for item in value:
yield from visit(item)
try:
attrs = object.__getattribute__(self, "__dict__")
except AttributeError:
attrs = {}
result = []
for attr_name, attr_value in attrs.items():
if attr_name.startswith("_"):
continue
result.extend(visit(attr_value))
cache = tuple(result)
object.__setattr__(self, "_strategy_lineactions_cache", cache)
yield from cache
def _get_strategy_lineactions(self):
try:
return object.__getattribute__(self, "_strategy_lineactions_cache")
except AttributeError:
return tuple(self._iter_strategy_lineactions())
def _get_strategy_next_lineactions(self):
try:
return object.__getattribute__(self, "_strategy_next_lineactions_cache")
except AttributeError:
# Cache not built yet; compute and store it below.
pass
cache = tuple(
(lineaction, getattr(lineaction, "_clock", None))
for lineaction in self._get_strategy_lineactions()
if hasattr(lineaction, "_next")
)
object.__setattr__(self, "_strategy_next_lineactions_cache", cache)
return cache
def _stage2(self):
super()._stage2()
for lineaction in self._get_strategy_lineactions():
try:
lineaction._stage2()
except Exception:
logger.debug("Failed to stage2 strategy LineActions", exc_info=True)
def _stage1(self):
super()._stage1()
for lineaction in self._get_strategy_lineactions():
try:
lineaction._stage1()
except Exception:
logger.debug("Failed to stage1 strategy LineActions", exc_info=True)
def _next_strategy_lineactions(self):
"""Advance LineActions stored directly on the strategy.
LineActions created as strategy attributes, such as bt.Cmp/bt.If/bt.Max,
are not LineIterator indicators. They still need to be evaluated before
user next/prenext/nextstart reads them.
"""
for lineaction, clock in self._get_strategy_next_lineactions():
if clock is not None:
try:
if len(clock) <= len(lineaction):
try:
current_value = lineaction[0]
except Exception: # nosec B112
# Per-bar hot path: the line buffer may not be
# populated yet for this index. Skipping is the
# intended behaviour; logging here would fire every
# bar. No control-flow change.
continue
if current_value == current_value:
continue
lineaction.next()
continue
except Exception: # nosec B110
# Clock/value comparison unavailable; fall back to plain
# _next(). Hot path, intentionally silent (no logging).
pass
lineaction._next()
def _periodset(self):
"""Calculate and set the minimum period required for strategy execution.
This method determines the minimum number of bars needed before
the strategy's next() method can be called, based on the minimum
periods of all indicators and data feeds.
"""
def iter_indicator_tree(indicators):
seen = set()
stack = list(indicators)
while stack:
lineiter = stack.pop(0)
lineiter_id = id(lineiter)
if lineiter_id in seen:
continue
seen.add(lineiter_id)
yield lineiter
try:
children = lineiter._lineiterators[LineIterator.IndType]
except (AttributeError, KeyError):
continue
stack.extend(children)
# Data IDs
dataids = [id(data) for data in self.datas]
# Data minimum periods
_dminperiods = collections.defaultdict(list)
# Loop through all indicators
all_indicators = list(iter_indicator_tree(self._lineiterators[LineIterator.IndType]))
# CRITICAL FIX: bind secondary-feed indicator advance clocks now that
# the whole indicator tree is built. During construction, an indicator
# built on another indicator or a LinesOperation that follows a
# non-primary feed (e.g. SMA((h1.high + h1.low) / 2.0) or
# EMA(EMA(h4.close))) often has its clock defaulted to the strategy's
# primary feed because the parent clocks were not yet finalized. That
# makes the indicator warm up and emit values on the primary (fast)
# clock instead of the secondary (slow) feed in runonce mode. Here every
# clock is final, so we resolve each indicator's data dependency to the
# concrete feed it follows and, when that feed is a *secondary* feed,
# pin _resolved_secondary_clock to it (used by the runonce advance
# loop and Indicator.advance). See docs/DEV_REGRESSION_FAILURES.md.
from .lineiterator import _line_like_source_clock as _llsc
primary_feed = self.datas[0] if self.datas else None
# Map each feed's individual lines back to the owning feed so a clock
# that resolves to a feed *line* (e.g. data.high) can be attributed to
# its feed.
_line_to_feed = {}
for _data in self.datas:
_dlines = getattr(_data, "lines", None)
if _dlines is None:
continue
try:
for _ln in _dlines:
_line_to_feed[id(_ln)] = _data
except TypeError:
# Feed lines not iterable; skip mapping this feed's lines.
pass
def _feed_of(node, _seen=None):
"""Resolve a data node to the concrete feed it ultimately follows."""
if _seen is None:
_seen = set()
if node is None or id(node) in _seen:
return None
_seen.add(id(node))
if id(node) in dataids:
return node
if id(node) in _line_to_feed:
return _line_to_feed[id(node)]
try:
src = _llsc(node)
except Exception:
src = None
if src is not None and src is not node:
if id(src) in dataids:
return src
if id(src) in _line_to_feed:
return _line_to_feed[id(src)]
found = _feed_of(src, _seen)
if found is not None:
return found
nxt = getattr(node, "_clock", None)
if nxt is not None and nxt.__class__.__name__ != "MinimalClock":
found = _feed_of(nxt, _seen)
if found is not None:
return found
ndatas = getattr(node, "datas", None)
if ndatas:
found = _feed_of(ndatas[0], _seen)
if found is not None:
return found
# Walk owner references: a node may be an indicator output line
# (LineBuffer) whose owning indicator follows the target feed. The
# owner can be the indicator directly, or a Lines container whose
# _owner_ref points to it.
for owner_attr in ("_owner", "_owner_ref"):
owner = getattr(node, owner_attr, None)
if owner is not None and id(owner) not in _seen:
# A Lines container exposes _owner_ref to the real owner.
ref = getattr(owner, "_owner_ref", None)
if ref is not None and id(ref) not in _seen:
found = _feed_of(ref, _seen)
if found is not None:
return found
found = _feed_of(owner, _seen)
if found is not None:
return found
# Operands of a LinesOperation.
for op_attr in ("a", "b", "_parent_a", "_parent_b"):
operand = getattr(node, op_attr, None)
if operand is not None and id(operand) not in _seen:
found = _feed_of(operand, _seen)
if found is not None:
return found
return None
if primary_feed is not None and len(self.datas) > 1:
for lineiter in all_indicators:
idatas = getattr(lineiter, "datas", None)
if not idatas:
continue
feed = _feed_of(idatas[0])
if feed is not None and feed is not primary_feed and id(feed) in dataids:
# Pin only the advance clock used by the runonce post-phase
# loop and Indicator.advance(); deliberately do NOT change
# lineiter._clock here so the existing minperiod-to-feed
# attribution below (and thus the strategy warmup / bar_num)
# stays identical to the pre-fix behavior. Changing _clock
# perturbed multi-feed warmup for unrelated strategies.
lineiter._resolved_secondary_clock = feed
for lineiter in all_indicators:
# If multiple datas are used and multiple timeframes, the larger
# timeframe may place larger time constraints in calling next.
# Get the indicator's _clock attribute
clk = getattr(lineiter, "_clock", None)
# CRITICAL FIX: If clock is MinimalClock, use the indicator's actual data source
if (
clk is not None
and hasattr(clk, "__class__")
and "MinimalClock" in clk.__class__.__name__
):
if self.datas:
# Find which data feed the indicator's data source belongs to
clock_set = False
if hasattr(lineiter, "datas") and lineiter.datas:
ind_data = lineiter.datas[0]
for data_feed in self.datas:
# Check if ind_data is the data feed itself
if ind_data is data_feed:
clk = data_feed
clock_set = True
break
# Check if ind_data is one of the lines of this data feed
if hasattr(data_feed, "lines") and ind_data in data_feed.lines:
clk = data_feed
clock_set = True
break
if not clock_set:
clk = self.datas[0]
lineiter._clock = clk # Update indicator's clock
else:
clk = None
# If the attribute value is None
if clk is None:
# Get the indicator's owner's _clock attribute value
clk = getattr(lineiter._owner, "_clock", None)
# CRITICAL FIX: If owner's clock is also MinimalClock, use data
if (
clk is not None
and hasattr(clk, "__class__")
and "MinimalClock" in clk.__class__.__name__
):
if self.datas:
clk = self.datas[0]
else:
clk = None
if clk is None:
continue
# If clk is not None
while True:
# If clk is a data feed, break
if id(clk) in dataids:
break # already top-level clock (data feed)
# See if the current clock has higher level clocks
# Check if current clk has further _clock attribute
clk2 = getattr(clk, "_clock", None)
# If clk2 is None, get clk owner's _clock attribute value
if clk2 is None:
clk2 = getattr(clk._owner, "_clock", None)
if clk2 is None:
break # if no clock found, bail out
# If clk2 is not None, set clk to clk2
clk = clk2 # keep the ref and try to go up the hierarchy
# This check ensures clk is not None before proceeding
if clk is None:
continue # no clock found, go to next
# LineSeriesStub wraps a line and the clock is the wrapped line and
# not the wrapper itself.
# If clk is LineSeriesStub (multi-line object), get first line as clk
if isinstance(clk, LineSeriesStub):
clk = clk.lines[0]
# Save minimum period
_dminperiods[clk].append(lineiter._minperiod)
# Set minimum periods to empty list
self._minperiods = []
# Loop through all data feeds
for data in self.datas:
# Do not only consider the data as clock but also its lines, which
# may have been individually passed as clock references and
# discovered as clocks above
# Initialize with a data min period if any
# Minimum period needed for data to generate indicator lines
dlminperiods = _dminperiods[data]
# Loop through each line of data, add minperiods if line is in _dminperiods
for line in data.lines: # search each line for min periods
if line in _dminperiods:
dlminperiods += _dminperiods[line] # found, add it
# Keep the reference to the line if any was found
# If dlminperiods is not empty, calculate max value, else empty list
_dminperiods[data] = [max(dlminperiods)] if dlminperiods else []
# Data minimum period
dminperiod = max(_dminperiods[data] or [data._minperiod])
# Save minimum period to dminperiod
self._minperiods.append(dminperiod)
# Set the minperiod
# Indicator minimum periods
minperiods = [x._minperiod for x in all_indicators]
# CRITICAL FIX: Also scan strategy attributes for LineActions objects
# (like LinesOperation from sma - sma(-10)) that aren't registered as indicators
# but still need their minperiod considered
from .linebuffer import LineActions
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
try:
attr = getattr(self, attr_name)
# Check if it's a LineActions but not already in _lineiterators
if isinstance(attr, LineActions) and hasattr(attr, "_minperiod"):
if attr not in self._lineiterators[LineIterator.IndType]:
minperiods.append(attr._minperiod)
except (AttributeError, TypeError):
# Attribute access/typecheck failed; skip this attribute.
pass
# Set strategy minimum period to max of indicator and data minperiods
self._minperiod = max(minperiods or [self._minperiod])
# CRITICAL FIX: Update _minperiods for LineActions, but only for their associated data
# For single-data strategies, apply LineActions minperiod to data[0]
# For multi-data strategies, LineActions minperiod should only affect its source data
from .linebuffer import LineActions
if self._minperiods:
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
try:
attr = getattr(self, attr_name)
if isinstance(attr, LineActions) and hasattr(attr, "_minperiod"):
# Try to determine which data this LineActions is associated with
# by checking its _clock or data sources
data_idx = 0 # Default to data[0]
if hasattr(attr, "_clock") and attr._clock is not None:
for i, d in enumerate(self.datas):
if attr._clock is d or attr._clock in d.lines:
data_idx = i
break
# Only update minperiod for the specific data
if data_idx < len(self._minperiods):
self._minperiods[data_idx] = max(
self._minperiods[data_idx], attr._minperiod
)
except (AttributeError, TypeError):
# Attribute access/typecheck failed; skip this attribute.
pass
def _addwriter(self, writer):
"""Add a writer to the strategy.
Unlike the other _addxxx functions, this one receives an instance
because the writer works at cerebro level and is only passed to the
strategy to simplify the logic.
"""
self.writers.append(writer)
def _addindicator(self, indcls, *indargs, **indkwargs):
"""Add an indicator to the strategy.
Args:
indcls: Indicator class to instantiate
*indargs: Positional arguments for the indicator
**indkwargs: Keyword arguments for the indicator
"""
indcls(*indargs, **indkwargs)
def _addanalyzer_slave(self, ancls, *anargs, **ankwargs):
"""Add a slave analyzer for internal use.
Like _addanalyzer but meant for observers (or other entities) which
rely on the output of an analyzer for the data. These analyzers have
not been added by the user and are kept separate from the main
analyzers.
Args:
ancls: Analyzer class to instantiate
*anargs: Positional arguments for the analyzer
**ankwargs: Keyword arguments for the analyzer
Returns:
The created analyzer instance
"""
# Use OwnerContext so analyzer's findowner() can find this strategy
with OwnerContext.set_owner(self):
analyzer = ancls(*anargs, **ankwargs)
self._slave_analyzers.append(analyzer)
return analyzer
def _getanalyzer_slave(self, idx):
"""Get a slave analyzer by index.
Note: This appears to have a syntax bug - should use [] not append()
"""
return self._slave_analyzers.append[idx]
def _addanalyzer(self, ancls, *anargs, **ankwargs):
"""Add an analyzer to the strategy.
Args:
ancls: Analyzer class to instantiate
*anargs: Positional arguments for the analyzer
**ankwargs: Keyword arguments for the analyzer, may include _name
"""
anname = ankwargs.pop("_name", "") or ancls.__name__.lower()
nsuffix = next(self._alnames[anname])
anname += str(nsuffix or "") # 0 (first instance) gets no suffix
# Use OwnerContext so analyzer's findowner() can find this strategy
with OwnerContext.set_owner(self):
analyzer = ancls(*anargs, **ankwargs)
# PERFORMANCE FIX: Explicitly set analyzer's owner to ensure it has access to strategy
analyzer._parent = self
analyzer._owner = self
self.analyzers.append(analyzer, anname)
def _addobserver(self, multi, obscls, *obsargs, **obskwargs):
"""Add an observer to the strategy.
Args:
multi: If True, create one observer per data feed; if False, create single observer
obscls: Observer class to instantiate
*obsargs: Positional arguments for the observer
**obskwargs: Keyword arguments for the observer, may include obsname
"""
obsname = obskwargs.pop("obsname", "")
if not obsname:
obsname = obscls.__name__.lower()
if not multi:
newargs = list(itertools.chain(self.datas, obsargs))
# Use OwnerContext so observer's findowner() can find this strategy
with OwnerContext.set_owner(self):
obs = obscls(*newargs, **obskwargs)
# PERFORMANCE FIX: Explicitly set observer's owner to ensure it has access to strategy
obs._parent = self
obs._owner = self
self._register_observer(obs)
self.stats.append(obs, obsname)
return
setattr(self.stats, obsname, [])
obs_list = getattr(self.stats, obsname)
for data in self.datas:
# Use OwnerContext so observer's findowner() can find this strategy
with OwnerContext.set_owner(self):
obs = obscls(data, *obsargs, **obskwargs)
# PERFORMANCE FIX: Explicitly set observer's owner to ensure it has access to strategy
obs._parent = self
obs._owner = self
self._register_observer(obs)
obs_list.append(obs)
def _register_observer(self, obs):
"""Prepare an observer for execution.
Ensures the observer has _analyzers and has its clock set properly
for strategy-wide observers. Does NOT register in _lineiterators
to avoid double-processing (observers are processed bar-by-bar
via _next_observers, not via _once batch processing).
Args:
obs: Observer instance to register.
"""
# Ensure _analyzers exists (some observers don't call super().__init__)
if not hasattr(obs, "_analyzers"):
obs._analyzers = []
# Set clock for strategy-wide observers
if getattr(obs, "_stclock", False):
obs._clock = self
def _getminperstatus(self):
"""Check if minimum period requirements are satisfied.
Returns the maximum difference between required minimum periods
and current data lengths.
Returns:
int: Maximum value of (minperiod - current_length) across all data feeds.
Negative values indicate all minimum periods are satisfied.
"""
data_iter = iter(zip(self._minperiods, self.datas))
try:
minperiod, data = next(data_iter)
except StopIteration:
raise ValueError("max() arg is an empty sequence") from None
minperstatus = minperiod - len(data)
for minperiod, data in data_iter:
status = minperiod - len(data)
if status > minperstatus:
minperstatus = status
self._minperstatus = minperstatus
return minperstatus
[docs]
def prenext_open(self):
"""Called before next() during prenext phase.
This is a hook for strategies to take action at the open of each bar
before minimum period is reached.
"""
[docs]
def nextstart_open(self):
"""Called at the open of the first bar where minimum period is satisfied.
This is called only once, transitioning from prenext to next phase.
"""
self.next_open()
[docs]
def next_open(self):
"""Called at the open of each bar during normal execution.
This is a hook for strategies to take action at the open of each bar.
"""
def _oncepost_open(self):
"""Prepare for _oncepost execution based on minimum period status.
Routes to appropriate method based on minperstatus:
- minperstatus < 0: All data satisfied, call next_open()
- minperstatus == 0: First bar with satisfied data, call nextstart_open()
- minperstatus > 0: Data not ready, call prenext_open()
"""
minperstatus = self._minperstatus
if minperstatus < 0:
self.next_open()
elif minperstatus == 0:
self.nextstart_open() # only called for the 1st value
else:
self.prenext_open()
def _oncepost(self, dt):
"""Execute oncepost processing for a single time step.
Args:
dt: Current datetime
"""
# CRITICAL FIX: Ensure _clock is set to actual data, not MinimalClock
# During initialization, _clock might be set to MinimalClock if datas weren't available yet
if hasattr(self, "_clock") and self._clock is not None:
clock_type_name = type(self._clock).__name__
if clock_type_name == "MinimalClock" and self.datas:
# Replace MinimalClock with actual first data
self._clock = self.datas[0]
elif not hasattr(self, "_clock") or self._clock is None:
# Set clock to first data if not set
if self.datas:
self._clock = self.datas[0]
# Loop through indicators, advance if indicator clock length exceeds indicator length
for indicator in self._lineiterators[LineIterator.IndType]:
# Honor a pinned secondary-feed clock (set in _periodset) so
# indicators following a non-primary feed advance in sync with it.
adv_clock = getattr(indicator, "_resolved_secondary_clock", None) or indicator._clock
if len(adv_clock) > len(indicator):
indicator.advance()
# If using old data sync method, call advance; otherwise call forward
if self._oldsync:
# Strategy has not been reset, the line is there
self.advance()
else:
# strategy has been reset to beginning. advance step by step
self.forward()
# Set datetime - and save it as the last valid datetime for use in stop()
self.lines.datetime[0] = dt
if dt > 0:
self._last_valid_datetime = dt
# Notify
self._notify()
self._next_strategy_lineactions()
# CRITICAL FIX: In runonce mode, ensure indicator lencount matches strategy length
# This ensures len(indicator) == len(strategy) at the end of processing
try:
strategy_len = len(self)
strategy_clock = getattr(self, "_clock", None)
for indicator in self._lineiterators[LineIterator.IndType]:
if getattr(indicator, "_clock", None) is not strategy_clock:
continue
# Only update if indicator was processed in runonce mode
if hasattr(indicator, "_once_called") and indicator._once_called:
# Update lencount for all lines in the indicator
if hasattr(indicator, "lines") and hasattr(indicator.lines, "lines"):
for line in indicator.lines.lines:
if hasattr(line, "lencount"):
# Set lencount to match strategy length (which equals data length)
# Use the maximum of current lencount and strategy_len to ensure we don't decrease it
line.lencount = max(line.lencount, strategy_len)
except Exception:
logger.debug("Failed to update indicator lencount in _oncepost_nextday", exc_info=True)
# Get current minimum period status and route to appropriate method
# If all data satisfied, call next()
# If first bar with all data satisfied, call nextstart()
# If not all data satisfied, call prenext()
minperstatus = self._getminperstatus()
if minperstatus < 0:
self.next()
elif minperstatus == 0:
self.nextstart() # only called for the 1st value
else:
self.prenext()
# Update analyzers with minimum period status
self._next_analyzers(minperstatus, once=True)
# Update observers with minimum period status
self._next_observers(minperstatus, once=True)
# Clear pending orders and trades
self.clear()
def _clk_update(self):
"""Update the clock and advance strategy state if needed.
Returns:
int: Current length of the strategy
"""
# CRITICAL FIX: Ensure data is available before clock operations
if (
getattr(self, "_data_assignment_pending", True)
or not hasattr(self, "_clock")
or self._clock is None
):
# Try to get data assignment from cerebro if not already done
if hasattr(self, "_ensure_data_available"):
self._ensure_data_available()
# If using old data sync method
if self._oldsync:
# Call strategy's _clk_update() method
clk_len = super()._clk_update()
# Set datetime
if self.datas:
max_datetime = None
for data in self.datas:
if not len(data):
continue
dt_value = data.datetime[0]
if (
isinstance(dt_value, (int, float))
and math.isfinite(dt_value)
and dt_value > 0
):
if max_datetime is None or dt_value > max_datetime:
max_datetime = dt_value
if max_datetime is not None:
self.lines.datetime[0] = max_datetime
# Return data length
return clk_len
# CRITICAL FIX: Initialize _dlens if not present
if not hasattr(self, "_dlens"):
self._dlens = [len(d) for d in self.datas]
# Current new data lengths and valid datetimes in a single pass.
newdlens = []
max_datetime = None
for data in self.datas:
data_len = len(data)
newdlens.append(data_len)
if data_len:
dt_value = data.datetime[0]
if isinstance(dt_value, (int, float)) and math.isfinite(dt_value) and dt_value > 0:
if max_datetime is None or dt_value > max_datetime:
max_datetime = dt_value
# If new data length > old data length, forward
if any(nl > old_len for old_len, nl in zip(self._dlens, newdlens)):
self.forward()
# Set datetime to max of current datetimes - only update if we have valid datetimes
if max_datetime is not None:
self.lines.datetime[0] = max_datetime
# Old data length equals new data length
self._dlens = newdlens
return len(self)
def _next_open(self):
"""Execute next_open phase based on minimum period status.
Same logic as _oncepost_open().
"""
minperstatus = self._minperstatus
if minperstatus < 0:
self.next_open()
elif minperstatus == 0:
self.nextstart_open() # only called for the 1st value
else:
self.prenext_open()
def _next(self):
"""Execute next() method and update analyzers and observers.
Gets minimum period status and passes it to analyzers and observers,
then clears pending orders and trades.
"""
super()._next()
minperstatus = self._getminperstatus()
self._next_analyzers(minperstatus)
self._next_observers(minperstatus)
self.clear()
def _get_all_observers(self):
"""Get all observer instances from self.stats.
Handles both single observers and multi-data observer lists.
Ensures each observer has _analyzers attribute.
Returns:
list: Flat list of all observer instances.
"""
result = []
for item in self.stats.items:
if isinstance(item, list):
for obs in item:
if not hasattr(obs, "_analyzers"):
obs._analyzers = []
result.append(obs)
else:
if not hasattr(item, "_analyzers"):
item._analyzers = []
result.append(item)
return result
def _next_observers(self, minperstatus, once=False):
"""Update observers based on minimum period status.
Iterates over self.stats.items (populated by _addobserver) instead of
_lineiterators[ObsType] which is intentionally kept empty to avoid
double-processing with _once() batch mode.
Note: Some pre-existing observers may have broken __init__ chains
(due to ObserverBase.__init_subclass__ wrapped_init not calling
super().__init__). Their next() calls are wrapped in try/except
to prevent one broken observer from crashing the entire run.
Args:
minperstatus: Current minimum period status
once: If True, running in runonce mode; otherwise running in next() mode
"""
# Collect all observers from _lineiterators
observers_to_process = list(self._lineiterators[LineIterator.ObsType])
# Also include TradeLogger observers from stats that are not in _lineiterators
# (TradeLogger needs next() to be called for position/indicator logging)
for obs in self.stats:
if obs not in observers_to_process:
# Only add TradeLogger type observers to avoid breaking other observers
if obs.__class__.__name__ == "TradeLogger":
observers_to_process.append(obs)
# Loop through observers
for observer in observers_to_process:
# For each analyzer in the observer (if observer has _analyzers)
for analyzer in getattr(observer, "_analyzers", []):
# Route to appropriate analyzer method based on minperstatus
if minperstatus < 0:
analyzer._next()
elif minperstatus == 0:
analyzer._nextstart() # only called for the 1st value
else:
analyzer._prenext()
# If running in once mode
if once:
# If current data length > observer length
if len(self) > len(observer):
# If using old data sync method, call advance, else call forward
if self._oldsync:
observer.advance()
else:
observer.forward()
# Route to appropriate observer method based on minperstatus
if minperstatus < 0:
observer.next()
elif minperstatus == 0:
observer.nextstart() # only called for the 1st value
elif len(observer):
observer.prenext()
# If not in once mode, call _next()
else:
observer._next()
def _next_analyzers(self, minperstatus, once=False):
"""Update analyzers based on minimum period status.
Args:
minperstatus: Current minimum period status
once: If True, running in runonce mode (unused but kept for consistency)
"""
for analyzer in self.analyzers:
if minperstatus < 0:
analyzer._next()
elif minperstatus == 0:
analyzer._nextstart() # only called for the 1st value
else:
analyzer._prenext()
def _settz(self, tz):
"""Set timezone for strategy's datetime line.
Args:
tz: Timezone to set
"""
self.lines.datetime._settz(tz)
def _start(self):
"""Initialize strategy and start execution.
Calculates minimum periods, starts analyzers and observers,
and calls user's start() method.
"""
# Calculate and set required minimum period
self._periodset()
# Start analyzers
for analyzer in itertools.chain(self.analyzers, self._slave_analyzers):
analyzer._start()
# Start observers
for obs in self.observers:
if not isinstance(obs, list):
obs = [obs] # support of multi-data observers
for o in obs:
o._start()
# Change operators to stage 2
self._stage2()
# Current length of each data
self._dlens = [len(data) for data in self.datas]
# Current minimum period status defaults to MAXINT (start in prenext)
self._minperstatus = MAXINT
# Call user's start()
self.start()
[docs]
def start(self):
"""Called right before the backtesting is about to be started.
This is a hook for strategies to perform initialization before
the backtesting loop begins.
"""
[docs]
def getwritervalues(self):
"""Get current values for writer output.
Returns:
list: Current values from indicators and observers
"""
values = []
# Loop through indicators/observers
for iocsv in self.indobscsv:
name = iocsv.plotinfo.plotname or iocsv.__class__.__name__
values.append(name)
lio = len(iocsv)
values.append(lio)
# If length > 0, get each value
if lio:
values.extend(map(lambda line: line[0], iocsv.lines.itersize()))
else:
values.extend([""] * iocsv.lines.size())
return values
[docs]
def getwriterinfo(self):
"""Get comprehensive writer information including params and analysis.
Returns:
AutoOrderedDict: Nested structure containing params, indicators,
observers, and analyzer results
"""
# Initialize writer info as AutoOrderedDict
wrinfo = AutoOrderedDict()
# Set parameters
wrinfo["Params"] = self.p._getkwargs()
sections = [["Indicators", self.getindicators_lines()], ["Observers", self.getobservers()]]
# Loop through indicators and observers
for sectname, sectitems in sections:
# Set specific values
sinfo = wrinfo[sectname]
for item in sectitems:
itname = item.__class__.__name__
sinfo[itname].Lines = item.lines.getlinealiases() or None
sinfo[itname].Params = item.p._getkwargs() or None
# Set analyzer values
ainfo = wrinfo.Analyzers
# Internal Value Analyzer
ainfo.Value.Begin = self.broker.startingcash
ainfo.Value.End = self.broker.getvalue()
# No slave analyzers for a writer
for aname, analyzer in self.analyzers.getitems():
ainfo[aname].Params = analyzer.p._getkwargs() or None
ainfo[aname].Analysis = analyzer.get_analysis()
return wrinfo
def _stop(self):
# CRITICAL FIX: In runonce mode, ensure indicator lencount matches strategy length
# This must be done BEFORE calling user's stop() method, as tests check len(indicator) == len(strategy)
try:
strategy_len = len(self)
strategy_clock = getattr(self, "_clock", None)
# Update lencount for all indicators to match strategy length
# This is critical for runonce mode where indicators are pre-calculated but lencount may not match
if hasattr(self, "_lineiterators"):
from .lineiterator import LineIterator
for indicator in self._lineiterators.get(LineIterator.IndType, []):
if getattr(indicator, "_clock", None) is not strategy_clock:
continue
# Update lencount for all lines in the indicator to match strategy length
if hasattr(indicator, "lines") and hasattr(indicator.lines, "lines"):
for line in indicator.lines.lines:
if hasattr(line, "lencount"):
# In runonce mode, set lencount to match strategy length (which equals data length)
# This ensures len(indicator) == len(strategy) for test assertions
line.lencount = strategy_len
except Exception:
logger.debug("Failed to update indicator lencount in _stop", exc_info=True)
# CRITICAL FIX: Restore last valid datetime before calling user's stop()
# This ensures datetime[0] is valid for logging in stop() method
if hasattr(self, "_last_valid_datetime") and self._last_valid_datetime > 0:
try:
# Restore strategy datetime
self.lines.datetime[0] = self._last_valid_datetime
# CRITICAL: Also restore all data feed datetimes
for data in self.datas:
try:
data.datetime[0] = self._last_valid_datetime
except Exception:
logger.debug(
"Failed to restore datetime for data %s in _stop",
getattr(data, "_name", data),
)
except Exception:
logger.debug("Failed to restore strategy datetime in _stop", exc_info=True)
# Call user's stop() method - can be overridden in strategy subclass
self.stop()
# Stop analyzers (both user-added and slave analyzers for observers)
for analyzer in itertools.chain(self.analyzers, self._slave_analyzers):
analyzer._stop()
# Stop observers (flush logs, etc.)
for observer in self._get_all_observers():
try:
if hasattr(observer, "stop"):
observer.stop()
except Exception:
logger.warning(
"Observer %s.stop() raised an exception",
type(observer).__name__,
exc_info=True,
)
# Change operators back to stage 1 - allows reuse of datas
self._stage1()
[docs]
def stop(self):
"""Called right before the backtesting is about to be stopped.
This is a hook for strategies to perform cleanup or final logging.
"""
[docs]
def set_tradehistory(self, onoff=True):
"""Enable or disable trade history tracking.
Args:
onoff: If True, keep full trade history; if False, only track current trade
"""
self._tradehistoryon = onoff
[docs]
def clear(self):
"""Clear pending orders and trades.
Moves pending orders to _orders list and clears pending trades.
"""
self._orders.extend(self._orderspending)
self._orderspending = []
self._tradespending = []
def _addnotification(self, order, quicknotify=False):
"""Add order notification and process trade updates.
Args:
order: The order that has been updated
quicknotify: If True, immediately process notification without queueing
"""
# If not simulated trading, add order to pending orders
if not order.p.simulated:
self._orderspending.append(order)
# If in quick notify mode, initialize qorders and qtrades
if quicknotify:
qorders = [order]
qtrades: list = []
# If order has no executed volume
if not order.executed.size:
# If in quick notify mode, call _notify with info
if quicknotify:
self._notify(qorders=qorders, qtrades=qtrades)
return
# Get trade data - if order.data._compensate is None, use order.data; otherwise use order.data._compensate
tradedata = getattr(order.data, "_compensate", None)
if tradedata is None:
tradedata = order.data
# Get trade data - if trade exists in _trades, use the last one; otherwise create a new trade and save to datatrades
tradekey = trade_key_from_order(order)
datatrades = self._trades[tradedata][tradekey]
if not datatrades:
trade = Trade(data=tradedata, tradeid=tradekey, historyon=self._tradehistoryon)
datatrades.append(trade)
else:
trade = datatrades[-1]
# Loop through order execution bits
for exbit in order.executed.iterpending():
# If execution bit is None, break loop
if exbit is None:
break
# If execution bit indicates closed position
if exbit.closed:
# Update trade
trade.update(
order,
exbit.closed,
exbit.price,
exbit.closedvalue,
exbit.closedcomm,
exbit.pnl,
comminfo=order.comminfo,
)
# If trade is closed
if trade.isclosed:
# Copy trade and add to _tradespending
self._tradespending.append(copy.copy(trade))
# If quick notify needed, copy trade and add to qtrades
if quicknotify:
qtrades.append(copy.copy(trade))
# Update it if needed
# If order execution bit indicates opened position
if exbit.opened:
# If trade is closed, create new trade and save to datatrades
if trade.isclosed:
trade = Trade(data=tradedata, tradeid=tradekey, historyon=self._tradehistoryon)
datatrades.append(trade)
# Update trade
trade.update(
order,
exbit.opened,
exbit.price,
exbit.openedvalue,
exbit.openedcomm,
exbit.pnl,
comminfo=order.comminfo,
)
# This extra check covers the case in which different tradeid
# orders have put the position down to 0 and the next order
# "opens" a position but "closes" the trade
# If trade is closed
if trade.isclosed:
# Copy trade and add to _tradespending
self._tradespending.append(copy.copy(trade))
# If quick notify needed, copy trade and add to qtrades
if quicknotify:
qtrades.append(copy.copy(trade))
# If trade was just opened
if trade.justopened:
# Copy trade and add to _tradespending
self._tradespending.append(copy.copy(trade))
# If quick notify needed, copy trade and add to qtrades
if quicknotify:
qtrades.append(copy.copy(trade))
# If quick notify needed, call _notify
if quicknotify:
self._notify(qorders=qorders, qtrades=qtrades)
def _notify(self, qorders=None, qtrades=None):
"""Notify order and trade events to strategy and analyzers.
Args:
qorders: Quick notify orders (empty list if not in quick notify mode)
qtrades: Quick notify trades (empty list if not in quick notify mode)
"""
if qorders is None:
qorders = []
if qtrades is None:
qtrades = []
# If quick notify is enabled
if self.cerebro.p.quicknotify:
# Need to know if quicknotify is on, to not reprocess pendingorders
# and pendingtrades, which have to exist for things like observers
# which look into it
# Pending orders and trades are qorders and qtrades
procorders = qorders
proctrades = qtrades
# Otherwise use orders and trades saved in _orderspending and _tradespending
else:
procorders = self._orderspending
proctrades = self._tradespending
# PERFORMANCE OPTIMIZATION: Cache merged analyzer list to avoid repeated itertools.chain
# This is called 688K+ times, so caching makes a significant difference
all_analyzers = getattr(self, "_all_analyzers_cache", None)
if all_analyzers is None:
all_analyzers = list(self.analyzers) + list(self._slave_analyzers)
self._all_analyzers_cache = all_analyzers
# Loop through pending orders
for order in procorders:
# If order execution type is not Historical or histnotify, notify order
if order.exectype != order.Historical or order.histnotify:
self.notify_order(order)
# Notify order to analyzers (both user and slave analyzers)
for analyzer in all_analyzers:
analyzer._notify_order(order)
# Notify order to observers (e.g., TradeLogger)
self._notify_order_to_observers(order)
# Loop through pending trades, notify, and notify analyzers
for trade in proctrades:
self.notify_trade(trade)
for analyzer in all_analyzers:
analyzer._notify_trade(trade)
# Notify trade to observers (e.g., TradeLogger)
self._notify_trade_to_observers(trade)
# If qorders is not empty, return after processing orders
if qorders:
return # cash is notified regularly
# Get cash, value, fundvalue, fundshares
cash = self.broker.getcash()
value = self.broker.getvalue()
fundvalue = self.broker.fundvalue
fundshares = self.broker.fundshares
# Notify cash and value values, and notify analyzers
self.notify_cashvalue(cash, value)
# Notify fund values, and notify analyzers
self.notify_fund(cash, value, fundvalue, fundshares)
for analyzer in all_analyzers:
analyzer._notify_cashvalue(cash, value)
analyzer._notify_fund(cash, value, fundvalue, fundshares)
[docs]
def add_timer(
self,
when,
offset=datetime.timedelta(),
repeat=datetime.timedelta(),
weekdays=None,
weekcarry=False,
monthdays=None,
monthcarry=True,
allow=None,
tzdata=None,
cheat=False,
*args,
**kwargs,
):
"""Schedule a timer to invoke notify_timer or a callback.
Note: Can be called during __init__ or start
Schedules a timer to invoke either a specified callback or the
notify_timer of one or more strategies.
Args:
when: Can be:
- datetime.time instance (see tzdata below)
- bt.timer.SESSION_START to reference session start
- bt.timer.SESSION_END to reference session end
offset (datetime.timedelta): Offset the when value. Used with
SESSION_START/SESSION_END to trigger after session start/end.
repeat (datetime.timedelta): If set, timer repeats at this interval
within the same session. Resets to original when after session end.
weekdays (list): Sorted iterable with integers (Monday=1, Sunday=7)
indicating which days the timer can be invoked. Empty = all days.
weekcarry (bool): If True and weekday not seen (e.g., holiday),
execute on next day (even if in new week).
monthdays (list): Sorted iterable with integers (1-31) indicating
which days of month to execute. Empty = all days.
monthcarry (bool): If True and day not seen (weekend, holiday),
execute on next available day.
allow (callable): Callback receiving datetime.date, returns True if
date is allowed for timer execution.
tzdata: Timezone data - None, pytz instance, or data feed instance.
If None and when is SESSION_START/END, uses first data feed.
cheat (bool): If True, timer called before broker evaluates orders,
allowing orders based on opening price.
*args: Additional args passed to notify_timer
**kwargs: Additional kwargs passed to notify_timer
Returns:
The created timer instance
"""
return self.cerebro._add_timer(
owner=self,
when=when,
offset=offset,
repeat=repeat,
weekdays=weekdays,
weekcarry=weekcarry,
monthdays=monthdays,
monthcarry=monthcarry,
allow=allow,
tzdata=tzdata,
strats=False,
cheat=cheat,
*args,
**kwargs,
)
[docs]
def notify_timer(self, timer, when, *args, **kwargs):
"""Receive timer notifications.
Receives a timer notification where ``timer`` is the timer instance
returned by ``add_timer``, and ``when`` is the calling time. ``args``
and ``kwargs`` are any additional arguments passed to ``add_timer``.
The actual ``when`` time can be later than expected, as the system may
not have been able to call the timer before. This value is the timer's
scheduled time, not the actual system time.
Args:
timer: The timer instance created by add_timer
when: The scheduled time when the timer was triggered
*args: Additional positional arguments passed to add_timer
**kwargs: Additional keyword arguments passed to add_timer
"""
[docs]
def notify_cashvalue(self, cash, value):
"""Notify the current cash and value of the strategy's broker.
Args:
cash: Current cash amount
value: Current portfolio value
"""
[docs]
def notify_fund(self, cash, value, fundvalue, shares):
"""Notify the current cash, value, fund value, and fund shares.
Args:
cash: Current cash amount
value: Current portfolio value
fundvalue: Current fund value
shares: Current fund shares
"""
[docs]
def notify_order(self, order):
"""Receive notification when an order status changes.
Args:
order: The order with changed status
"""
[docs]
def notify_trade(self, trade):
"""Receive notification when a trade status changes.
Args:
trade: The trade with changed status
"""
[docs]
def notify_store(self, msg, *args, **kwargs):
"""Receive notification from a store provider.
Args:
msg: Message from the store
*args: Additional positional arguments
**kwargs: Additional keyword arguments
"""
[docs]
def notify_data(self, data, status, *args, **kwargs):
"""Receive notification from a data feed.
Args:
data: The data feed sending the notification
status: Status code
*args: Additional positional arguments
**kwargs: Additional keyword arguments
"""
# ========== Tick/Channel Event Callbacks ==========
[docs]
def notify_tick(self, tick):
"""Called when a new tick event arrives.
Override this method to implement tick-level trading logic.
Args:
tick: TickEvent instance with price, volume, direction, etc.
"""
[docs]
def notify_orderbook(self, orderbook):
"""Called when a new order book snapshot arrives.
Override this method to implement orderbook-based trading logic.
Args:
orderbook: OrderBookSnapshot instance with bids, asks, spread, etc.
"""
[docs]
def notify_funding(self, funding):
"""Called when a new funding rate event arrives.
Override this method to implement funding rate arbitrage or
position management.
Args:
funding: FundingEvent instance with rate, mark_price, etc.
"""
[docs]
def notify_bar(self, bar):
"""Called when a bar event arrives from the channel system.
This is different from the standard next() method which processes
bars from LineSeries data feeds. This callback handles BarEvents
from the channel/queue system.
Args:
bar: BarEvent instance with open, high, low, close, volume.
"""
[docs]
def get_last_tick(self, symbol=None):
"""Get the last tick for a symbol.
Args:
symbol: Symbol name. If None, returns first available.
Returns:
TickEvent or None.
"""
if symbol:
return self._last_tick.get(symbol)
if self._last_tick:
return next(iter(self._last_tick.values()))
return None
[docs]
def get_last_orderbook(self, symbol=None):
"""Get the last order book for a symbol.
Args:
symbol: Symbol name. If None, returns first available.
Returns:
OrderBookSnapshot or None.
"""
if symbol:
return self._last_ob.get(symbol)
if self._last_ob:
return next(iter(self._last_ob.values()))
return None
[docs]
def get_last_funding(self, symbol=None):
"""Get the last funding rate for a symbol.
Args:
symbol: Symbol name. If None, returns first available.
Returns:
FundingEvent or None.
"""
if symbol:
return self._last_funding.get(symbol)
if self._last_funding:
return next(iter(self._last_funding.values()))
return None
# ========== Data Access Methods ==========
[docs]
def getdatanames(self):
"""Get a list of all data names in the system.
Returns:
list: Names of all data feeds
"""
return keys(self.env.datasbyname)
[docs]
def getdatabyname(self, name):
"""Get a data feed by its name.
Args:
name: Name of the data feed
Returns:
The data feed with the given name
"""
return self.env.datasbyname[name]
[docs]
def cancel(self, order):
"""Cancel an order in the broker.
Args:
order: The order to cancel
"""
self.broker.cancel(order)
[docs]
def buy(
self,
data=None,
size=None,
price=None,
plimit=None,
exectype=None,
valid=None,
tradeid=0,
oco=None,
trailamount=None,
trailpercent=None,
parent=None,
transmit=True,
**kwargs,
) -> Optional[Order]:
"""Create a buy (long) order and send it to the broker.
Args:
data: The data feed for the order. If None, uses the first data feed
(self.data).
size: Size to use (positive) for the order. If None, the sizer
instance retrieved via getsizer will determine the size.
price: Price to use. None is valid for Market and Close orders.
For Limit, Stop and StopLimit orders this determines the
trigger point.
plimit: Only applicable to StopLimit orders. This is the price at
which to set the implicit Limit order, once the Stop has been
triggered.
trailamount: For StopTrail/StopTrailLimit orders, an absolute amount
which determines the distance to the price to keep the trailing
stop.
trailpercent: For StopTrail/StopTrailLimit orders, a percentage
amount which determines the distance to the price to keep the
trailing stop.
exectype: Execution type. Possible values:
- Order.Market or None: Market order
- Order.Limit: Limit order
- Order.Stop: Stop order
- Order.StopLimit: Stop-limit order
- Order.Close: Close order
- Order.StopTrail: Stop-trail order
- Order.StopTrailLimit: Stop-trail-limit order
valid: Order validity. Possible values:
- None: Good till cancel
- datetime.datetime/date: Good till date
- Order.DAY: Day order
tradeid: Internal value to track overlapping trades.
oco: Another order instance for OCO (Order Cancel Others) group.
parent: Controls the relationship of a group of orders (e.g., bracket
orders).
transmit: If True, transmit the order to the broker. Used for
controlling bracket orders.
**kwargs: Additional broker-specific parameters.
Returns:
The submitted order, or None if size is 0.
Example:
Create a market buy order:
>>> order = self.buy()
Create a limit buy order:
>>> order = self.buy(price=100.0, exectype=Order.Limit)
"""
# Resolve data argument
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
if self.datas:
data = self.datas[0]
else:
raise ValueError(
"No data feed available. In channel mode, pass a data "
"object explicitly: self.buy(data=my_data, size=...)"
)
# Use the provided size, otherwise calculate via getsizer
size = size if size is not None else self.getsizing(data, isbuy=True)
# If size is non-zero, submit the order
if size:
order = self.broker.buy(
self,
data,
size=abs(size),
price=price,
plimit=plimit,
exectype=exectype,
valid=valid,
tradeid=tradeid,
oco=oco,
trailamount=trailamount,
trailpercent=trailpercent,
parent=parent,
transmit=transmit,
**kwargs,
)
# Auto-notify signal to TradeLogger observers
signal_price = (
price
if price is not None
else (
data.close[0]
if hasattr(data, "close") and hasattr(data.close, "__getitem__")
else 0
)
)
self._notify_signal_to_observers("buy", abs(size), signal_price, data)
return order
return None
[docs]
def sell(
self,
data=None,
size=None,
price=None,
plimit=None,
exectype=None,
valid=None,
tradeid=0,
oco=None,
trailamount=None,
trailpercent=None,
parent=None,
transmit=True,
**kwargs,
) -> Optional[Order]:
"""Create a sell (short) order and send it to the broker.
See the documentation for ``buy`` for an explanation of the parameters.
Returns:
The submitted order, or None if no order was created
"""
# Resolve data argument
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
if self.datas:
data = self.datas[0]
else:
raise ValueError(
"No data feed available. In channel mode, pass a data "
"object explicitly: self.sell(data=my_data, size=...)"
)
size = size if size is not None else self.getsizing(data, isbuy=False)
if size:
order = self.broker.sell(
self,
data,
size=abs(size),
price=price,
plimit=plimit,
exectype=exectype,
valid=valid,
tradeid=tradeid,
oco=oco,
trailamount=trailamount,
trailpercent=trailpercent,
parent=parent,
transmit=transmit,
**kwargs,
)
# Auto-notify signal to TradeLogger observers
signal_price = (
price
if price is not None
else (
data.close[0]
if hasattr(data, "close") and hasattr(data.close, "__getitem__")
else 0
)
)
self._notify_signal_to_observers("sell", abs(size), signal_price, data)
return order
return None
[docs]
def close(self, data=None, size=None, **kwargs) -> Optional[Order]:
"""Close a long or short position.
Creates an order that counters the existing position to close it.
Args:
data: The data feed for which to close the position.
If None, uses the default data feed.
size: The size to close. If None, closes the entire position.
**kwargs: Additional keyword arguments passed to the order.
Note:
If size is not provided, it is automatically calculated from the
existing position to fully close it.
Returns:
The submitted order, or None if no position exists
"""
# Get the data feed
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
data = self.data
position_side = kwargs.pop("position_side", None)
position_side = normalize_position_side(position_side)
broker_mode = normalize_position_mode(
getattr(self.broker, "get_param", lambda *_args, **_kwargs: "net")(
"position_mode", "net"
)
)
if position_side is not None or broker_mode == POSITION_MODE_DUAL_SIDE:
if position_side is None:
long_size = abs(self.getposition(data, self.broker, side=POSITION_SIDE_LONG).size)
short_size = abs(self.getposition(data, self.broker, side=POSITION_SIDE_SHORT).size)
if long_size and short_size:
raise ValueError(
"close() requires position_side when both long and short legs are open"
)
if long_size:
position_side = POSITION_SIDE_LONG
possize = long_size
elif short_size:
position_side = POSITION_SIDE_SHORT
possize = short_size
else:
return None
else:
possize = abs(self.getposition(data, self.broker, side=position_side).size)
size = abs(size if size is not None else possize)
if not size:
return None
kwargs.setdefault("position_side", position_side)
kwargs.setdefault("offset", POSITION_OFFSET_CLOSE)
if position_side == POSITION_SIDE_LONG:
return self.sell(data=data, size=size, **kwargs)
return self.buy(data=data, size=size, **kwargs)
# Get the current position size
possize = self.getposition(data, self.broker).size
# If size is None, close the entire position; otherwise close the specified size
size = abs(size if size is not None else possize)
# If position is long (positive), sell to close
if possize > 0:
return self.sell(data=data, size=size, **kwargs)
# If position is short (negative), buy to close
if possize < 0:
return self.buy(data=data, size=size, **kwargs)
return None
[docs]
def buy_bracket(
self,
data=None,
size=None,
price=None,
plimit=None,
exectype=Order.Limit,
valid=None,
tradeid=0,
trailamount=None,
trailpercent=None,
oargs=None,
stopprice=None,
stopexec=Order.Stop,
stopargs=None,
limitprice=None,
limitexec=Order.Limit,
limitargs=None,
**kwargs,
):
"""Create a bracket order group (buy order with stop-loss and take-profit).
Creates a bracket order group consisting of:
- A main **buy** order with the specified execution type (default: Limit)
- A *low side* bracket **sell** stop-loss order
- A *high side* bracket **sell** take-profit order
Args:
- ``data`` (default: ``None``): The data feed for the order. If None,
uses the first data feed (self.data).
- ``size`` (default: ``None``): Size for the order. If None, the sizer
determines the size. The same size is applied to all three orders.
- ``price`` (default: ``None``): Price for the main buy order. None
is valid for Market and Close orders.
- ``plimit`` (default: ``None``): Price limit for StopLimit orders.
- ``trailamount`` (default: ``None``): Absolute trailing amount for
StopTrail/StopTrailLimit orders.
- ``trailpercent`` (default: ``None``): Percentage trailing amount for
StopTrail/StopTrailLimit orders.
- ``exectype`` (default: ``bt.Order.Limit``): Execution type for the
main order. See buy() documentation for possible values.
- ``valid`` (default: ``None``): Order validity period. See buy()
documentation for possible values.
- ``tradeid`` (default: ``0``): Trade ID for tracking overlapping trades.
- ``oargs`` (default: ``{}``): Specific keyword arguments (dict) for
the main side order. Applied before **kwargs.
- ``**kwargs``: Additional keyword arguments applied to all three
orders. See buy() documentation for possible values.
- ``stopprice`` (default: ``None``): Specific price for the stop-loss
order.
- ``stopexec`` (default: ``bt.Order.Stop``): Execution type for the
stop-loss order.
- ``stopargs`` (default: ``{}``): Specific keyword arguments (dict)
for the stop-loss order.
- ``limitprice`` (default: ``None``): Specific price for the take-profit
order.
- ``limitexec`` (default: ``bt.Order.Limit``): Execution type for the
take-profit order.
- ``limitargs`` (default: ``{}``): Specific keyword arguments (dict)
for the take-profit order.
Returns:
A list containing the three orders [main_order, stop_order, limit_order].
Suppressed orders are represented as None.
Note:
High/Low side orders can be suppressed by setting limitexec=None or
stopexec=None.
"""
# Normalize mutable-default placeholders (B006); these dicts are only
# read via kargs.update(...), never mutated, so None==empty is equivalent.
oargs = {} if oargs is None else oargs
stopargs = {} if stopargs is None else stopargs
limitargs = {} if limitargs is None else limitargs
# Build parameter dictionary
kargs = {
"size": size,
"data": data,
"price": price,
"plimit": plimit,
"exectype": exectype,
"valid": valid,
"tradeid": tradeid,
"trailamount": trailamount,
"trailpercent": trailpercent,
}
# Update with main side order specific arguments
kargs.update(oargs)
# Update with general keyword arguments
kargs.update(kwargs)
# Set transmit flag: only transmit if both stop and limit are None
kargs["transmit"] = limitexec is None and stopexec is None
# Create the main buy order
o = self.buy(**kargs)
# Create stop-loss order
if stopexec is not None:
# low side / stop
kargs = {
"data": data,
"price": stopprice,
"exectype": stopexec,
"valid": valid,
"tradeid": tradeid,
}
kargs.update(stopargs)
kargs.update(kwargs)
kargs["parent"] = o
kargs["transmit"] = limitexec is None
kargs["size"] = o.size
ostop = self.sell(**kargs)
else:
ostop = None
# Create take-profit order
if limitexec is not None:
# high side / limit
kargs = {
"data": data,
"price": limitprice,
"exectype": limitexec,
"valid": valid,
"tradeid": tradeid,
}
kargs.update(limitargs)
kargs.update(kwargs)
kargs["parent"] = o
kargs["transmit"] = True
kargs["size"] = o.size
olimit = self.sell(**kargs)
else:
olimit = None
return [o, ostop, olimit]
[docs]
def sell_bracket(
self,
data=None,
size=None,
price=None,
plimit=None,
exectype=Order.Limit,
valid=None,
tradeid=0,
trailamount=None,
trailpercent=None,
oargs=None,
stopprice=None,
stopexec=Order.Stop,
stopargs=None,
limitprice=None,
limitexec=Order.Limit,
limitargs=None,
**kwargs,
):
"""Create a sell bracket order group (sell order with stop-loss and take-profit).
Creates a bracket order group consisting of:
- A main **sell** order with the specified execution type (default: Limit)
- A *high side* bracket **buy** stop-loss order
- A *low side* bracket **buy** take-profit order
Args:
See buy_bracket() for parameter documentation.
Returns:
A list containing the three orders [main_order, stop_order, limit_order].
Suppressed orders are represented as None.
Note:
High/Low side orders can be suppressed by setting limitexec=None or
stopexec=None.
"""
# Normalize mutable-default placeholders (B006); read-only via update().
oargs = {} if oargs is None else oargs
stopargs = {} if stopargs is None else stopargs
limitargs = {} if limitargs is None else limitargs
kargs = {
"size": size,
"data": data,
"price": price,
"plimit": plimit,
"exectype": exectype,
"valid": valid,
"tradeid": tradeid,
"trailamount": trailamount,
"trailpercent": trailpercent,
}
kargs.update(oargs)
kargs.update(kwargs)
kargs["transmit"] = limitexec is None and stopexec is None
o = self.sell(**kargs)
if stopexec is not None:
# high side / stop
kargs = {
"data": data,
"price": stopprice,
"exectype": stopexec,
"valid": valid,
"tradeid": tradeid,
}
kargs.update(stopargs)
kargs.update(kwargs)
kargs["parent"] = o
kargs["transmit"] = limitexec is None # transmit if last
kargs["size"] = o.size
ostop = self.buy(**kargs)
else:
ostop = None
if limitexec is not None:
# low side / limit
kargs = {
"data": data,
"price": limitprice,
"exectype": limitexec,
"valid": valid,
"tradeid": tradeid,
}
kargs.update(limitargs)
kargs.update(kwargs)
kargs["parent"] = o
kargs["transmit"] = True
kargs["size"] = o.size
olimit = self.buy(**kargs)
else:
olimit = None
return [o, ostop, olimit]
[docs]
def order_target_size(self, data=None, target=0, **kwargs) -> Optional[Order]:
"""Place an order to achieve a target position size.
Rebalances the current position to reach the specified target size.
Args:
data: The data feed for the order. If None, uses the default data feed.
target: Target position size.
- If target > pos.size: buy (target - pos.size)
- If target < pos.size: sell (pos.size - target)
- If target == 0: close the entire position
**kwargs: Additional keyword arguments passed to buy/sell.
Returns:
The generated order, or None if target == current position size.
"""
# Get the specific data feed
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
data = self.data
# Get the current position size
possize = self.getposition(data, self.broker).size
# If target is 0 and position exists, close the position
if not target and possize:
return self.close(data=data, size=possize, **kwargs)
# If target is greater than current position, buy to increase
if target > possize:
return self.buy(data=data, size=target - possize, **kwargs)
# If target is less than current position, sell to decrease
if target < possize:
return self.sell(data=data, size=possize - target, **kwargs)
return None # no execution target == possize
[docs]
def order_target_value(self, data=None, target=0.0, price=None, **kwargs) -> Optional[Order]:
"""Place an order to achieve a target position value.
Rebalances the position to reach the specified target value.
Args:
data: The data feed for the order. If None, uses the default data feed.
target: Target position value in currency units.
- If target is 0: close position
- If target > value: buy to increase value
- If target < value: sell to decrease value
price: Price for size calculation. If None, uses data.close[0].
**kwargs: Additional keyword arguments passed to buy/sell.
Returns:
The generated order, or None if no order was issued.
"""
# Get the data feed
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
data = self.data
# Get the current position size
possize = self.getposition(data, self.broker).size
# If target is 0 and position exists, close the position
if not target and possize: # closing a position
return self.close(data=data, size=possize, price=price, **kwargs)
# Otherwise, rebalance to target value
# Get the current value of this data
value = self.broker.getvalue(datas=[data])
# Get commission info for size calculation
comminfo = self.broker.getcommissioninfo(data)
# Get price: use provided price or default to close price
# Make sure a price is there
price = price if price is not None else data.close[0]
# If target value is greater than current value, buy
if target > value:
size = comminfo.getsize(price, target - value)
return self.buy(data=data, size=size, price=price, **kwargs)
# If target value is less than current value, sell
if target < value:
size = comminfo.getsize(price, value - target)
return self.sell(data=data, size=size, price=price, **kwargs)
return None # no execution size == possize
[docs]
def order_target_percent(self, data=None, target=0.0, **kwargs) -> Optional[Order]:
"""Place an order to achieve a target percentage of portfolio value.
Rebalances the position so its value equals the target percentage
of the total portfolio value.
Args:
data: The data feed for the order. If None, uses the default data feed.
target: Target percentage as a decimal (e.g., 0.05 for 5%).
**kwargs: Additional keyword arguments passed to order_target_value.
Returns:
The generated order, or None if no order was issued.
Example:
With target=0.05 and portfolio value of 100:
- Target value = 0.05 * 100 = 5
- Orders are placed through order_target_value
Note:
Position direction (long/short) is considered:
- If target > value: buy if pos.size >= 0, sell if pos.size < 0
- If target < value: sell if pos.size >= 0, buy if pos.size < 0
"""
# Get the data feed
if isinstance(data, string_types):
data = self.getdatabyname(data)
elif data is None:
data = self.data
# Calculate target value based on portfolio value
# Note: Getting position size here is not necessary
# possize = self.getposition(data, self.broker).size
target *= self.broker.getvalue()
return self.order_target_value(data=data, target=target, **kwargs)
[docs]
def getposition(self, data=None, broker=None, side=None, **kwargs):
"""Get the current position for a data feed.
Args:
data: The data feed to get position for. If None, uses the first data feed.
broker: The broker to query. If None, uses the default broker.
Returns:
The current Position object.
Note:
A property ``position`` is also available as a shortcut.
"""
data = data if data is not None else self.datas[0]
broker = broker or self.broker
return broker.getposition(data, side=side, **kwargs)
# Property to access position for the default data feed
position = property(getposition)
[docs]
def getpositionbyname(self, name=None, broker=None, side=None, **kwargs):
"""Get the current position for a data feed by name.
Args:
name: Name of the data feed. If None, uses the first data feed.
broker: The broker to query. If None, uses the default broker.
Returns:
The current Position object.
Note:
A property ``positionbyname`` is also available as a shortcut.
"""
data = self.datas[0] if not name else self.getdatabyname(name)
broker = broker or self.broker
return broker.getposition(data, side=side, **kwargs)
# Property to access position by name
positionbyname = property(getpositionbyname)
[docs]
def getpositions(self, broker=None):
"""Get all positions from the broker.
Args:
broker: The broker to query. If None, uses the default broker.
Returns:
Dictionary mapping data feeds to Position objects.
Note:
A property ``positions`` is also available as a shortcut.
"""
broker = broker or self.broker
return broker.positions
# Property to access all positions
positions = property(getpositions)
[docs]
def getpositionsbyname(self, broker=None):
"""Get all positions from the broker indexed by data name.
Args:
broker: The broker to query. If None, uses the default broker.
Returns:
OrderedDict mapping data names to Position objects.
Note:
A property ``positionsbyname`` is also available as a shortcut.
"""
broker = broker or self.broker
positions = broker.positions
posbyname = collections.OrderedDict()
for name, data in iteritems(self.env.datasbyname):
posbyname[name] = positions[data]
return posbyname
# Property to access positions by name
positionsbyname = property(getpositionsbyname)
def _addsizer(self, sizer, *args, **kwargs):
"""Add a sizer to the strategy.
If sizer is None, uses FixedSize sizer. Otherwise instantiates
the provided sizer class and sets it.
Args:
sizer: Sizer class or None
*args: Positional arguments for sizer instantiation
**kwargs: Keyword arguments for sizer instantiation
"""
if sizer is None:
self.setsizer(FixedSize())
else:
self.setsizer(sizer(*args, **kwargs))
[docs]
def setsizer(self, sizer):
"""Set the sizer for automatic stake calculation.
Args:
sizer: The sizer instance to use
Returns:
The sizer instance
"""
self._sizer = sizer
sizer.set(self, self.broker)
return sizer
[docs]
def getsizer(self):
"""Get the current sizer for automatic stake calculation.
Returns:
The current sizer instance
Note:
Also available as the ``sizer`` property.
"""
return self._sizer
sizer = property(getsizer, setsizer)
[docs]
def getsizing(self, data=None, isbuy=True):
"""Get the order size from the sizer.
Uses the configured sizer to calculate the appropriate stake size
for the next order.
Args:
data: The data feed for the order. If None, uses the default data.
isbuy: True for buy orders, False for sell orders.
Returns:
The calculated stake size.
"""
# Ensure sizer has broker reference
if hasattr(self._sizer, "broker") and self._sizer.broker is None:
self._sizer.set(self, self.broker)
return self._sizer.getsizing(data, isbuy)
[docs]
class SignalStrategy(Strategy):
"""A strategy subclass that automatically operates using signals.
This strategy subclass responds to signal indicators to automatically
enter and exit positions based on signal values.
Signal values:
- ``> 0`` indicates a long (buy) signal
- ``< 0`` indicates a short (sell) signal
There are five types of signals, broken into two groups:
**Main Group**:
- ``LONGSHORT``: Both long and short indications from this signal
are taken. The strategy will go long or short based on the sign.
- ``LONG``:
- Positive (long) indications: Go long
- Negative (short) indications: Close long position
- If ``LONGEXIT`` exists, it is used to exit longs
- If ``SHORT`` signal exists and no ``LONGEXIT``, it will close
longs before opening a short
- ``SHORT``:
- Negative (short) indications: Go short
- Positive (long) indications: Close short position
- If ``SHORTEXIT`` exists, it is used to exit shorts
- If ``LONG`` signal exists and no ``SHORTEXIT``, it will close
shorts before opening a long
**Exit Group**:
These signals override others to provide explicit exit criteria:
- ``LONGEXIT``: Negative indications are taken to exit long positions
- ``SHORTEXIT``: Positive indications are taken to exit short positions
**Order Issuing**
Orders are placed with Market execution type and Good-Until-Canceled
validity.
Params:
- ``signals`` (default: ``[]``): A list/tuple of lists/tuples for signal
instantiation and type allocation. This parameter is typically managed
through ``cerebro.add_signal``.
- ``_accumulate`` (default: ``False``): Allow entering the market even if
already in a position (accumulate positions).
- ``_concurrent`` (default: ``False``): Allow issuing orders even when
orders are already pending execution.
- ``_data`` (default: ``None``): If multiple datas are present in the
system which is the target for orders. This can be
- ``None``: The first data in the system will be used
- An ``int``: indicating the data that was inserted at that position
- An ``str``: name given to the data when creating it (parameter
``name``) or when adding it cerebro with ``cerebro.adddata(...,
name=)``
- A ``data`` instance
"""
# Parameters for signal strategy
params: tuple = (
("signals", []),
("_accumulate", False),
("_concurrent", False),
("_data", None),
)
[docs]
def __new__(cls, *args, **kwargs):
"""Override __new__ to handle next method remapping that was done in MetaSigStrategy"""
# Handle next method remapping like the old MetaSigStrategy.__new__ did
if hasattr(cls, "next") and not hasattr(cls, "_next_custom"):
cls._next_custom = cls.next
# Create the instance
instance = super().__new__(cls, *args, **kwargs)
# Set the next method to _next_catch (from MetaSigStrategy)
instance.next = instance._next_catch
return instance
def __init__(self, *args, **kwargs):
"""Initialize the signal strategy with functionality from MetaSigStrategy methods"""
# Handle the functionality that was in MetaSigStrategy.dopreinit
self._signals = collections.defaultdict(list)
# Set the data target (from MetaSigStrategy.dopreinit)
_data = getattr(self.p, "_data", None)
if _data is None:
self._dtarget = self.data0
elif isinstance(_data, integer_types):
self._dtarget = self.datas[_data]
elif isinstance(_data, string_types):
self._dtarget = self.getdatabyname(_data)
elif isinstance(_data, LineRoot):
self._dtarget = _data
else:
self._dtarget = self.data0
# Filter out strategy parameter kwargs to prevent them from reaching parent __init__
filtered_kwargs = kwargs.copy()
if hasattr(self.__class__, "_params") and self.__class__._params is not None:
params_cls = self.__class__._params
param_names = set()
# Get all parameter names from the class
if hasattr(params_cls, "_getpairs"):
param_names.update(params_cls._getpairs().keys())
elif hasattr(params_cls, "_gettuple"):
param_names.update(key for key, value in params_cls._gettuple())
# Remove strategy parameter kwargs
filtered_kwargs = {k: v for k, v in kwargs.items() if k not in param_names}
# Call parent initialization with filtered kwargs
# Don't pass *args to avoid object.__init__() error, consistent with Strategy.__init__ fix
if filtered_kwargs:
super().__init__(**filtered_kwargs)
else:
super().__init__()
# Handle the functionality that was in MetaSigStrategy.dopostinit
# Add signals from params
# CRITICAL FIX: Pass self._dtarget as data source for signal indicators
# and register them with the strategy so they get processed
for sigtype, sigcls, sigargs, sigkwargs in self.p.signals:
sig_indicator = sigcls(self._dtarget, *sigargs, **sigkwargs)
self._signals[sigtype].append(sig_indicator)
# CRITICAL FIX: Register signal indicator with strategy's _lineiterators
# so its once()/next() methods get called during processing
if hasattr(sig_indicator, "_ltype"):
ltype = sig_indicator._ltype
if sig_indicator not in self._lineiterators[ltype]:
self._lineiterators[ltype].append(sig_indicator)
sig_indicator._owner = self
# Record types of signals
self._longshort = bool(self._signals[SIGNAL_LONGSHORT])
self._long = bool(self._signals[SIGNAL_LONG])
self._short = bool(self._signals[SIGNAL_SHORT])
self._longexit = bool(self._signals[SIGNAL_LONGEXIT])
self._shortexit = bool(self._signals[SIGNAL_SHORTEXIT])
def _start(self):
"""Start the signal strategy and initialize the order sentinel."""
self._sentinel = None # sentinel for order concurrency
super()._start()
[docs]
def signal_add(self, sigtype, signal):
"""Add a signal indicator to the strategy.
Args:
sigtype: Type of signal (e.g., SIGNAL_LONG, SIGNAL_SHORT)
signal: The signal indicator instance
"""
self._signals[sigtype].append(signal)
def _notify(self, qorders=None, qtrades=None):
"""Process notifications and reset sentinel when order completes.
Args:
qorders: Quick notify orders
qtrades: Quick notify trades
"""
if qorders is None:
qorders = []
if qtrades is None:
qtrades = []
# Nullify the sentinel if done
procorders = qorders or self._orderspending
if self._sentinel is not None:
for order in procorders:
if order == self._sentinel and not order.alive():
self._sentinel = None
break
super()._notify(qorders=qorders, qtrades=qtrades)
def _next_catch(self):
"""Catch method that routes to signal processing and custom next."""
self._next_signal()
if hasattr(self, "_next_custom"):
self._next_custom()
@staticmethod
def _all_pos(sig, nosig):
"""True if every value in ``sig`` (or ``nosig`` when empty) is > 0."""
return all(x[0] > 0.0 for x in sig or nosig)
@staticmethod
def _all_neg(sig, nosig):
"""True if every value in ``sig`` (or ``nosig`` when empty) is < 0."""
return all(x[0] < 0.0 for x in sig or nosig)
@staticmethod
def _all_any(sig, nosig):
"""True if every value in ``sig`` (or ``nosig`` when empty) is truthy."""
return all(x[0] for x in sig or nosig)
def _evaluate_signals(self):
"""Evaluate all signal collections into entry/exit/reversal flags.
Pure helper (no order side effects) extracted from _next_signal for
readability. Returns a tuple of booleans consumed by the position
decision logic:
(ls_long, ls_short, l_enter, s_enter, l_exit, s_exit,
l_rev, s_rev, l_leave, s_leave)
"""
# Get signal collections
sigs = self._signals
# Default no-signal value
nosig = [[0.0]]
pos = self._all_pos
neg = self._all_neg
anyv = self._all_any
# Calculate current status of the signals
# If SIGNAL_LONGSHORT is empty, loop through nosig
ls_long = pos(sigs[SIGNAL_LONGSHORT], nosig)
ls_short = neg(sigs[SIGNAL_LONGSHORT], nosig)
# Long entry: direct (>0), inverted (<0) or any (truthy)
l_enter = (
pos(sigs[SIGNAL_LONG], nosig)
or neg(sigs[SIGNAL_LONG_INV], nosig)
or anyv(sigs[SIGNAL_LONG_ANY], nosig)
)
# Short entry: direct (<0), inverted (>0) or any (truthy)
s_enter = (
neg(sigs[SIGNAL_SHORT], nosig)
or pos(sigs[SIGNAL_SHORT_INV], nosig)
or anyv(sigs[SIGNAL_SHORT_ANY], nosig)
)
# Long exit: direct (<0), inverted (>0) or any (truthy)
l_exit = (
neg(sigs[SIGNAL_LONGEXIT], nosig)
or pos(sigs[SIGNAL_LONGEXIT_INV], nosig)
or anyv(sigs[SIGNAL_LONGEXIT_ANY], nosig)
)
# Short exit: direct (>0), inverted (<0) or any (truthy)
s_exit = (
pos(sigs[SIGNAL_SHORTEXIT], nosig)
or neg(sigs[SIGNAL_SHORTEXIT_INV], nosig)
or anyv(sigs[SIGNAL_SHORTEXIT_ANY], nosig)
)
# Use opposite signals to start reversal (by closing)
# but only if no "xxxExit" exists
# Long reversal: no long exit and short entry signal
l_rev = not self._longexit and s_enter
# Short reversal: no short exit and long entry signal
s_rev = not self._shortexit and l_enter
# Opposite of individual long and short (leave = exit on opposite signal)
# Long leave: direct (<0), inverted (>0) or any (truthy)
l_leave = (
neg(sigs[SIGNAL_LONG], nosig)
or pos(sigs[SIGNAL_LONG_INV], nosig)
or anyv(sigs[SIGNAL_LONG_ANY], nosig)
)
# Short leave: direct (>0), inverted (<0) or any (truthy)
s_leave = (
pos(sigs[SIGNAL_SHORT], nosig)
or neg(sigs[SIGNAL_SHORT_INV], nosig)
or anyv(sigs[SIGNAL_SHORT_ANY], nosig)
)
# Invalidate long leave if longexit signals are available
# If longexit exists, disable l_leave; otherwise keep l_leave
l_leave = not self._longexit and l_leave
# Invalidate short leave if shortexit signals are available
# If shortexit exists, disable s_leave; otherwise keep s_leave
s_leave = not self._shortexit and s_leave
return (
ls_long,
ls_short,
l_enter,
s_enter,
l_exit,
s_exit,
l_rev,
s_rev,
l_leave,
s_leave,
)
def _next_signal(self):
"""Process signals and generate orders based on signal values.
Evaluates all signal types and generates buy/sell orders based on:
- Current position status
- Signal values (positive/negative)
- Accumulation and concurrency settings
"""
# If concurrent orders are disabled and an order is active, return
if self._sentinel is not None and not self.p._concurrent:
return # order active and more than 1 not allowed
# Evaluate all signal collections into decision flags
(
ls_long,
ls_short,
l_enter,
s_enter,
l_exit,
s_exit,
l_rev,
s_rev,
l_leave,
s_leave,
) = self._evaluate_signals()
# Take size and start logic
# Get current position size
size = self.getposition(self._dtarget).size
# If no position
if not size:
# Enter new position based on signals
if ls_long or l_enter:
self._sentinel = self.buy(self._dtarget)
elif ls_short or s_enter:
self._sentinel = self.sell(self._dtarget)
# If current position is long (positive)
elif size > 0: # current long position
if ls_short or l_exit or l_rev or l_leave:
# closing position - not relevant for concurrency
self.close(self._dtarget)
if ls_short or l_rev:
self._sentinel = self.sell(self._dtarget)
if ls_long or l_enter:
if self.p._accumulate:
self._sentinel = self.buy(self._dtarget)
# If current position is short (negative)
elif size < 0: # current short position
if ls_long or s_exit or s_rev or s_leave:
# closing position - not relevant for concurrency
self.close(self._dtarget)
if ls_long or s_rev:
self._sentinel = self.buy(self._dtarget)
if ls_short or s_enter:
if self.p._accumulate:
self._sentinel = self.sell(self._dtarget)
[docs]
class BtApiStrategy(Strategy):
"""A Strategy subclass with built-in logging capabilities.
This strategy class extends the base Strategy class with automatic
logger initialization using the SpdLogManager. It provides a default
log() method for logging messages and custom notification handling.
Attributes:
logger: The configured logger instance from SpdLogManager.
Params:
log_file_name: Optional custom log file name. If not provided,
defaults to "{ClassName}.log".
Example:
class MyStrategy(bt.BtApiStrategy):
params = (('log_file_name', 'my_strategy.log'),)
def next(self):
self.log(f'Close price: {self.data.close[0]:.2f}')
"""
def __init__(self):
"""Initialize the strategy with a logger instance."""
self.logger = self.init_logger(self.p.get("log_file_name", None))
[docs]
def init_logger(self, log_file_name=None):
"""Initialize and return a logger instance.
Creates a logger using SpdLogManager with the specified or default
log file name.
Args:
log_file_name: Optional custom log file name. If None, uses
"{ClassName}.log" as the default.
Returns:
A configured logger instance.
"""
if log_file_name is None:
logger = SpdLogManager(
file_name=self.__class__.__name__ + ".log", logger_name="strategy", print_info=True
).create_logger()
else:
logger = SpdLogManager(
file_name=log_file_name, logger_name="strategy", print_info=True
).create_logger()
return logger
[docs]
def log(self, txt):
"""Log a message at INFO level.
Args:
txt: The message text to log.
"""
self.logger.info(txt)
def _addnotification(self, data, quicknotify=True):
"""Process notifications for orders and trades with logging.
This method extends the base notification handling to route
notifications to the appropriate handler methods.
Args:
data: The notification data, which can be an order or trade.
quicknotify: If True, immediately process notification without queueing.
"""
if data.data_type == "order":
self.notify_order(data)
if data.data_type == "trade":
self.notify_trade(data)