Source code for backtrader.strategy

#!/usr/bin/env python
"""Strategy module - Base class for user-defined trading strategies.

This module provides the Strategy class which serves as the foundation for
all user-defined trading strategies in Backtrader. It handles order management,
position tracking, indicator integration, and the event-driven execution model.

Key Features:
    - Order creation and management (buy, sell, close, cancel)
    - Position tracking per data feed
    - Integration with indicators and analyzers
    - Event notifications (order, trade, data, timer)
    - Support for multiple data feeds and timeframes
    - Signal-based trading via SignalStrategy

Example:
    Basic strategy implementation::

        import backtrader as bt

        class MyStrategy(bt.Strategy):
            params = (('period', 20),)

            def __init__(self):
                self.sma = bt.indicators.SMA(period=self.p.period)

            def next(self):
                if self.data.close[0] > self.sma[0]:
                    self.buy()
                elif self.data.close[0] < self.sma[0]:
                    self.sell()

Classes:
    Strategy: Main base class for trading strategies.
    SignalStrategy: Strategy subclass that responds to signal indicators.
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import collections
import copy
import datetime
import itertools
import math
from typing import Optional

from .lineiterator import LineIterator, StrategyBase
from .lineroot import LineRoot, LineSingle
from .lineseries import LineSeriesStub
from .metabase import ItemCollection, OwnerContext, findowner
from .order import Order
from .position_modes import (
    POSITION_MODE_DUAL_SIDE,
    POSITION_OFFSET_CLOSE,
    POSITION_SIDE_LONG,
    POSITION_SIDE_SHORT,
    normalize_position_mode,
    normalize_position_side,
    trade_key_from_order,
)
from .signal import (
    SIGNAL_LONG,
    SIGNAL_LONG_ANY,
    SIGNAL_LONG_INV,
    SIGNAL_LONGEXIT,
    SIGNAL_LONGEXIT_ANY,
    SIGNAL_LONGEXIT_INV,
    SIGNAL_LONGSHORT,
    SIGNAL_SHORT,
    SIGNAL_SHORT_ANY,
    SIGNAL_SHORT_INV,
    SIGNAL_SHORTEXIT,
    SIGNAL_SHORTEXIT_ANY,
    SIGNAL_SHORTEXIT_INV,
)
from .sizers.fixedsize import FixedSize
from .trade import Trade
from .utils import AutoDictList, AutoOrderedDict
from .utils.log_message import SpdLogManager, get_logger
from .utils.py3 import MAXINT, filter, integer_types, iteritems, keys, map, string_types

logger = get_logger(__name__)


[docs] class Strategy(StrategyBase): """Base class for user-defined trading strategies. This class provides the core functionality for implementing trading strategies including order management, position tracking, and event handling. Users should subclass this to create custom strategies. Attributes: env: Reference to the Cerebro environment. cerebro: Alias for env. broker: Reference to the broker for order execution. datas: List of data feeds available to the strategy. data: Shortcut to the first data feed (datas[0]). position: Current position for the main data feed. stats: Collection of observer instances. analyzers: Collection of analyzer instances. Methods to Override: __init__: Initialize indicators and strategy state. start: Called when the strategy starts running. prenext: Called before minimum period is reached. nextstart: Called once when minimum period is first reached. next: Main strategy logic, called on each bar. stop: Called when the strategy stops running. notify_order: Receive order status notifications. notify_trade: Receive trade notifications. notify_data: Receive data feed notifications. notify_timer: Receive timer notifications. Example: class MyStrategy(bt.Strategy): params = (('period', 20),) def __init__(self): self.sma = bt.indicators.SMA(period=self.p.period) def next(self): if not self.position: if self.data.close[0] > self.sma[0]: self.buy() else: if self.data.close[0] < self.sma[0]: self.close() """ # Class-level storage for strategies _indcol: dict = {} @classmethod def _create_strategy_safely(cls, *args, **kwargs): """Safely create a strategy instance with proper parameter filtering. Separates __new__ (parameter setup) from __init__ (data/indicator setup) to ensure parameters are fully processed before __init__ runs. """ # __new__ processes all kwargs into _params_instance instance = cls.__new__(cls, *args, **kwargs) # __init__ handles data setup, clock, and user subclass init. # Pass original kwargs so __init__ can filter out param kwargs itself. if instance is not None: Strategy.__init__(instance, *args, **kwargs) return instance
[docs] def __new__(cls, *args, **kwargs): """Override __new__ to handle method renaming that was done in MetaStrategy""" # CRITICAL: First call StrategyBase.__new__ to properly set up data arguments and lines # This ensures strategies get their data arguments processed correctly instance = super().__new__(cls, *args, **kwargs) # Store the original kwargs for parameter processing instance._strategy_init_kwargs = kwargs # CRITICAL FIX: Manually set up parameters here since Strategy inherits from ParamsMixin # But we need to ensure the kwargs from cerebro.addstrategy are properly processed if hasattr(cls, "_params") and cls._params is not None: params_cls = cls._params param_names = set() # Get all parameter names from the class if hasattr(params_cls, "_getpairs"): param_names.update(params_cls._getpairs().keys()) elif hasattr(params_cls, "_gettuple"): param_names.update(key for key, value in params_cls._gettuple()) # Filter parameter kwargs param_kwargs = {k: v for k, v in kwargs.items() if k in param_names} # Create parameter instance try: instance._params_instance = params_cls() except Exception as exc: raise TypeError( f"Failed to create params instance for {cls.__name__}: {exc}" ) from exc # Set all parameter values - first defaults, then custom values if hasattr(params_cls, "_getpairs"): for key, value in params_cls._getpairs().items(): # Use custom value if provided, otherwise use default final_value = param_kwargs.get(key, value) setattr(instance._params_instance, key, final_value) elif hasattr(params_cls, "_gettuple"): for key, value in params_cls._gettuple(): # Use custom value if provided, otherwise use default final_value = param_kwargs.get(key, value) setattr(instance._params_instance, key, final_value) # Set any extra parameters that were passed but not in the params definition for key, value in param_kwargs.items(): if not hasattr(instance._params_instance, key): setattr(instance._params_instance, key, value) else: # No parameters defined, create parameter instance from kwargs instance._params_instance = type("ParamsInstance", (), {})() # Set all kwargs as parameters for key, value in kwargs.items(): setattr(instance._params_instance, key, value) # Create p property for parameter access instance.p = instance._params_instance # Handle method renaming like the old MetaStrategy.__new__ did if hasattr(cls, "notify") and not hasattr(cls, "notify_order"): cls.notify_order = cls.notify delattr(cls, "notify") if hasattr(cls, "notify_operation") and not hasattr(cls, "notify_trade"): cls.notify_trade = cls.notify_operation delattr(cls, "notify_operation") # Register subclasses (from MetaStrategy.__init__) if ( not getattr(cls, "aliased", False) and cls.__name__ != "Strategy" and not cls.__name__.startswith("_") ): cls._indcol[cls.__name__] = cls # Initialize critical attributes early (from MetaStrategy.donew and dopreinit) # These need to be available before __init__ completes since methods might be called from .cerebro import Cerebro instance.env = instance.cerebro = cerebro = findowner(instance, Cerebro) instance._id = cerebro._next_stid() instance.broker = instance.env.broker from .sizers import FixedSize instance._sizer = FixedSize() instance.stats = instance.observers = ItemCollection() instance.analyzers = ItemCollection() instance._alnames = collections.defaultdict(itertools.count) instance.writers = [] instance._slave_analyzers = [] instance._tradehistoryon = False instance._orders = [] instance._orderspending = [] instance._trades = collections.defaultdict(AutoDictList) instance._tradespending = [] return instance
def __init__(self, *args, **kwargs): """Initialize with functionality from MetaStrategy methods""" # Critical attributes already initialized in __new__ # Handle the functionality that was in MetaStrategy.dopostinit self._sizer.set(self, self.broker) # OPTIMIZED: Simple and fast data extraction from args # Cerebro passes datas at the beginning of args (cerebro.py:1433) if not hasattr(self, "datas") or not self.datas: self.datas = [] # Quick method: Extract datas directly from args # Cerebro prepends all datas to args, so we just need to identify them if args: for arg in args: # Fast check: data feeds have 'lines' and 'datetime' attributes if hasattr(arg, "lines") and hasattr(arg, "datetime"): self.datas.append(arg) # No need for nested loops or complex checks # Fallback: Try cerebro.datas directly (fast) if not self.datas and hasattr(self, "cerebro") and self.cerebro is not None: if hasattr(self.cerebro, "datas") and self.cerebro.datas: self.datas = list(self.cerebro.datas) # Set up primary data reference and data0/data1 aliases if self.datas: self.data = self.datas[0] for d, data in enumerate(self.datas): setattr(self, f"data{d}", data) else: self.data = None # Set up clock - this is critical for strategy execution if not hasattr(self, "_clock") or self._clock is None: if self.datas: self._clock = self.datas[0] # CRITICAL FIX: Don't create MinimalClock fallback # It causes problems with indicator clock detection in _periodset() # If no datas, leave _clock as None and let it be set later # Call user subclass __init__ if this is a Strategy subclass if self.__class__ != Strategy: from backtrader.lineiterator import StrategyBase # Guard against recursive calls when user's __init__ calls super().__init__() if not getattr(self, "_user_init_called", False): self._user_init_called = True for cls in self.__class__.__mro__: if ( cls not in (Strategy, StrategyBase) and hasattr(cls, "__init__") and "__init__" in cls.__dict__ ): # Use _original_init if available to avoid calling patched_init # (prevents infinite recursion when ParamsMixin patches __init__) if hasattr(cls, "_original_init"): user_init = cls._original_init else: user_init = cls.__dict__["__init__"] # Use OwnerContext so indicators find this strategy as owner with OwnerContext.set_owner(self): user_init(self) break # Initialize tick/channel callback state (auto, no manual init needed) if not hasattr(self, "_tick_count"): self._tick_count = 0 self._event_count = 0 self._last_tick = {} self._last_ob = {} self._last_funding = {} # Initialize critical attributes that are expected by strategy execution # These should be available before any user code runs if not hasattr(self, "_dlens"): self._dlens = [len(data) for data in self.datas] # CRITICAL FIX: DO NOT call super().__init__() here! # StrategyBase.__init__ already calls super().__init__() which eventually # calls Strategy.__init__. Calling super() again would create infinite recursion. # The parent initialization is already done by StrategyBase. # Clean up the temporary attribute if hasattr(self, "_strategy_init_kwargs"): delattr(self, "_strategy_init_kwargs") # Line type is strategy type _ltype = LineIterator.StratType # CSV default is True csv = True # Old clock update methodology, default is False _oldsync = False # update the clock using old methodology: data 0 # Keep the latest delivered data date in the line lines = ("datetime",)
[docs] def log(self, txt, dt=None, level="info"): """Log a message with optional datetime. This method provides basic logging functionality. For comprehensive logging including orders, trades, positions, indicators, and signals, use the TradeLogger observer. Args: txt: The message text to log. dt: Optional datetime. If None, uses current bar datetime. level: Log level ('info', 'warning', 'error', 'debug'). Example: >>> self.log(f'Close price: {self.data.close[0]:.2f}') >>> self.log('Warning message', level='warning') """ dt = dt or self.datetime.datetime() print(f"[{dt}] {txt}")
def _notify_signal_to_observers(self, action, size, price, data=None, reason=None): """Notify all TradeLogger observers about a trading signal. This is called automatically by buy() and sell() methods to record signals without requiring user intervention. Args: action: 'buy' or 'sell' size: Order size price: Signal price data: Data feed (optional) reason: Signal reason (optional) """ # Notify all observers that have log_signal method (TradeLogger) if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "log_signal"): data_name = getattr(data, "_name", None) if data else None observer.log_signal(action, size, price, data_name, reason) def _notify_order_to_observers(self, order): """Notify all observers about an order status change. Args: order: The order object with status change """ if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_order"): observer.notify_order(order) def _notify_trade_to_observers(self, trade): """Notify all observers about a trade. Args: trade: The trade object """ if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_trade"): observer.notify_trade(trade) def _notify_store_to_observers(self, msg, *args, **kwargs): """Forward store notifications to observers that support runtime events.""" if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_store_event"): observer.notify_store_event(msg, *args, **kwargs) def _notify_data_to_observers(self, data, status, *args, **kwargs): """Forward data-feed notifications to observers that support runtime events.""" if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_data_event"): observer.notify_data_event(data, status, *args, **kwargs) def _notify_tick_to_observers(self, tick): """Forward tick events to observers that support tick logging.""" if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_tick_event"): observer.notify_tick_event(tick) def _notify_bar_to_observers(self, bar): """Forward bar events to observers that support bar logging.""" if hasattr(self, "stats") and self.stats: for observer in self.stats: if hasattr(observer, "notify_bar_event"): observer.notify_bar_event(bar)
[docs] def qbuffer(self, savemem=0, replaying=False): """Enable the memory saving schemes. Possible values for ``savemem``: 0: No savings. Each line object keeps in memory all values 1: All lines objects save memory, using the strict minimum needed Negative values are meant to be used when plotting is required: -1: Indicators at Strategy Level and Observers do not enable memory savings (but anything declared below it does) -2: Same as -1 plus activation of memory saving for any indicators which has declared *plotinfo.plot* as False (will not be plotted) """ # If savemem < 0 if savemem < 0: # Get any attribute that labels itself as Indicator for ind in self._lineiterators[self.IndType]: # Check if this ind is a single line subsave = isinstance(ind, (LineSingle,)) # If not a single line and savemem == -2, check plotinfo.plot if not subsave and savemem < -1: subsave = not ind.plotinfo.plot # Apply memory saving based on subsave flag ind.qbuffer(savemem=subsave) # If savemem > 0 elif savemem > 0: # Apply memory saving to all data feeds for data in self.datas: data.qbuffer(replaying=replaying) # Apply memory saving to all lines for line in self.lines: line.qbuffer(savemem=1) # Apply memory saving to all lineiterators based on the strategy for itcls in self._lineiterators: for it in self._lineiterators[itcls]: it.qbuffer(savemem=1) # If savemem == 0, no action needed else: pass
def _iter_strategy_lineactions(self): """Yield LineActions stored as strategy attributes.""" from .linebuffer import LineActions try: cache = object.__getattribute__(self, "_strategy_lineactions_cache") except AttributeError: cache = None if cache is not None: yield from cache return seen = set() registered = set() def mark_registered(lineiter): lineiter_id = id(lineiter) if lineiter_id in registered: return registered.add(lineiter_id) try: child_lists = lineiter._lineiterators.values() except AttributeError: return for children in child_lists: for child in children: mark_registered(child) try: for children in self._lineiterators.values(): for child in children: mark_registered(child) except AttributeError: # No sub-iterator registry yet; nothing to pre-mark as registered. pass def visit(value): value_id = id(value) if value_id in seen: return seen.add(value_id) if isinstance(value, LineActions): for attr_name in ("_parent_a", "_parent_b", "a", "b", "cond"): try: dependency = getattr(value, attr_name) except AttributeError: continue yield from visit(dependency) try: args = value.args except AttributeError: args = () for dependency in args: yield from visit(dependency) if value_id not in registered: yield value return if isinstance(value, dict): for item in value.values(): yield from visit(item) elif isinstance(value, (list, tuple, set, frozenset)): for item in value: yield from visit(item) try: attrs = object.__getattribute__(self, "__dict__") except AttributeError: attrs = {} result = [] for attr_name, attr_value in attrs.items(): if attr_name.startswith("_"): continue result.extend(visit(attr_value)) cache = tuple(result) object.__setattr__(self, "_strategy_lineactions_cache", cache) yield from cache def _get_strategy_lineactions(self): try: return object.__getattribute__(self, "_strategy_lineactions_cache") except AttributeError: return tuple(self._iter_strategy_lineactions()) def _get_strategy_next_lineactions(self): try: return object.__getattribute__(self, "_strategy_next_lineactions_cache") except AttributeError: # Cache not built yet; compute and store it below. pass cache = tuple( (lineaction, getattr(lineaction, "_clock", None)) for lineaction in self._get_strategy_lineactions() if hasattr(lineaction, "_next") ) object.__setattr__(self, "_strategy_next_lineactions_cache", cache) return cache def _stage2(self): super()._stage2() for lineaction in self._get_strategy_lineactions(): try: lineaction._stage2() except Exception: logger.debug("Failed to stage2 strategy LineActions", exc_info=True) def _stage1(self): super()._stage1() for lineaction in self._get_strategy_lineactions(): try: lineaction._stage1() except Exception: logger.debug("Failed to stage1 strategy LineActions", exc_info=True) def _next_strategy_lineactions(self): """Advance LineActions stored directly on the strategy. LineActions created as strategy attributes, such as bt.Cmp/bt.If/bt.Max, are not LineIterator indicators. They still need to be evaluated before user next/prenext/nextstart reads them. """ for lineaction, clock in self._get_strategy_next_lineactions(): if clock is not None: try: if len(clock) <= len(lineaction): try: current_value = lineaction[0] except Exception: # nosec B112 # Per-bar hot path: the line buffer may not be # populated yet for this index. Skipping is the # intended behaviour; logging here would fire every # bar. No control-flow change. continue if current_value == current_value: continue lineaction.next() continue except Exception: # nosec B110 # Clock/value comparison unavailable; fall back to plain # _next(). Hot path, intentionally silent (no logging). pass lineaction._next() def _periodset(self): """Calculate and set the minimum period required for strategy execution. This method determines the minimum number of bars needed before the strategy's next() method can be called, based on the minimum periods of all indicators and data feeds. """ def iter_indicator_tree(indicators): seen = set() stack = list(indicators) while stack: lineiter = stack.pop(0) lineiter_id = id(lineiter) if lineiter_id in seen: continue seen.add(lineiter_id) yield lineiter try: children = lineiter._lineiterators[LineIterator.IndType] except (AttributeError, KeyError): continue stack.extend(children) # Data IDs dataids = [id(data) for data in self.datas] # Data minimum periods _dminperiods = collections.defaultdict(list) # Loop through all indicators all_indicators = list(iter_indicator_tree(self._lineiterators[LineIterator.IndType])) # CRITICAL FIX: bind secondary-feed indicator advance clocks now that # the whole indicator tree is built. During construction, an indicator # built on another indicator or a LinesOperation that follows a # non-primary feed (e.g. SMA((h1.high + h1.low) / 2.0) or # EMA(EMA(h4.close))) often has its clock defaulted to the strategy's # primary feed because the parent clocks were not yet finalized. That # makes the indicator warm up and emit values on the primary (fast) # clock instead of the secondary (slow) feed in runonce mode. Here every # clock is final, so we resolve each indicator's data dependency to the # concrete feed it follows and, when that feed is a *secondary* feed, # pin _resolved_secondary_clock to it (used by the runonce advance # loop and Indicator.advance). See docs/DEV_REGRESSION_FAILURES.md. from .lineiterator import _line_like_source_clock as _llsc primary_feed = self.datas[0] if self.datas else None # Map each feed's individual lines back to the owning feed so a clock # that resolves to a feed *line* (e.g. data.high) can be attributed to # its feed. _line_to_feed = {} for _data in self.datas: _dlines = getattr(_data, "lines", None) if _dlines is None: continue try: for _ln in _dlines: _line_to_feed[id(_ln)] = _data except TypeError: # Feed lines not iterable; skip mapping this feed's lines. pass def _feed_of(node, _seen=None): """Resolve a data node to the concrete feed it ultimately follows.""" if _seen is None: _seen = set() if node is None or id(node) in _seen: return None _seen.add(id(node)) if id(node) in dataids: return node if id(node) in _line_to_feed: return _line_to_feed[id(node)] try: src = _llsc(node) except Exception: src = None if src is not None and src is not node: if id(src) in dataids: return src if id(src) in _line_to_feed: return _line_to_feed[id(src)] found = _feed_of(src, _seen) if found is not None: return found nxt = getattr(node, "_clock", None) if nxt is not None and nxt.__class__.__name__ != "MinimalClock": found = _feed_of(nxt, _seen) if found is not None: return found ndatas = getattr(node, "datas", None) if ndatas: found = _feed_of(ndatas[0], _seen) if found is not None: return found # Walk owner references: a node may be an indicator output line # (LineBuffer) whose owning indicator follows the target feed. The # owner can be the indicator directly, or a Lines container whose # _owner_ref points to it. for owner_attr in ("_owner", "_owner_ref"): owner = getattr(node, owner_attr, None) if owner is not None and id(owner) not in _seen: # A Lines container exposes _owner_ref to the real owner. ref = getattr(owner, "_owner_ref", None) if ref is not None and id(ref) not in _seen: found = _feed_of(ref, _seen) if found is not None: return found found = _feed_of(owner, _seen) if found is not None: return found # Operands of a LinesOperation. for op_attr in ("a", "b", "_parent_a", "_parent_b"): operand = getattr(node, op_attr, None) if operand is not None and id(operand) not in _seen: found = _feed_of(operand, _seen) if found is not None: return found return None if primary_feed is not None and len(self.datas) > 1: for lineiter in all_indicators: idatas = getattr(lineiter, "datas", None) if not idatas: continue feed = _feed_of(idatas[0]) if feed is not None and feed is not primary_feed and id(feed) in dataids: # Pin only the advance clock used by the runonce post-phase # loop and Indicator.advance(); deliberately do NOT change # lineiter._clock here so the existing minperiod-to-feed # attribution below (and thus the strategy warmup / bar_num) # stays identical to the pre-fix behavior. Changing _clock # perturbed multi-feed warmup for unrelated strategies. lineiter._resolved_secondary_clock = feed for lineiter in all_indicators: # If multiple datas are used and multiple timeframes, the larger # timeframe may place larger time constraints in calling next. # Get the indicator's _clock attribute clk = getattr(lineiter, "_clock", None) # CRITICAL FIX: If clock is MinimalClock, use the indicator's actual data source if ( clk is not None and hasattr(clk, "__class__") and "MinimalClock" in clk.__class__.__name__ ): if self.datas: # Find which data feed the indicator's data source belongs to clock_set = False if hasattr(lineiter, "datas") and lineiter.datas: ind_data = lineiter.datas[0] for data_feed in self.datas: # Check if ind_data is the data feed itself if ind_data is data_feed: clk = data_feed clock_set = True break # Check if ind_data is one of the lines of this data feed if hasattr(data_feed, "lines") and ind_data in data_feed.lines: clk = data_feed clock_set = True break if not clock_set: clk = self.datas[0] lineiter._clock = clk # Update indicator's clock else: clk = None # If the attribute value is None if clk is None: # Get the indicator's owner's _clock attribute value clk = getattr(lineiter._owner, "_clock", None) # CRITICAL FIX: If owner's clock is also MinimalClock, use data if ( clk is not None and hasattr(clk, "__class__") and "MinimalClock" in clk.__class__.__name__ ): if self.datas: clk = self.datas[0] else: clk = None if clk is None: continue # If clk is not None while True: # If clk is a data feed, break if id(clk) in dataids: break # already top-level clock (data feed) # See if the current clock has higher level clocks # Check if current clk has further _clock attribute clk2 = getattr(clk, "_clock", None) # If clk2 is None, get clk owner's _clock attribute value if clk2 is None: clk2 = getattr(clk._owner, "_clock", None) if clk2 is None: break # if no clock found, bail out # If clk2 is not None, set clk to clk2 clk = clk2 # keep the ref and try to go up the hierarchy # This check ensures clk is not None before proceeding if clk is None: continue # no clock found, go to next # LineSeriesStub wraps a line and the clock is the wrapped line and # not the wrapper itself. # If clk is LineSeriesStub (multi-line object), get first line as clk if isinstance(clk, LineSeriesStub): clk = clk.lines[0] # Save minimum period _dminperiods[clk].append(lineiter._minperiod) # Set minimum periods to empty list self._minperiods = [] # Loop through all data feeds for data in self.datas: # Do not only consider the data as clock but also its lines, which # may have been individually passed as clock references and # discovered as clocks above # Initialize with a data min period if any # Minimum period needed for data to generate indicator lines dlminperiods = _dminperiods[data] # Loop through each line of data, add minperiods if line is in _dminperiods for line in data.lines: # search each line for min periods if line in _dminperiods: dlminperiods += _dminperiods[line] # found, add it # Keep the reference to the line if any was found # If dlminperiods is not empty, calculate max value, else empty list _dminperiods[data] = [max(dlminperiods)] if dlminperiods else [] # Data minimum period dminperiod = max(_dminperiods[data] or [data._minperiod]) # Save minimum period to dminperiod self._minperiods.append(dminperiod) # Set the minperiod # Indicator minimum periods minperiods = [x._minperiod for x in all_indicators] # CRITICAL FIX: Also scan strategy attributes for LineActions objects # (like LinesOperation from sma - sma(-10)) that aren't registered as indicators # but still need their minperiod considered from .linebuffer import LineActions for attr_name in dir(self): if attr_name.startswith("_"): continue try: attr = getattr(self, attr_name) # Check if it's a LineActions but not already in _lineiterators if isinstance(attr, LineActions) and hasattr(attr, "_minperiod"): if attr not in self._lineiterators[LineIterator.IndType]: minperiods.append(attr._minperiod) except (AttributeError, TypeError): # Attribute access/typecheck failed; skip this attribute. pass # Set strategy minimum period to max of indicator and data minperiods self._minperiod = max(minperiods or [self._minperiod]) # CRITICAL FIX: Update _minperiods for LineActions, but only for their associated data # For single-data strategies, apply LineActions minperiod to data[0] # For multi-data strategies, LineActions minperiod should only affect its source data from .linebuffer import LineActions if self._minperiods: for attr_name in dir(self): if attr_name.startswith("_"): continue try: attr = getattr(self, attr_name) if isinstance(attr, LineActions) and hasattr(attr, "_minperiod"): # Try to determine which data this LineActions is associated with # by checking its _clock or data sources data_idx = 0 # Default to data[0] if hasattr(attr, "_clock") and attr._clock is not None: for i, d in enumerate(self.datas): if attr._clock is d or attr._clock in d.lines: data_idx = i break # Only update minperiod for the specific data if data_idx < len(self._minperiods): self._minperiods[data_idx] = max( self._minperiods[data_idx], attr._minperiod ) except (AttributeError, TypeError): # Attribute access/typecheck failed; skip this attribute. pass def _addwriter(self, writer): """Add a writer to the strategy. Unlike the other _addxxx functions, this one receives an instance because the writer works at cerebro level and is only passed to the strategy to simplify the logic. """ self.writers.append(writer) def _addindicator(self, indcls, *indargs, **indkwargs): """Add an indicator to the strategy. Args: indcls: Indicator class to instantiate *indargs: Positional arguments for the indicator **indkwargs: Keyword arguments for the indicator """ indcls(*indargs, **indkwargs) def _addanalyzer_slave(self, ancls, *anargs, **ankwargs): """Add a slave analyzer for internal use. Like _addanalyzer but meant for observers (or other entities) which rely on the output of an analyzer for the data. These analyzers have not been added by the user and are kept separate from the main analyzers. Args: ancls: Analyzer class to instantiate *anargs: Positional arguments for the analyzer **ankwargs: Keyword arguments for the analyzer Returns: The created analyzer instance """ # Use OwnerContext so analyzer's findowner() can find this strategy with OwnerContext.set_owner(self): analyzer = ancls(*anargs, **ankwargs) self._slave_analyzers.append(analyzer) return analyzer def _getanalyzer_slave(self, idx): """Get a slave analyzer by index. Note: This appears to have a syntax bug - should use [] not append() """ return self._slave_analyzers.append[idx] def _addanalyzer(self, ancls, *anargs, **ankwargs): """Add an analyzer to the strategy. Args: ancls: Analyzer class to instantiate *anargs: Positional arguments for the analyzer **ankwargs: Keyword arguments for the analyzer, may include _name """ anname = ankwargs.pop("_name", "") or ancls.__name__.lower() nsuffix = next(self._alnames[anname]) anname += str(nsuffix or "") # 0 (first instance) gets no suffix # Use OwnerContext so analyzer's findowner() can find this strategy with OwnerContext.set_owner(self): analyzer = ancls(*anargs, **ankwargs) # PERFORMANCE FIX: Explicitly set analyzer's owner to ensure it has access to strategy analyzer._parent = self analyzer._owner = self self.analyzers.append(analyzer, anname) def _addobserver(self, multi, obscls, *obsargs, **obskwargs): """Add an observer to the strategy. Args: multi: If True, create one observer per data feed; if False, create single observer obscls: Observer class to instantiate *obsargs: Positional arguments for the observer **obskwargs: Keyword arguments for the observer, may include obsname """ obsname = obskwargs.pop("obsname", "") if not obsname: obsname = obscls.__name__.lower() if not multi: newargs = list(itertools.chain(self.datas, obsargs)) # Use OwnerContext so observer's findowner() can find this strategy with OwnerContext.set_owner(self): obs = obscls(*newargs, **obskwargs) # PERFORMANCE FIX: Explicitly set observer's owner to ensure it has access to strategy obs._parent = self obs._owner = self self._register_observer(obs) self.stats.append(obs, obsname) return setattr(self.stats, obsname, []) obs_list = getattr(self.stats, obsname) for data in self.datas: # Use OwnerContext so observer's findowner() can find this strategy with OwnerContext.set_owner(self): obs = obscls(data, *obsargs, **obskwargs) # PERFORMANCE FIX: Explicitly set observer's owner to ensure it has access to strategy obs._parent = self obs._owner = self self._register_observer(obs) obs_list.append(obs) def _register_observer(self, obs): """Prepare an observer for execution. Ensures the observer has _analyzers and has its clock set properly for strategy-wide observers. Does NOT register in _lineiterators to avoid double-processing (observers are processed bar-by-bar via _next_observers, not via _once batch processing). Args: obs: Observer instance to register. """ # Ensure _analyzers exists (some observers don't call super().__init__) if not hasattr(obs, "_analyzers"): obs._analyzers = [] # Set clock for strategy-wide observers if getattr(obs, "_stclock", False): obs._clock = self def _getminperstatus(self): """Check if minimum period requirements are satisfied. Returns the maximum difference between required minimum periods and current data lengths. Returns: int: Maximum value of (minperiod - current_length) across all data feeds. Negative values indicate all minimum periods are satisfied. """ data_iter = iter(zip(self._minperiods, self.datas)) try: minperiod, data = next(data_iter) except StopIteration: raise ValueError("max() arg is an empty sequence") from None minperstatus = minperiod - len(data) for minperiod, data in data_iter: status = minperiod - len(data) if status > minperstatus: minperstatus = status self._minperstatus = minperstatus return minperstatus
[docs] def prenext_open(self): """Called before next() during prenext phase. This is a hook for strategies to take action at the open of each bar before minimum period is reached. """
[docs] def nextstart_open(self): """Called at the open of the first bar where minimum period is satisfied. This is called only once, transitioning from prenext to next phase. """ self.next_open()
[docs] def next_open(self): """Called at the open of each bar during normal execution. This is a hook for strategies to take action at the open of each bar. """
def _oncepost_open(self): """Prepare for _oncepost execution based on minimum period status. Routes to appropriate method based on minperstatus: - minperstatus < 0: All data satisfied, call next_open() - minperstatus == 0: First bar with satisfied data, call nextstart_open() - minperstatus > 0: Data not ready, call prenext_open() """ minperstatus = self._minperstatus if minperstatus < 0: self.next_open() elif minperstatus == 0: self.nextstart_open() # only called for the 1st value else: self.prenext_open() def _oncepost(self, dt): """Execute oncepost processing for a single time step. Args: dt: Current datetime """ # CRITICAL FIX: Ensure _clock is set to actual data, not MinimalClock # During initialization, _clock might be set to MinimalClock if datas weren't available yet if hasattr(self, "_clock") and self._clock is not None: clock_type_name = type(self._clock).__name__ if clock_type_name == "MinimalClock" and self.datas: # Replace MinimalClock with actual first data self._clock = self.datas[0] elif not hasattr(self, "_clock") or self._clock is None: # Set clock to first data if not set if self.datas: self._clock = self.datas[0] # Loop through indicators, advance if indicator clock length exceeds indicator length for indicator in self._lineiterators[LineIterator.IndType]: # Honor a pinned secondary-feed clock (set in _periodset) so # indicators following a non-primary feed advance in sync with it. adv_clock = getattr(indicator, "_resolved_secondary_clock", None) or indicator._clock if len(adv_clock) > len(indicator): indicator.advance() # If using old data sync method, call advance; otherwise call forward if self._oldsync: # Strategy has not been reset, the line is there self.advance() else: # strategy has been reset to beginning. advance step by step self.forward() # Set datetime - and save it as the last valid datetime for use in stop() self.lines.datetime[0] = dt if dt > 0: self._last_valid_datetime = dt # Notify self._notify() self._next_strategy_lineactions() # CRITICAL FIX: In runonce mode, ensure indicator lencount matches strategy length # This ensures len(indicator) == len(strategy) at the end of processing try: strategy_len = len(self) strategy_clock = getattr(self, "_clock", None) for indicator in self._lineiterators[LineIterator.IndType]: if getattr(indicator, "_clock", None) is not strategy_clock: continue # Only update if indicator was processed in runonce mode if hasattr(indicator, "_once_called") and indicator._once_called: # Update lencount for all lines in the indicator if hasattr(indicator, "lines") and hasattr(indicator.lines, "lines"): for line in indicator.lines.lines: if hasattr(line, "lencount"): # Set lencount to match strategy length (which equals data length) # Use the maximum of current lencount and strategy_len to ensure we don't decrease it line.lencount = max(line.lencount, strategy_len) except Exception: logger.debug("Failed to update indicator lencount in _oncepost_nextday", exc_info=True) # Get current minimum period status and route to appropriate method # If all data satisfied, call next() # If first bar with all data satisfied, call nextstart() # If not all data satisfied, call prenext() minperstatus = self._getminperstatus() if minperstatus < 0: self.next() elif minperstatus == 0: self.nextstart() # only called for the 1st value else: self.prenext() # Update analyzers with minimum period status self._next_analyzers(minperstatus, once=True) # Update observers with minimum period status self._next_observers(minperstatus, once=True) # Clear pending orders and trades self.clear() def _clk_update(self): """Update the clock and advance strategy state if needed. Returns: int: Current length of the strategy """ # CRITICAL FIX: Ensure data is available before clock operations if ( getattr(self, "_data_assignment_pending", True) or not hasattr(self, "_clock") or self._clock is None ): # Try to get data assignment from cerebro if not already done if hasattr(self, "_ensure_data_available"): self._ensure_data_available() # If using old data sync method if self._oldsync: # Call strategy's _clk_update() method clk_len = super()._clk_update() # Set datetime if self.datas: max_datetime = None for data in self.datas: if not len(data): continue dt_value = data.datetime[0] if ( isinstance(dt_value, (int, float)) and math.isfinite(dt_value) and dt_value > 0 ): if max_datetime is None or dt_value > max_datetime: max_datetime = dt_value if max_datetime is not None: self.lines.datetime[0] = max_datetime # Return data length return clk_len # CRITICAL FIX: Initialize _dlens if not present if not hasattr(self, "_dlens"): self._dlens = [len(d) for d in self.datas] # Current new data lengths and valid datetimes in a single pass. newdlens = [] max_datetime = None for data in self.datas: data_len = len(data) newdlens.append(data_len) if data_len: dt_value = data.datetime[0] if isinstance(dt_value, (int, float)) and math.isfinite(dt_value) and dt_value > 0: if max_datetime is None or dt_value > max_datetime: max_datetime = dt_value # If new data length > old data length, forward if any(nl > old_len for old_len, nl in zip(self._dlens, newdlens)): self.forward() # Set datetime to max of current datetimes - only update if we have valid datetimes if max_datetime is not None: self.lines.datetime[0] = max_datetime # Old data length equals new data length self._dlens = newdlens return len(self) def _next_open(self): """Execute next_open phase based on minimum period status. Same logic as _oncepost_open(). """ minperstatus = self._minperstatus if minperstatus < 0: self.next_open() elif minperstatus == 0: self.nextstart_open() # only called for the 1st value else: self.prenext_open() def _next(self): """Execute next() method and update analyzers and observers. Gets minimum period status and passes it to analyzers and observers, then clears pending orders and trades. """ super()._next() minperstatus = self._getminperstatus() self._next_analyzers(minperstatus) self._next_observers(minperstatus) self.clear() def _get_all_observers(self): """Get all observer instances from self.stats. Handles both single observers and multi-data observer lists. Ensures each observer has _analyzers attribute. Returns: list: Flat list of all observer instances. """ result = [] for item in self.stats.items: if isinstance(item, list): for obs in item: if not hasattr(obs, "_analyzers"): obs._analyzers = [] result.append(obs) else: if not hasattr(item, "_analyzers"): item._analyzers = [] result.append(item) return result def _next_observers(self, minperstatus, once=False): """Update observers based on minimum period status. Iterates over self.stats.items (populated by _addobserver) instead of _lineiterators[ObsType] which is intentionally kept empty to avoid double-processing with _once() batch mode. Note: Some pre-existing observers may have broken __init__ chains (due to ObserverBase.__init_subclass__ wrapped_init not calling super().__init__). Their next() calls are wrapped in try/except to prevent one broken observer from crashing the entire run. Args: minperstatus: Current minimum period status once: If True, running in runonce mode; otherwise running in next() mode """ # Collect all observers from _lineiterators observers_to_process = list(self._lineiterators[LineIterator.ObsType]) # Also include TradeLogger observers from stats that are not in _lineiterators # (TradeLogger needs next() to be called for position/indicator logging) for obs in self.stats: if obs not in observers_to_process: # Only add TradeLogger type observers to avoid breaking other observers if obs.__class__.__name__ == "TradeLogger": observers_to_process.append(obs) # Loop through observers for observer in observers_to_process: # For each analyzer in the observer (if observer has _analyzers) for analyzer in getattr(observer, "_analyzers", []): # Route to appropriate analyzer method based on minperstatus if minperstatus < 0: analyzer._next() elif minperstatus == 0: analyzer._nextstart() # only called for the 1st value else: analyzer._prenext() # If running in once mode if once: # If current data length > observer length if len(self) > len(observer): # If using old data sync method, call advance, else call forward if self._oldsync: observer.advance() else: observer.forward() # Route to appropriate observer method based on minperstatus if minperstatus < 0: observer.next() elif minperstatus == 0: observer.nextstart() # only called for the 1st value elif len(observer): observer.prenext() # If not in once mode, call _next() else: observer._next() def _next_analyzers(self, minperstatus, once=False): """Update analyzers based on minimum period status. Args: minperstatus: Current minimum period status once: If True, running in runonce mode (unused but kept for consistency) """ for analyzer in self.analyzers: if minperstatus < 0: analyzer._next() elif minperstatus == 0: analyzer._nextstart() # only called for the 1st value else: analyzer._prenext() def _settz(self, tz): """Set timezone for strategy's datetime line. Args: tz: Timezone to set """ self.lines.datetime._settz(tz) def _start(self): """Initialize strategy and start execution. Calculates minimum periods, starts analyzers and observers, and calls user's start() method. """ # Calculate and set required minimum period self._periodset() # Start analyzers for analyzer in itertools.chain(self.analyzers, self._slave_analyzers): analyzer._start() # Start observers for obs in self.observers: if not isinstance(obs, list): obs = [obs] # support of multi-data observers for o in obs: o._start() # Change operators to stage 2 self._stage2() # Current length of each data self._dlens = [len(data) for data in self.datas] # Current minimum period status defaults to MAXINT (start in prenext) self._minperstatus = MAXINT # Call user's start() self.start()
[docs] def start(self): """Called right before the backtesting is about to be started. This is a hook for strategies to perform initialization before the backtesting loop begins. """
[docs] def getwriterheaders(self): """Get the CSV headers for writer output. Returns: list: Headers including indicator/observer names and line aliases """ # Filter indicators and observers for CSV output self.indobscsv = [self] # Filter indicators and observers, include only those with csv=True indobs = itertools.chain(self.getindicators_lines(), self.getobservers()) self.indobscsv.extend(filter(lambda x: x.csv, indobs)) # Initialize headers as empty list headers = [] # Prepare the indicators/observers data headers # Loop through indicators/observers marked for CSV output for iocsv in self.indobscsv: # Get indicator/observer name or class name name = iocsv.plotinfo.plotname or iocsv.__class__.__name__ # Add name, length, and line aliases to headers headers.append(name) headers.append("len") headers.extend(iocsv.getlinealiases()) # Return headers return headers
[docs] def getwritervalues(self): """Get current values for writer output. Returns: list: Current values from indicators and observers """ values = [] # Loop through indicators/observers for iocsv in self.indobscsv: name = iocsv.plotinfo.plotname or iocsv.__class__.__name__ values.append(name) lio = len(iocsv) values.append(lio) # If length > 0, get each value if lio: values.extend(map(lambda line: line[0], iocsv.lines.itersize())) else: values.extend([""] * iocsv.lines.size()) return values
[docs] def getwriterinfo(self): """Get comprehensive writer information including params and analysis. Returns: AutoOrderedDict: Nested structure containing params, indicators, observers, and analyzer results """ # Initialize writer info as AutoOrderedDict wrinfo = AutoOrderedDict() # Set parameters wrinfo["Params"] = self.p._getkwargs() sections = [["Indicators", self.getindicators_lines()], ["Observers", self.getobservers()]] # Loop through indicators and observers for sectname, sectitems in sections: # Set specific values sinfo = wrinfo[sectname] for item in sectitems: itname = item.__class__.__name__ sinfo[itname].Lines = item.lines.getlinealiases() or None sinfo[itname].Params = item.p._getkwargs() or None # Set analyzer values ainfo = wrinfo.Analyzers # Internal Value Analyzer ainfo.Value.Begin = self.broker.startingcash ainfo.Value.End = self.broker.getvalue() # No slave analyzers for a writer for aname, analyzer in self.analyzers.getitems(): ainfo[aname].Params = analyzer.p._getkwargs() or None ainfo[aname].Analysis = analyzer.get_analysis() return wrinfo
def _stop(self): # CRITICAL FIX: In runonce mode, ensure indicator lencount matches strategy length # This must be done BEFORE calling user's stop() method, as tests check len(indicator) == len(strategy) try: strategy_len = len(self) strategy_clock = getattr(self, "_clock", None) # Update lencount for all indicators to match strategy length # This is critical for runonce mode where indicators are pre-calculated but lencount may not match if hasattr(self, "_lineiterators"): from .lineiterator import LineIterator for indicator in self._lineiterators.get(LineIterator.IndType, []): if getattr(indicator, "_clock", None) is not strategy_clock: continue # Update lencount for all lines in the indicator to match strategy length if hasattr(indicator, "lines") and hasattr(indicator.lines, "lines"): for line in indicator.lines.lines: if hasattr(line, "lencount"): # In runonce mode, set lencount to match strategy length (which equals data length) # This ensures len(indicator) == len(strategy) for test assertions line.lencount = strategy_len except Exception: logger.debug("Failed to update indicator lencount in _stop", exc_info=True) # CRITICAL FIX: Restore last valid datetime before calling user's stop() # This ensures datetime[0] is valid for logging in stop() method if hasattr(self, "_last_valid_datetime") and self._last_valid_datetime > 0: try: # Restore strategy datetime self.lines.datetime[0] = self._last_valid_datetime # CRITICAL: Also restore all data feed datetimes for data in self.datas: try: data.datetime[0] = self._last_valid_datetime except Exception: logger.debug( "Failed to restore datetime for data %s in _stop", getattr(data, "_name", data), ) except Exception: logger.debug("Failed to restore strategy datetime in _stop", exc_info=True) # Call user's stop() method - can be overridden in strategy subclass self.stop() # Stop analyzers (both user-added and slave analyzers for observers) for analyzer in itertools.chain(self.analyzers, self._slave_analyzers): analyzer._stop() # Stop observers (flush logs, etc.) for observer in self._get_all_observers(): try: if hasattr(observer, "stop"): observer.stop() except Exception: logger.warning( "Observer %s.stop() raised an exception", type(observer).__name__, exc_info=True, ) # Change operators back to stage 1 - allows reuse of datas self._stage1()
[docs] def stop(self): """Called right before the backtesting is about to be stopped. This is a hook for strategies to perform cleanup or final logging. """
[docs] def set_tradehistory(self, onoff=True): """Enable or disable trade history tracking. Args: onoff: If True, keep full trade history; if False, only track current trade """ self._tradehistoryon = onoff
[docs] def clear(self): """Clear pending orders and trades. Moves pending orders to _orders list and clears pending trades. """ self._orders.extend(self._orderspending) self._orderspending = [] self._tradespending = []
def _addnotification(self, order, quicknotify=False): """Add order notification and process trade updates. Args: order: The order that has been updated quicknotify: If True, immediately process notification without queueing """ # If not simulated trading, add order to pending orders if not order.p.simulated: self._orderspending.append(order) # If in quick notify mode, initialize qorders and qtrades if quicknotify: qorders = [order] qtrades: list = [] # If order has no executed volume if not order.executed.size: # If in quick notify mode, call _notify with info if quicknotify: self._notify(qorders=qorders, qtrades=qtrades) return # Get trade data - if order.data._compensate is None, use order.data; otherwise use order.data._compensate tradedata = getattr(order.data, "_compensate", None) if tradedata is None: tradedata = order.data # Get trade data - if trade exists in _trades, use the last one; otherwise create a new trade and save to datatrades tradekey = trade_key_from_order(order) datatrades = self._trades[tradedata][tradekey] if not datatrades: trade = Trade(data=tradedata, tradeid=tradekey, historyon=self._tradehistoryon) datatrades.append(trade) else: trade = datatrades[-1] # Loop through order execution bits for exbit in order.executed.iterpending(): # If execution bit is None, break loop if exbit is None: break # If execution bit indicates closed position if exbit.closed: # Update trade trade.update( order, exbit.closed, exbit.price, exbit.closedvalue, exbit.closedcomm, exbit.pnl, comminfo=order.comminfo, ) # If trade is closed if trade.isclosed: # Copy trade and add to _tradespending self._tradespending.append(copy.copy(trade)) # If quick notify needed, copy trade and add to qtrades if quicknotify: qtrades.append(copy.copy(trade)) # Update it if needed # If order execution bit indicates opened position if exbit.opened: # If trade is closed, create new trade and save to datatrades if trade.isclosed: trade = Trade(data=tradedata, tradeid=tradekey, historyon=self._tradehistoryon) datatrades.append(trade) # Update trade trade.update( order, exbit.opened, exbit.price, exbit.openedvalue, exbit.openedcomm, exbit.pnl, comminfo=order.comminfo, ) # This extra check covers the case in which different tradeid # orders have put the position down to 0 and the next order # "opens" a position but "closes" the trade # If trade is closed if trade.isclosed: # Copy trade and add to _tradespending self._tradespending.append(copy.copy(trade)) # If quick notify needed, copy trade and add to qtrades if quicknotify: qtrades.append(copy.copy(trade)) # If trade was just opened if trade.justopened: # Copy trade and add to _tradespending self._tradespending.append(copy.copy(trade)) # If quick notify needed, copy trade and add to qtrades if quicknotify: qtrades.append(copy.copy(trade)) # If quick notify needed, call _notify if quicknotify: self._notify(qorders=qorders, qtrades=qtrades) def _notify(self, qorders=None, qtrades=None): """Notify order and trade events to strategy and analyzers. Args: qorders: Quick notify orders (empty list if not in quick notify mode) qtrades: Quick notify trades (empty list if not in quick notify mode) """ if qorders is None: qorders = [] if qtrades is None: qtrades = [] # If quick notify is enabled if self.cerebro.p.quicknotify: # Need to know if quicknotify is on, to not reprocess pendingorders # and pendingtrades, which have to exist for things like observers # which look into it # Pending orders and trades are qorders and qtrades procorders = qorders proctrades = qtrades # Otherwise use orders and trades saved in _orderspending and _tradespending else: procorders = self._orderspending proctrades = self._tradespending # PERFORMANCE OPTIMIZATION: Cache merged analyzer list to avoid repeated itertools.chain # This is called 688K+ times, so caching makes a significant difference all_analyzers = getattr(self, "_all_analyzers_cache", None) if all_analyzers is None: all_analyzers = list(self.analyzers) + list(self._slave_analyzers) self._all_analyzers_cache = all_analyzers # Loop through pending orders for order in procorders: # If order execution type is not Historical or histnotify, notify order if order.exectype != order.Historical or order.histnotify: self.notify_order(order) # Notify order to analyzers (both user and slave analyzers) for analyzer in all_analyzers: analyzer._notify_order(order) # Notify order to observers (e.g., TradeLogger) self._notify_order_to_observers(order) # Loop through pending trades, notify, and notify analyzers for trade in proctrades: self.notify_trade(trade) for analyzer in all_analyzers: analyzer._notify_trade(trade) # Notify trade to observers (e.g., TradeLogger) self._notify_trade_to_observers(trade) # If qorders is not empty, return after processing orders if qorders: return # cash is notified regularly # Get cash, value, fundvalue, fundshares cash = self.broker.getcash() value = self.broker.getvalue() fundvalue = self.broker.fundvalue fundshares = self.broker.fundshares # Notify cash and value values, and notify analyzers self.notify_cashvalue(cash, value) # Notify fund values, and notify analyzers self.notify_fund(cash, value, fundvalue, fundshares) for analyzer in all_analyzers: analyzer._notify_cashvalue(cash, value) analyzer._notify_fund(cash, value, fundvalue, fundshares)
[docs] def add_timer( self, when, offset=datetime.timedelta(), repeat=datetime.timedelta(), weekdays=None, weekcarry=False, monthdays=None, monthcarry=True, allow=None, tzdata=None, cheat=False, *args, **kwargs, ): """Schedule a timer to invoke notify_timer or a callback. Note: Can be called during __init__ or start Schedules a timer to invoke either a specified callback or the notify_timer of one or more strategies. Args: when: Can be: - datetime.time instance (see tzdata below) - bt.timer.SESSION_START to reference session start - bt.timer.SESSION_END to reference session end offset (datetime.timedelta): Offset the when value. Used with SESSION_START/SESSION_END to trigger after session start/end. repeat (datetime.timedelta): If set, timer repeats at this interval within the same session. Resets to original when after session end. weekdays (list): Sorted iterable with integers (Monday=1, Sunday=7) indicating which days the timer can be invoked. Empty = all days. weekcarry (bool): If True and weekday not seen (e.g., holiday), execute on next day (even if in new week). monthdays (list): Sorted iterable with integers (1-31) indicating which days of month to execute. Empty = all days. monthcarry (bool): If True and day not seen (weekend, holiday), execute on next available day. allow (callable): Callback receiving datetime.date, returns True if date is allowed for timer execution. tzdata: Timezone data - None, pytz instance, or data feed instance. If None and when is SESSION_START/END, uses first data feed. cheat (bool): If True, timer called before broker evaluates orders, allowing orders based on opening price. *args: Additional args passed to notify_timer **kwargs: Additional kwargs passed to notify_timer Returns: The created timer instance """ return self.cerebro._add_timer( owner=self, when=when, offset=offset, repeat=repeat, weekdays=weekdays, weekcarry=weekcarry, monthdays=monthdays, monthcarry=monthcarry, allow=allow, tzdata=tzdata, strats=False, cheat=cheat, *args, **kwargs, )
[docs] def notify_timer(self, timer, when, *args, **kwargs): """Receive timer notifications. Receives a timer notification where ``timer`` is the timer instance returned by ``add_timer``, and ``when`` is the calling time. ``args`` and ``kwargs`` are any additional arguments passed to ``add_timer``. The actual ``when`` time can be later than expected, as the system may not have been able to call the timer before. This value is the timer's scheduled time, not the actual system time. Args: timer: The timer instance created by add_timer when: The scheduled time when the timer was triggered *args: Additional positional arguments passed to add_timer **kwargs: Additional keyword arguments passed to add_timer """
[docs] def notify_cashvalue(self, cash, value): """Notify the current cash and value of the strategy's broker. Args: cash: Current cash amount value: Current portfolio value """
[docs] def notify_fund(self, cash, value, fundvalue, shares): """Notify the current cash, value, fund value, and fund shares. Args: cash: Current cash amount value: Current portfolio value fundvalue: Current fund value shares: Current fund shares """
[docs] def notify_order(self, order): """Receive notification when an order status changes. Args: order: The order with changed status """
[docs] def notify_trade(self, trade): """Receive notification when a trade status changes. Args: trade: The trade with changed status """
[docs] def notify_store(self, msg, *args, **kwargs): """Receive notification from a store provider. Args: msg: Message from the store *args: Additional positional arguments **kwargs: Additional keyword arguments """
[docs] def notify_data(self, data, status, *args, **kwargs): """Receive notification from a data feed. Args: data: The data feed sending the notification status: Status code *args: Additional positional arguments **kwargs: Additional keyword arguments """
# ========== Tick/Channel Event Callbacks ==========
[docs] def notify_tick(self, tick): """Called when a new tick event arrives. Override this method to implement tick-level trading logic. Args: tick: TickEvent instance with price, volume, direction, etc. """
[docs] def notify_orderbook(self, orderbook): """Called when a new order book snapshot arrives. Override this method to implement orderbook-based trading logic. Args: orderbook: OrderBookSnapshot instance with bids, asks, spread, etc. """
[docs] def notify_funding(self, funding): """Called when a new funding rate event arrives. Override this method to implement funding rate arbitrage or position management. Args: funding: FundingEvent instance with rate, mark_price, etc. """
[docs] def notify_bar(self, bar): """Called when a bar event arrives from the channel system. This is different from the standard next() method which processes bars from LineSeries data feeds. This callback handles BarEvents from the channel/queue system. Args: bar: BarEvent instance with open, high, low, close, volume. """
[docs] def get_last_tick(self, symbol=None): """Get the last tick for a symbol. Args: symbol: Symbol name. If None, returns first available. Returns: TickEvent or None. """ if symbol: return self._last_tick.get(symbol) if self._last_tick: return next(iter(self._last_tick.values())) return None
[docs] def get_last_orderbook(self, symbol=None): """Get the last order book for a symbol. Args: symbol: Symbol name. If None, returns first available. Returns: OrderBookSnapshot or None. """ if symbol: return self._last_ob.get(symbol) if self._last_ob: return next(iter(self._last_ob.values())) return None
[docs] def get_last_funding(self, symbol=None): """Get the last funding rate for a symbol. Args: symbol: Symbol name. If None, returns first available. Returns: FundingEvent or None. """ if symbol: return self._last_funding.get(symbol) if self._last_funding: return next(iter(self._last_funding.values())) return None
# ========== Data Access Methods ==========
[docs] def getdatanames(self): """Get a list of all data names in the system. Returns: list: Names of all data feeds """ return keys(self.env.datasbyname)
[docs] def getdatabyname(self, name): """Get a data feed by its name. Args: name: Name of the data feed Returns: The data feed with the given name """ return self.env.datasbyname[name]
[docs] def cancel(self, order): """Cancel an order in the broker. Args: order: The order to cancel """ self.broker.cancel(order)
[docs] def buy( self, data=None, size=None, price=None, plimit=None, exectype=None, valid=None, tradeid=0, oco=None, trailamount=None, trailpercent=None, parent=None, transmit=True, **kwargs, ) -> Optional[Order]: """Create a buy (long) order and send it to the broker. Args: data: The data feed for the order. If None, uses the first data feed (self.data). size: Size to use (positive) for the order. If None, the sizer instance retrieved via getsizer will determine the size. price: Price to use. None is valid for Market and Close orders. For Limit, Stop and StopLimit orders this determines the trigger point. plimit: Only applicable to StopLimit orders. This is the price at which to set the implicit Limit order, once the Stop has been triggered. trailamount: For StopTrail/StopTrailLimit orders, an absolute amount which determines the distance to the price to keep the trailing stop. trailpercent: For StopTrail/StopTrailLimit orders, a percentage amount which determines the distance to the price to keep the trailing stop. exectype: Execution type. Possible values: - Order.Market or None: Market order - Order.Limit: Limit order - Order.Stop: Stop order - Order.StopLimit: Stop-limit order - Order.Close: Close order - Order.StopTrail: Stop-trail order - Order.StopTrailLimit: Stop-trail-limit order valid: Order validity. Possible values: - None: Good till cancel - datetime.datetime/date: Good till date - Order.DAY: Day order tradeid: Internal value to track overlapping trades. oco: Another order instance for OCO (Order Cancel Others) group. parent: Controls the relationship of a group of orders (e.g., bracket orders). transmit: If True, transmit the order to the broker. Used for controlling bracket orders. **kwargs: Additional broker-specific parameters. Returns: The submitted order, or None if size is 0. Example: Create a market buy order: >>> order = self.buy() Create a limit buy order: >>> order = self.buy(price=100.0, exectype=Order.Limit) """ # Resolve data argument if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: if self.datas: data = self.datas[0] else: raise ValueError( "No data feed available. In channel mode, pass a data " "object explicitly: self.buy(data=my_data, size=...)" ) # Use the provided size, otherwise calculate via getsizer size = size if size is not None else self.getsizing(data, isbuy=True) # If size is non-zero, submit the order if size: order = self.broker.buy( self, data, size=abs(size), price=price, plimit=plimit, exectype=exectype, valid=valid, tradeid=tradeid, oco=oco, trailamount=trailamount, trailpercent=trailpercent, parent=parent, transmit=transmit, **kwargs, ) # Auto-notify signal to TradeLogger observers signal_price = ( price if price is not None else ( data.close[0] if hasattr(data, "close") and hasattr(data.close, "__getitem__") else 0 ) ) self._notify_signal_to_observers("buy", abs(size), signal_price, data) return order return None
[docs] def sell( self, data=None, size=None, price=None, plimit=None, exectype=None, valid=None, tradeid=0, oco=None, trailamount=None, trailpercent=None, parent=None, transmit=True, **kwargs, ) -> Optional[Order]: """Create a sell (short) order and send it to the broker. See the documentation for ``buy`` for an explanation of the parameters. Returns: The submitted order, or None if no order was created """ # Resolve data argument if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: if self.datas: data = self.datas[0] else: raise ValueError( "No data feed available. In channel mode, pass a data " "object explicitly: self.sell(data=my_data, size=...)" ) size = size if size is not None else self.getsizing(data, isbuy=False) if size: order = self.broker.sell( self, data, size=abs(size), price=price, plimit=plimit, exectype=exectype, valid=valid, tradeid=tradeid, oco=oco, trailamount=trailamount, trailpercent=trailpercent, parent=parent, transmit=transmit, **kwargs, ) # Auto-notify signal to TradeLogger observers signal_price = ( price if price is not None else ( data.close[0] if hasattr(data, "close") and hasattr(data.close, "__getitem__") else 0 ) ) self._notify_signal_to_observers("sell", abs(size), signal_price, data) return order return None
[docs] def close(self, data=None, size=None, **kwargs) -> Optional[Order]: """Close a long or short position. Creates an order that counters the existing position to close it. Args: data: The data feed for which to close the position. If None, uses the default data feed. size: The size to close. If None, closes the entire position. **kwargs: Additional keyword arguments passed to the order. Note: If size is not provided, it is automatically calculated from the existing position to fully close it. Returns: The submitted order, or None if no position exists """ # Get the data feed if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: data = self.data position_side = kwargs.pop("position_side", None) position_side = normalize_position_side(position_side) broker_mode = normalize_position_mode( getattr(self.broker, "get_param", lambda *_args, **_kwargs: "net")( "position_mode", "net" ) ) if position_side is not None or broker_mode == POSITION_MODE_DUAL_SIDE: if position_side is None: long_size = abs(self.getposition(data, self.broker, side=POSITION_SIDE_LONG).size) short_size = abs(self.getposition(data, self.broker, side=POSITION_SIDE_SHORT).size) if long_size and short_size: raise ValueError( "close() requires position_side when both long and short legs are open" ) if long_size: position_side = POSITION_SIDE_LONG possize = long_size elif short_size: position_side = POSITION_SIDE_SHORT possize = short_size else: return None else: possize = abs(self.getposition(data, self.broker, side=position_side).size) size = abs(size if size is not None else possize) if not size: return None kwargs.setdefault("position_side", position_side) kwargs.setdefault("offset", POSITION_OFFSET_CLOSE) if position_side == POSITION_SIDE_LONG: return self.sell(data=data, size=size, **kwargs) return self.buy(data=data, size=size, **kwargs) # Get the current position size possize = self.getposition(data, self.broker).size # If size is None, close the entire position; otherwise close the specified size size = abs(size if size is not None else possize) # If position is long (positive), sell to close if possize > 0: return self.sell(data=data, size=size, **kwargs) # If position is short (negative), buy to close if possize < 0: return self.buy(data=data, size=size, **kwargs) return None
[docs] def buy_bracket( self, data=None, size=None, price=None, plimit=None, exectype=Order.Limit, valid=None, tradeid=0, trailamount=None, trailpercent=None, oargs=None, stopprice=None, stopexec=Order.Stop, stopargs=None, limitprice=None, limitexec=Order.Limit, limitargs=None, **kwargs, ): """Create a bracket order group (buy order with stop-loss and take-profit). Creates a bracket order group consisting of: - A main **buy** order with the specified execution type (default: Limit) - A *low side* bracket **sell** stop-loss order - A *high side* bracket **sell** take-profit order Args: - ``data`` (default: ``None``): The data feed for the order. If None, uses the first data feed (self.data). - ``size`` (default: ``None``): Size for the order. If None, the sizer determines the size. The same size is applied to all three orders. - ``price`` (default: ``None``): Price for the main buy order. None is valid for Market and Close orders. - ``plimit`` (default: ``None``): Price limit for StopLimit orders. - ``trailamount`` (default: ``None``): Absolute trailing amount for StopTrail/StopTrailLimit orders. - ``trailpercent`` (default: ``None``): Percentage trailing amount for StopTrail/StopTrailLimit orders. - ``exectype`` (default: ``bt.Order.Limit``): Execution type for the main order. See buy() documentation for possible values. - ``valid`` (default: ``None``): Order validity period. See buy() documentation for possible values. - ``tradeid`` (default: ``0``): Trade ID for tracking overlapping trades. - ``oargs`` (default: ``{}``): Specific keyword arguments (dict) for the main side order. Applied before **kwargs. - ``**kwargs``: Additional keyword arguments applied to all three orders. See buy() documentation for possible values. - ``stopprice`` (default: ``None``): Specific price for the stop-loss order. - ``stopexec`` (default: ``bt.Order.Stop``): Execution type for the stop-loss order. - ``stopargs`` (default: ``{}``): Specific keyword arguments (dict) for the stop-loss order. - ``limitprice`` (default: ``None``): Specific price for the take-profit order. - ``limitexec`` (default: ``bt.Order.Limit``): Execution type for the take-profit order. - ``limitargs`` (default: ``{}``): Specific keyword arguments (dict) for the take-profit order. Returns: A list containing the three orders [main_order, stop_order, limit_order]. Suppressed orders are represented as None. Note: High/Low side orders can be suppressed by setting limitexec=None or stopexec=None. """ # Normalize mutable-default placeholders (B006); these dicts are only # read via kargs.update(...), never mutated, so None==empty is equivalent. oargs = {} if oargs is None else oargs stopargs = {} if stopargs is None else stopargs limitargs = {} if limitargs is None else limitargs # Build parameter dictionary kargs = { "size": size, "data": data, "price": price, "plimit": plimit, "exectype": exectype, "valid": valid, "tradeid": tradeid, "trailamount": trailamount, "trailpercent": trailpercent, } # Update with main side order specific arguments kargs.update(oargs) # Update with general keyword arguments kargs.update(kwargs) # Set transmit flag: only transmit if both stop and limit are None kargs["transmit"] = limitexec is None and stopexec is None # Create the main buy order o = self.buy(**kargs) # Create stop-loss order if stopexec is not None: # low side / stop kargs = { "data": data, "price": stopprice, "exectype": stopexec, "valid": valid, "tradeid": tradeid, } kargs.update(stopargs) kargs.update(kwargs) kargs["parent"] = o kargs["transmit"] = limitexec is None kargs["size"] = o.size ostop = self.sell(**kargs) else: ostop = None # Create take-profit order if limitexec is not None: # high side / limit kargs = { "data": data, "price": limitprice, "exectype": limitexec, "valid": valid, "tradeid": tradeid, } kargs.update(limitargs) kargs.update(kwargs) kargs["parent"] = o kargs["transmit"] = True kargs["size"] = o.size olimit = self.sell(**kargs) else: olimit = None return [o, ostop, olimit]
[docs] def sell_bracket( self, data=None, size=None, price=None, plimit=None, exectype=Order.Limit, valid=None, tradeid=0, trailamount=None, trailpercent=None, oargs=None, stopprice=None, stopexec=Order.Stop, stopargs=None, limitprice=None, limitexec=Order.Limit, limitargs=None, **kwargs, ): """Create a sell bracket order group (sell order with stop-loss and take-profit). Creates a bracket order group consisting of: - A main **sell** order with the specified execution type (default: Limit) - A *high side* bracket **buy** stop-loss order - A *low side* bracket **buy** take-profit order Args: See buy_bracket() for parameter documentation. Returns: A list containing the three orders [main_order, stop_order, limit_order]. Suppressed orders are represented as None. Note: High/Low side orders can be suppressed by setting limitexec=None or stopexec=None. """ # Normalize mutable-default placeholders (B006); read-only via update(). oargs = {} if oargs is None else oargs stopargs = {} if stopargs is None else stopargs limitargs = {} if limitargs is None else limitargs kargs = { "size": size, "data": data, "price": price, "plimit": plimit, "exectype": exectype, "valid": valid, "tradeid": tradeid, "trailamount": trailamount, "trailpercent": trailpercent, } kargs.update(oargs) kargs.update(kwargs) kargs["transmit"] = limitexec is None and stopexec is None o = self.sell(**kargs) if stopexec is not None: # high side / stop kargs = { "data": data, "price": stopprice, "exectype": stopexec, "valid": valid, "tradeid": tradeid, } kargs.update(stopargs) kargs.update(kwargs) kargs["parent"] = o kargs["transmit"] = limitexec is None # transmit if last kargs["size"] = o.size ostop = self.buy(**kargs) else: ostop = None if limitexec is not None: # low side / limit kargs = { "data": data, "price": limitprice, "exectype": limitexec, "valid": valid, "tradeid": tradeid, } kargs.update(limitargs) kargs.update(kwargs) kargs["parent"] = o kargs["transmit"] = True kargs["size"] = o.size olimit = self.buy(**kargs) else: olimit = None return [o, ostop, olimit]
[docs] def order_target_size(self, data=None, target=0, **kwargs) -> Optional[Order]: """Place an order to achieve a target position size. Rebalances the current position to reach the specified target size. Args: data: The data feed for the order. If None, uses the default data feed. target: Target position size. - If target > pos.size: buy (target - pos.size) - If target < pos.size: sell (pos.size - target) - If target == 0: close the entire position **kwargs: Additional keyword arguments passed to buy/sell. Returns: The generated order, or None if target == current position size. """ # Get the specific data feed if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: data = self.data # Get the current position size possize = self.getposition(data, self.broker).size # If target is 0 and position exists, close the position if not target and possize: return self.close(data=data, size=possize, **kwargs) # If target is greater than current position, buy to increase if target > possize: return self.buy(data=data, size=target - possize, **kwargs) # If target is less than current position, sell to decrease if target < possize: return self.sell(data=data, size=possize - target, **kwargs) return None # no execution target == possize
[docs] def order_target_value(self, data=None, target=0.0, price=None, **kwargs) -> Optional[Order]: """Place an order to achieve a target position value. Rebalances the position to reach the specified target value. Args: data: The data feed for the order. If None, uses the default data feed. target: Target position value in currency units. - If target is 0: close position - If target > value: buy to increase value - If target < value: sell to decrease value price: Price for size calculation. If None, uses data.close[0]. **kwargs: Additional keyword arguments passed to buy/sell. Returns: The generated order, or None if no order was issued. """ # Get the data feed if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: data = self.data # Get the current position size possize = self.getposition(data, self.broker).size # If target is 0 and position exists, close the position if not target and possize: # closing a position return self.close(data=data, size=possize, price=price, **kwargs) # Otherwise, rebalance to target value # Get the current value of this data value = self.broker.getvalue(datas=[data]) # Get commission info for size calculation comminfo = self.broker.getcommissioninfo(data) # Get price: use provided price or default to close price # Make sure a price is there price = price if price is not None else data.close[0] # If target value is greater than current value, buy if target > value: size = comminfo.getsize(price, target - value) return self.buy(data=data, size=size, price=price, **kwargs) # If target value is less than current value, sell if target < value: size = comminfo.getsize(price, value - target) return self.sell(data=data, size=size, price=price, **kwargs) return None # no execution size == possize
[docs] def order_target_percent(self, data=None, target=0.0, **kwargs) -> Optional[Order]: """Place an order to achieve a target percentage of portfolio value. Rebalances the position so its value equals the target percentage of the total portfolio value. Args: data: The data feed for the order. If None, uses the default data feed. target: Target percentage as a decimal (e.g., 0.05 for 5%). **kwargs: Additional keyword arguments passed to order_target_value. Returns: The generated order, or None if no order was issued. Example: With target=0.05 and portfolio value of 100: - Target value = 0.05 * 100 = 5 - Orders are placed through order_target_value Note: Position direction (long/short) is considered: - If target > value: buy if pos.size >= 0, sell if pos.size < 0 - If target < value: sell if pos.size >= 0, buy if pos.size < 0 """ # Get the data feed if isinstance(data, string_types): data = self.getdatabyname(data) elif data is None: data = self.data # Calculate target value based on portfolio value # Note: Getting position size here is not necessary # possize = self.getposition(data, self.broker).size target *= self.broker.getvalue() return self.order_target_value(data=data, target=target, **kwargs)
[docs] def getposition(self, data=None, broker=None, side=None, **kwargs): """Get the current position for a data feed. Args: data: The data feed to get position for. If None, uses the first data feed. broker: The broker to query. If None, uses the default broker. Returns: The current Position object. Note: A property ``position`` is also available as a shortcut. """ data = data if data is not None else self.datas[0] broker = broker or self.broker return broker.getposition(data, side=side, **kwargs)
# Property to access position for the default data feed position = property(getposition)
[docs] def getpositionbyname(self, name=None, broker=None, side=None, **kwargs): """Get the current position for a data feed by name. Args: name: Name of the data feed. If None, uses the first data feed. broker: The broker to query. If None, uses the default broker. Returns: The current Position object. Note: A property ``positionbyname`` is also available as a shortcut. """ data = self.datas[0] if not name else self.getdatabyname(name) broker = broker or self.broker return broker.getposition(data, side=side, **kwargs)
# Property to access position by name positionbyname = property(getpositionbyname)
[docs] def getpositions(self, broker=None): """Get all positions from the broker. Args: broker: The broker to query. If None, uses the default broker. Returns: Dictionary mapping data feeds to Position objects. Note: A property ``positions`` is also available as a shortcut. """ broker = broker or self.broker return broker.positions
# Property to access all positions positions = property(getpositions)
[docs] def getpositionsbyname(self, broker=None): """Get all positions from the broker indexed by data name. Args: broker: The broker to query. If None, uses the default broker. Returns: OrderedDict mapping data names to Position objects. Note: A property ``positionsbyname`` is also available as a shortcut. """ broker = broker or self.broker positions = broker.positions posbyname = collections.OrderedDict() for name, data in iteritems(self.env.datasbyname): posbyname[name] = positions[data] return posbyname
# Property to access positions by name positionsbyname = property(getpositionsbyname) def _addsizer(self, sizer, *args, **kwargs): """Add a sizer to the strategy. If sizer is None, uses FixedSize sizer. Otherwise instantiates the provided sizer class and sets it. Args: sizer: Sizer class or None *args: Positional arguments for sizer instantiation **kwargs: Keyword arguments for sizer instantiation """ if sizer is None: self.setsizer(FixedSize()) else: self.setsizer(sizer(*args, **kwargs))
[docs] def setsizer(self, sizer): """Set the sizer for automatic stake calculation. Args: sizer: The sizer instance to use Returns: The sizer instance """ self._sizer = sizer sizer.set(self, self.broker) return sizer
[docs] def getsizer(self): """Get the current sizer for automatic stake calculation. Returns: The current sizer instance Note: Also available as the ``sizer`` property. """ return self._sizer
sizer = property(getsizer, setsizer)
[docs] def getsizing(self, data=None, isbuy=True): """Get the order size from the sizer. Uses the configured sizer to calculate the appropriate stake size for the next order. Args: data: The data feed for the order. If None, uses the default data. isbuy: True for buy orders, False for sell orders. Returns: The calculated stake size. """ # Ensure sizer has broker reference if hasattr(self._sizer, "broker") and self._sizer.broker is None: self._sizer.set(self, self.broker) return self._sizer.getsizing(data, isbuy)
[docs] class SignalStrategy(Strategy): """A strategy subclass that automatically operates using signals. This strategy subclass responds to signal indicators to automatically enter and exit positions based on signal values. Signal values: - ``> 0`` indicates a long (buy) signal - ``< 0`` indicates a short (sell) signal There are five types of signals, broken into two groups: **Main Group**: - ``LONGSHORT``: Both long and short indications from this signal are taken. The strategy will go long or short based on the sign. - ``LONG``: - Positive (long) indications: Go long - Negative (short) indications: Close long position - If ``LONGEXIT`` exists, it is used to exit longs - If ``SHORT`` signal exists and no ``LONGEXIT``, it will close longs before opening a short - ``SHORT``: - Negative (short) indications: Go short - Positive (long) indications: Close short position - If ``SHORTEXIT`` exists, it is used to exit shorts - If ``LONG`` signal exists and no ``SHORTEXIT``, it will close shorts before opening a long **Exit Group**: These signals override others to provide explicit exit criteria: - ``LONGEXIT``: Negative indications are taken to exit long positions - ``SHORTEXIT``: Positive indications are taken to exit short positions **Order Issuing** Orders are placed with Market execution type and Good-Until-Canceled validity. Params: - ``signals`` (default: ``[]``): A list/tuple of lists/tuples for signal instantiation and type allocation. This parameter is typically managed through ``cerebro.add_signal``. - ``_accumulate`` (default: ``False``): Allow entering the market even if already in a position (accumulate positions). - ``_concurrent`` (default: ``False``): Allow issuing orders even when orders are already pending execution. - ``_data`` (default: ``None``): If multiple datas are present in the system which is the target for orders. This can be - ``None``: The first data in the system will be used - An ``int``: indicating the data that was inserted at that position - An ``str``: name given to the data when creating it (parameter ``name``) or when adding it cerebro with ``cerebro.adddata(..., name=)`` - A ``data`` instance """ # Parameters for signal strategy params: tuple = ( ("signals", []), ("_accumulate", False), ("_concurrent", False), ("_data", None), )
[docs] def __new__(cls, *args, **kwargs): """Override __new__ to handle next method remapping that was done in MetaSigStrategy""" # Handle next method remapping like the old MetaSigStrategy.__new__ did if hasattr(cls, "next") and not hasattr(cls, "_next_custom"): cls._next_custom = cls.next # Create the instance instance = super().__new__(cls, *args, **kwargs) # Set the next method to _next_catch (from MetaSigStrategy) instance.next = instance._next_catch return instance
def __init__(self, *args, **kwargs): """Initialize the signal strategy with functionality from MetaSigStrategy methods""" # Handle the functionality that was in MetaSigStrategy.dopreinit self._signals = collections.defaultdict(list) # Set the data target (from MetaSigStrategy.dopreinit) _data = getattr(self.p, "_data", None) if _data is None: self._dtarget = self.data0 elif isinstance(_data, integer_types): self._dtarget = self.datas[_data] elif isinstance(_data, string_types): self._dtarget = self.getdatabyname(_data) elif isinstance(_data, LineRoot): self._dtarget = _data else: self._dtarget = self.data0 # Filter out strategy parameter kwargs to prevent them from reaching parent __init__ filtered_kwargs = kwargs.copy() if hasattr(self.__class__, "_params") and self.__class__._params is not None: params_cls = self.__class__._params param_names = set() # Get all parameter names from the class if hasattr(params_cls, "_getpairs"): param_names.update(params_cls._getpairs().keys()) elif hasattr(params_cls, "_gettuple"): param_names.update(key for key, value in params_cls._gettuple()) # Remove strategy parameter kwargs filtered_kwargs = {k: v for k, v in kwargs.items() if k not in param_names} # Call parent initialization with filtered kwargs # Don't pass *args to avoid object.__init__() error, consistent with Strategy.__init__ fix if filtered_kwargs: super().__init__(**filtered_kwargs) else: super().__init__() # Handle the functionality that was in MetaSigStrategy.dopostinit # Add signals from params # CRITICAL FIX: Pass self._dtarget as data source for signal indicators # and register them with the strategy so they get processed for sigtype, sigcls, sigargs, sigkwargs in self.p.signals: sig_indicator = sigcls(self._dtarget, *sigargs, **sigkwargs) self._signals[sigtype].append(sig_indicator) # CRITICAL FIX: Register signal indicator with strategy's _lineiterators # so its once()/next() methods get called during processing if hasattr(sig_indicator, "_ltype"): ltype = sig_indicator._ltype if sig_indicator not in self._lineiterators[ltype]: self._lineiterators[ltype].append(sig_indicator) sig_indicator._owner = self # Record types of signals self._longshort = bool(self._signals[SIGNAL_LONGSHORT]) self._long = bool(self._signals[SIGNAL_LONG]) self._short = bool(self._signals[SIGNAL_SHORT]) self._longexit = bool(self._signals[SIGNAL_LONGEXIT]) self._shortexit = bool(self._signals[SIGNAL_SHORTEXIT]) def _start(self): """Start the signal strategy and initialize the order sentinel.""" self._sentinel = None # sentinel for order concurrency super()._start()
[docs] def signal_add(self, sigtype, signal): """Add a signal indicator to the strategy. Args: sigtype: Type of signal (e.g., SIGNAL_LONG, SIGNAL_SHORT) signal: The signal indicator instance """ self._signals[sigtype].append(signal)
def _notify(self, qorders=None, qtrades=None): """Process notifications and reset sentinel when order completes. Args: qorders: Quick notify orders qtrades: Quick notify trades """ if qorders is None: qorders = [] if qtrades is None: qtrades = [] # Nullify the sentinel if done procorders = qorders or self._orderspending if self._sentinel is not None: for order in procorders: if order == self._sentinel and not order.alive(): self._sentinel = None break super()._notify(qorders=qorders, qtrades=qtrades) def _next_catch(self): """Catch method that routes to signal processing and custom next.""" self._next_signal() if hasattr(self, "_next_custom"): self._next_custom() @staticmethod def _all_pos(sig, nosig): """True if every value in ``sig`` (or ``nosig`` when empty) is > 0.""" return all(x[0] > 0.0 for x in sig or nosig) @staticmethod def _all_neg(sig, nosig): """True if every value in ``sig`` (or ``nosig`` when empty) is < 0.""" return all(x[0] < 0.0 for x in sig or nosig) @staticmethod def _all_any(sig, nosig): """True if every value in ``sig`` (or ``nosig`` when empty) is truthy.""" return all(x[0] for x in sig or nosig) def _evaluate_signals(self): """Evaluate all signal collections into entry/exit/reversal flags. Pure helper (no order side effects) extracted from _next_signal for readability. Returns a tuple of booleans consumed by the position decision logic: (ls_long, ls_short, l_enter, s_enter, l_exit, s_exit, l_rev, s_rev, l_leave, s_leave) """ # Get signal collections sigs = self._signals # Default no-signal value nosig = [[0.0]] pos = self._all_pos neg = self._all_neg anyv = self._all_any # Calculate current status of the signals # If SIGNAL_LONGSHORT is empty, loop through nosig ls_long = pos(sigs[SIGNAL_LONGSHORT], nosig) ls_short = neg(sigs[SIGNAL_LONGSHORT], nosig) # Long entry: direct (>0), inverted (<0) or any (truthy) l_enter = ( pos(sigs[SIGNAL_LONG], nosig) or neg(sigs[SIGNAL_LONG_INV], nosig) or anyv(sigs[SIGNAL_LONG_ANY], nosig) ) # Short entry: direct (<0), inverted (>0) or any (truthy) s_enter = ( neg(sigs[SIGNAL_SHORT], nosig) or pos(sigs[SIGNAL_SHORT_INV], nosig) or anyv(sigs[SIGNAL_SHORT_ANY], nosig) ) # Long exit: direct (<0), inverted (>0) or any (truthy) l_exit = ( neg(sigs[SIGNAL_LONGEXIT], nosig) or pos(sigs[SIGNAL_LONGEXIT_INV], nosig) or anyv(sigs[SIGNAL_LONGEXIT_ANY], nosig) ) # Short exit: direct (>0), inverted (<0) or any (truthy) s_exit = ( pos(sigs[SIGNAL_SHORTEXIT], nosig) or neg(sigs[SIGNAL_SHORTEXIT_INV], nosig) or anyv(sigs[SIGNAL_SHORTEXIT_ANY], nosig) ) # Use opposite signals to start reversal (by closing) # but only if no "xxxExit" exists # Long reversal: no long exit and short entry signal l_rev = not self._longexit and s_enter # Short reversal: no short exit and long entry signal s_rev = not self._shortexit and l_enter # Opposite of individual long and short (leave = exit on opposite signal) # Long leave: direct (<0), inverted (>0) or any (truthy) l_leave = ( neg(sigs[SIGNAL_LONG], nosig) or pos(sigs[SIGNAL_LONG_INV], nosig) or anyv(sigs[SIGNAL_LONG_ANY], nosig) ) # Short leave: direct (>0), inverted (<0) or any (truthy) s_leave = ( pos(sigs[SIGNAL_SHORT], nosig) or neg(sigs[SIGNAL_SHORT_INV], nosig) or anyv(sigs[SIGNAL_SHORT_ANY], nosig) ) # Invalidate long leave if longexit signals are available # If longexit exists, disable l_leave; otherwise keep l_leave l_leave = not self._longexit and l_leave # Invalidate short leave if shortexit signals are available # If shortexit exists, disable s_leave; otherwise keep s_leave s_leave = not self._shortexit and s_leave return ( ls_long, ls_short, l_enter, s_enter, l_exit, s_exit, l_rev, s_rev, l_leave, s_leave, ) def _next_signal(self): """Process signals and generate orders based on signal values. Evaluates all signal types and generates buy/sell orders based on: - Current position status - Signal values (positive/negative) - Accumulation and concurrency settings """ # If concurrent orders are disabled and an order is active, return if self._sentinel is not None and not self.p._concurrent: return # order active and more than 1 not allowed # Evaluate all signal collections into decision flags ( ls_long, ls_short, l_enter, s_enter, l_exit, s_exit, l_rev, s_rev, l_leave, s_leave, ) = self._evaluate_signals() # Take size and start logic # Get current position size size = self.getposition(self._dtarget).size # If no position if not size: # Enter new position based on signals if ls_long or l_enter: self._sentinel = self.buy(self._dtarget) elif ls_short or s_enter: self._sentinel = self.sell(self._dtarget) # If current position is long (positive) elif size > 0: # current long position if ls_short or l_exit or l_rev or l_leave: # closing position - not relevant for concurrency self.close(self._dtarget) if ls_short or l_rev: self._sentinel = self.sell(self._dtarget) if ls_long or l_enter: if self.p._accumulate: self._sentinel = self.buy(self._dtarget) # If current position is short (negative) elif size < 0: # current short position if ls_long or s_exit or s_rev or s_leave: # closing position - not relevant for concurrency self.close(self._dtarget) if ls_long or s_rev: self._sentinel = self.buy(self._dtarget) if ls_short or s_enter: if self.p._accumulate: self._sentinel = self.sell(self._dtarget)
[docs] class BtApiStrategy(Strategy): """A Strategy subclass with built-in logging capabilities. This strategy class extends the base Strategy class with automatic logger initialization using the SpdLogManager. It provides a default log() method for logging messages and custom notification handling. Attributes: logger: The configured logger instance from SpdLogManager. Params: log_file_name: Optional custom log file name. If not provided, defaults to "{ClassName}.log". Example: class MyStrategy(bt.BtApiStrategy): params = (('log_file_name', 'my_strategy.log'),) def next(self): self.log(f'Close price: {self.data.close[0]:.2f}') """ def __init__(self): """Initialize the strategy with a logger instance.""" self.logger = self.init_logger(self.p.get("log_file_name", None))
[docs] def init_logger(self, log_file_name=None): """Initialize and return a logger instance. Creates a logger using SpdLogManager with the specified or default log file name. Args: log_file_name: Optional custom log file name. If None, uses "{ClassName}.log" as the default. Returns: A configured logger instance. """ if log_file_name is None: logger = SpdLogManager( file_name=self.__class__.__name__ + ".log", logger_name="strategy", print_info=True ).create_logger() else: logger = SpdLogManager( file_name=log_file_name, logger_name="strategy", print_info=True ).create_logger() return logger
[docs] def log(self, txt): """Log a message at INFO level. Args: txt: The message text to log. """ self.logger.info(txt)
def _addnotification(self, data, quicknotify=True): """Process notifications for orders and trades with logging. This method extends the base notification handling to route notifications to the appropriate handler methods. Args: data: The notification data, which can be an order or trade. quicknotify: If True, immediately process notification without queueing. """ if data.data_type == "order": self.notify_order(data) if data.data_type == "trade": self.notify_trade(data)