Source code for backtrader.cerebro

#!/usr/bin/env python
"""Cerebro - The main engine of the Backtrader framework.

This module contains the Cerebro class, which is the central orchestrator for
backtesting and live trading operations. Cerebro manages data feeds, strategies,
brokers, analyzers, observers, and all other components of the trading system.

Key Features:
    - Data feed management and synchronization
    - Strategy instantiation and execution
    - Broker integration for order execution
    - Multi-core optimization support
    - Live trading and backtesting modes
    - Plotting and analysis capabilities

Example:
    Basic backtest setup::

        import backtrader as bt

        cerebro = bt.Cerebro()
        data = bt.feeds.GenericCSVData(dataname='data.csv')
        cerebro.adddata(data)
        cerebro.addstrategy(MyStrategy)
        cerebro.broker.setcash(100000)
        results = cerebro.run()
        cerebro.plot()

Classes:
    OptReturn: Lightweight result object for optimization runs.
    Cerebro: Main backtesting/trading engine.
"""

import collections
import datetime
import itertools
import multiprocessing
from datetime import timezone

from . import errors, feeds, indicator, linebuffer, observers
from .brokers import BackBroker
from .dataseries import TimeFrame
from .metabase import OwnerContext
from .parameters import ParameterDescriptor, ParameterizedBase
from .strategy import SignalStrategy, Strategy
from .timer import Timer
from .tradingcal import PandasMarketCalendar, TradingCalendarBase
from .utils import OrderedDict, date2num, num2date, tzparse
from .utils.log_message import get_logger
from .utils.py3 import integer_types, map, range, string_types, zip
from .writer import WriterFile

logger = get_logger(__name__)

# Python 3 always provides collections.abc (the only supported baseline).
collectionsAbc = collections.abc  # collections.Iterable -> collections.abc.Iterable

# Python 3.11+ has datetime.UTC, earlier versions use timezone.utc
UTC = timezone.utc


[docs] class OptReturn: """Lightweight result container for optimization runs. This class is defined at module level to make it picklable for multiprocessing. It stores only essential information from strategy runs during optimization to reduce memory usage. Attributes: p: Alias for params. params: Strategy parameters used in this optimization run. analyzers: Analyzer results (if returned during optimization). Note: Additional attributes may be set dynamically via kwargs. """
[docs] def __init__(self, params, **kwargs): """Initialize the OptReturn object. Args: params: Strategy parameters used in this optimization run. **kwargs: Additional keyword arguments to set as attributes. """ self.p = self.params = params for k, v in kwargs.items(): setattr(self, k, v)
[docs] class Cerebro(ParameterizedBase): """Params: - ``preload`` (default: ``True``) Whether to preload the different ``data feeds`` passed to cerebro for the Strategies Note: When True (default), data is loaded into memory before backtesting, which uses more memory but significantly improves execution speed. - ``runonce`` (default: ``True``) Run `Indicators` in vectorized mode to speed up the entire system. Strategies and Observers will always be run on an event-based basis Note: When True, indicators are calculated using vectorized operations for better performance. Strategies and observers still run event-by-event. - ``live`` (default: ``False``) If no data has reported itself as *live* (via the data's ``islive`` method but the end user still wants to run in ``live`` mode, this parameter can be set to true This will simultaneously deactivate ``preload`` and ``runonce``. It will have no effect on memory saving schemes. Note: Setting to True forces live mode behavior, disabling preload and runonce optimizations, which slows down backtesting. - ``maxcpus`` (default: None -> all available cores) How many cores to use simultaneously for optimization Note: Set to number of CPU cores minus 1 to avoid system overload. Use None (default) to use all available cores. - ``stdstats`` (default: ``True``) If True, default Observers will be added: Broker (Cash and Value), Trades and BuySell Note: These observers are used for plotting. Set to False if not needed. - ``oldbuysell`` (default: ``False``) If ``stdstats`` is ``True`` and observers are getting automatically added, this switch controls the main behavior of the ``BuySell`` observer - ``False``: use the modern behavior in which the buy / sell signals are plotted below / above the low / high prices respectively to avoid cluttering the plot - ``True``: use the deprecated behavior in which the buy / sell signals are plotted where the average price of the order executions for the given moment in time is. This will, of course, be on top of an OHLC bar or on a Line on Cloe bar, difficult the recognition of the plot. Note: False (modern) plots signals outside the price bars for clarity. True (old) plots signals at execution price, overlapping with bars. - ``oldtrades`` (default: ``False``) If ``stdstats`` is ``True`` and observers are getting automatically added, this switch controls the main behavior of the ``Trades`` observer - ``False``: use the modern behavior in which trades for all datas are plotted with different markers - ``True``: use the old Trades observer which plots the trades with the same markers, differentiating only if they are positive or negative Note: False uses different markers for different trades. True uses same markers, only distinguishing positive/negative. - ``exactbars`` (default: ``False``) With the default value, each and every value stored in a line is kept in memory Possible values: - ``True`` or ``1``: all "lines" objects reduce memory usage to the automatically calculated minimum period. If a Simple Moving Average has a period of 30, the underlying data will have always a running buffer of 30 bars to allow the calculation of the Simple Moving Average - This setting will deactivate ``preload`` and ``runonce`` - Using this setting also deactivates **plotting** - ``-1``: datafeeds and indicators/operations at strategy level will keep all data in memory. For example: a ``RSI`` internally uses the indicator ``UpDay`` to make calculations. This subindicator will not keep all data in memory - This allows keeping ``plotting`` and ``preloading`` active. - ``runonce`` will be deactivated - ``-2``: data feeds and indicators kept as attributes of the strategy will keep all points in memory. For example: a ``RSI`` internally uses the indicator ``UpDay`` to make calculations. This subindicator will not keep all data in memory If in the ``__init__`` something like ``a = self.data.close - self.data.high`` is defined, then ``a`` will not keep all data in memory - This allows keeping ``plotting`` and ``preloading`` active. - ``runonce`` will be deactivated Note on exactbars values: - True/1: Minimum memory, disables preload/runonce/plotting - -1: Keeps data/indicators but not sub-indicator internals, disables runonce - -2: Keeps strategy-level data/indicators, sub-indicators not using self are discarded - ``objcache`` (default: ``False``) Experimental option to implement a cache of lines objects and reduce the amount of them. Example from UltimateOscillator: bp = self.data.close - TrueLow(self.data) tr = TrueRange(self.data) # -> creates another TrueLow(self.data) If this is `True`, the second ``TrueLow(self.data)`` inside ``TrueRange`` matches the signature of the one in the ``bp`` calculation. It will be reused. Corner cases may happen in which this drives a line object off its minimum period and breaks things, and it is therefore disabled. Note: When True, identical indicator calculations are cached and reused to reduce computation. Disabled by default due to edge cases. - ``writer`` (default: ``False``) If set to ``True`` a default WriterFile will be created which will print to stdout. It will be added to the strategy (in addition to any other writers added by the user code) Note: Outputs trading information to stdout. Custom logging in strategy is usually preferred for more control. - ``tradehistory`` (default: ``False``) If set to ``True``, it will activate update event logging in each trade for all strategies. This can also be achieved on a per-strategy basis with the strategy method ``set_tradehistory`` Note: Enables trade update logging for all strategies. Can also be enabled per-strategy using set_tradehistory method. - ``optdatas`` (default: ``True``) If ``True`` and optimizing (and the system can ``preload`` and use ``runonce``, data preloading will be done only once in the main process to save time and resources. The tests show an approximate ``20%`` speed-up moving from a sample execution in ``83`` seconds to ``66`` Note: When True with preload/runonce, data is preloaded once in the main process and shared across optimization workers (~20% speedup). - ``optreturn`` (default: ``True``) If `True`, the optimization results will not be full ``Strategy`` objects (and all *datas*, *indicators*, *observers* ...) but object with the following attributes (same as in ``Strategy``): - ``params`` (or ``p``) the strategy had for the execution - ``analyzers`` the strategy has executed On most occasions, only the *analyzers* and with which *params* are the things needed to evaluate the performance of a strategy. If detailed analysis of the generated values for (for example) *indicators* is needed, turn this off The tests show a 13% - 15% improvement in execution time. Combined with `optdatas` the total gain increases to a total speed-up of `32%` in an optimization run. Note: Returns only params and analyzers during optimization, discarding data/indicators/observers for ~15% speedup (32% combined with optdatas). - ``oldsync`` (default: ``False``) Starting with release 1.9.0.99, the synchronization of multiple datas (same or different timeframes) has been changed to allow datas of different lengths. If the old behavior with data0 as the master of the system is wished, set this parameter to true Note: False allows data feeds of different lengths. True uses data0 as master (legacy behavior). - ``tz`` (default: ``None``) Adds a global timezone for strategies. The argument ``tz`` can be - ``None``: in this case the datetime displayed by strategies will be in UTC, which has always been the standard behavior - ``pytz`` instance. It will be used as such to convert UTC times to the chosen timezone - ``string``. Instantiating a ``pytz`` instance will be attempted. - ``integer``. Use, for the strategy, the same timezone as the corresponding ``data`` in the ``self.datas`` iterable (``0`` would use the timezone from ``data0``) Note: None=UTC, pytz instance converts from UTC, string creates pytz, integer uses timezone from corresponding data feed index. - ``cheat_on_open`` (default: ``False``) The ``next_open`` method of strategies will be called. This happens before ``next`` and before the broker has had a chance to evaluate orders. The indicators have not yet been recalculated. This allows issuing an order which takes into account the indicators of the previous day but uses the ``open`` price for stake calculations For cheat_on_open order execution, it is also necessary to make the call ``cerebro.broker.set_coo(True)`` or instantiate a broker with ``BackBroker(coo=True)`` (where *coo* stands for cheat-on-open) or set the ``broker_coo`` parameter to ``True``. Cerebro will do it automatically unless disabled below. Note: Enables using next bar's open price for position sizing. Useful for precise capital allocation. Requires broker_coo=True. - ``broker_coo`` (default: ``True``) This will automatically invoke the ``set_coo`` method of the broker with ``True`` to activate ``cheat_on_open`` execution. Will only do it if ``cheat_on_open`` is also ``True`` Note: Works together with cheat_on_open parameter. - ``quicknotify`` (default: ``False``) Broker notifications are delivered right before the delivery of the *next* prices. For backtesting, this has no implications, but with live brokers, a notification can take place long before the bar is delivered. When set to ``True`` notifications will be delivered as soon as possible (see ``qcheck`` in live feeds) Set to ``False`` for compatibility. May be changed to ``True`` Note: False delays notifications until next bar. True sends immediately. Mainly relevant for live trading. """ # Parameter descriptors using new system preload = ParameterDescriptor( default=True, type_=bool, doc="Whether to preload the different data feeds" ) runonce = ParameterDescriptor(default=True, type_=bool, doc="Run Indicators in vectorized mode") maxcpus = ParameterDescriptor(default=None, doc="How many cores to use for optimization") stdstats = ParameterDescriptor(default=True, type_=bool, doc="Add default Observers") oldbuysell = ParameterDescriptor( default=False, type_=bool, doc="Use old BuySell observer behavior" ) oldtrades = ParameterDescriptor( default=False, type_=bool, doc="Use old Trades observer behavior" ) lookahead = ParameterDescriptor(default=0, type_=int, doc="Lookahead parameter") exactbars = ParameterDescriptor(default=False, doc="Memory usage control for lines objects") optdatas = ParameterDescriptor( default=True, type_=bool, doc="Optimize data preloading during optimization" ) optreturn = ParameterDescriptor( default=True, type_=bool, doc="Return simplified objects during optimization" ) objcache = ParameterDescriptor( default=False, type_=bool, doc="Cache lines objects to reduce memory" ) live = ParameterDescriptor(default=False, type_=bool, doc="Run in live mode") writer = ParameterDescriptor(default=False, type_=bool, doc="Add a default WriterFile") tradehistory = ParameterDescriptor( default=False, type_=bool, doc="Activate trade history logging" ) oldsync = ParameterDescriptor(default=False, type_=bool, doc="Use old synchronization behavior") tz = ParameterDescriptor(default=None, doc="Global timezone for strategies") cheat_on_open = ParameterDescriptor( default=False, type_=bool, doc="Enable cheat-on-open execution" ) broker_coo = ParameterDescriptor( default=True, type_=bool, doc="Auto-activate broker cheat-on-open" ) quicknotify = ParameterDescriptor( default=False, type_=bool, doc="Deliver broker notifications quickly" )
[docs] def __init__(self, **kwargs): """Initialize Cerebro with optional parameter overrides. Args: **kwargs: Parameter overrides (preload, runonce, maxcpus, etc.) """ super().__init__(**kwargs) # Internal state flags self._timerscheat = None self._timers = None self.runningstrats: list = [] self.runstrats = None self.writers_csv = None self.runwriters = None self._dopreload = None self._dorunonce = None self._exactbars = 0 self._event_stop = None self._dolive = False # Live trading mode flag self._doreplay = False # Data replay mode flag self._dooptimize = False # Optimization mode flag # Component containers self.stores = [] # Data stores self.feeds = [] # Data feeds self.datas = [] # Data objects self.datasbyname = collections.OrderedDict() # Data lookup by name self.strats = [] # Strategy classes/instances self.optcbs = [] # Optimization callbacks self.observers = [] # Observer classes self.analyzers = [] # Analyzer classes self.indicators = [] # Indicator classes self.sizers = {} # Position sizers self.writers = [] # Output writers self.storecbs = [] # Store callbacks self.datacbs = [] # Data callbacks self.signals = [] # Signal definitions # Signal strategy configuration self._signal_strat = (None, None, None) self._signal_concurrent = False # Allow concurrent signals self._signal_accumulate = False # Allow accumulating positions # Internal counters and references self._dataid = itertools.count(1) # Data ID counter self._broker = BackBroker() # Default broker self._broker.cerebro = self # Back-reference to cerebro self._tradingcal = None # Trading calendar self._pretimers = [] # Pre-run timers self._ohistory = [] # Order history self._fhistory = None # Fund history # Override parameters from kwargs pkeys = self.params._getkeys() for key, val in kwargs.items(): if key in pkeys: setattr(self.params, key, val)
[docs] @staticmethod def iterize(iterable): """Convert each element in iterable to be iterable itself. Args: iterable: Input iterable whose elements may not be iterable. Returns: list: New list where each element is guaranteed to be iterable. """ niterable = [] for elem in iterable: if isinstance(elem, string_types) or not isinstance(elem, collectionsAbc.Iterable): elem = (elem,) niterable.append(elem) return niterable
[docs] def set_fund_history(self, fund): """ Add a history of orders to be directly executed in the broker for performance evaluation - ``fund``: is an iterable (ex: list, tuple, iterator, generator) in which each element will be also iterable (with length) with the following sub-elements (two formats are possible) ``[datetime, share_value, net asset value]`` **Note**: it must be sorted (or produce sorted elements) by datetime ascending where: - ``datetime`` is a python ``date/datetime`` instance or a string with format YYYY-MM-DD[THH:MM:SS[.us]] where the elements in brackets are optional - ``share_value`` is a float/integer - ``net_asset_value`` is a float/integer """ self._fhistory = fund
[docs] def add_order_history(self, orders, notify=True): """ Add a history of orders to be directly executed in the broker for performance evaluation - ``orders``: is an iterable (ex: list, tuple, iterator, generator) in which each element will be also iterable (with length) with the following sub-elements (two formats are possible) ``[datetime, size, price]`` or ``[datetime, size, price, data]`` **Note**: it must be sorted (or produce sorted elements) by datetime ascending where: - ``datetime`` is a python ``date/datetime`` instance or a string with format YYYY-MM-DD[THH:MM:SS[.us]] where the elements in brackets are optional - ``size`` is an integer (positive to *buy*, negative to *sell*) - ``price`` is a float/integer - ``data`` if present can take any of the following values - *None* - The 1st data feed will be used as target - *integer* - The data with that index (insertion order in **Cerebro**) will be used - *string* - a data with that name, assigned for example with ``cerebro.addata(data, name=value)``, will be the target - ``notify`` (default: *True*) If ``True``, the first strategy inserted in the system will be notified of the artificial orders created following the information from each order in ``orders`` **Note**: Implicit in the description is the need to add a data feed which is the target of the orders.This is, for example, needed by analyzers which track, for example, the returns """ self._ohistory.append((orders, notify))
[docs] def notify_timer(self, timer, when, *args, **kwargs): """Receives a timer notification where ``timer`` is the timer that was returned by ``add_timer``, and ``when`` is the calling time. ``args`` and ``kwargs`` are any additional arguments passed to ``add_timer`` The actual `when` time can be later, but the system may have not been able to call the timer before. This value is the timer value and no the system time. """
def _add_timer( self, owner, when, offset=datetime.timedelta(), repeat=datetime.timedelta(), weekdays=None, weekcarry=False, monthdays=None, monthcarry=True, allow=None, tzdata=None, strats=False, cheat=False, *args, **kwargs, ): """Internal method to really create the timer (not started yet) which can be called by cerebro instances or other objects which can access cerebro""" # Normalize mutable-default placeholders (B006): Timer treats None as # "all days", identical to the previous empty-list default. weekdays = [] if weekdays is None else weekdays monthdays = [] if monthdays is None else monthdays timer = Timer( tid=len(self._pretimers), owner=owner, strats=strats, when=when, offset=offset, repeat=repeat, weekdays=weekdays, weekcarry=weekcarry, monthdays=monthdays, monthcarry=monthcarry, allow=allow, tzdata=tzdata, cheat=cheat, *args, **kwargs, ) self._pretimers.append(timer) return timer
[docs] def add_timer( self, when, offset=datetime.timedelta(), repeat=datetime.timedelta(), weekdays=None, weekcarry=False, monthdays=None, monthcarry=True, allow=None, tzdata=None, strats=False, cheat=False, *args, **kwargs, ): """ Schedules a timer to invoke ``notify_timer`` Arguments: - ``when``: can be - ``datetime.time`` instance (see below ``tzdata``) - ``bt.timer.SESSION_START`` to reference a session start - ``bt.timer.SESSION_END`` to reference a session end - ``offset`` which must be a ``datetime.timedelta`` instance Used to offset the value ``when``. It has a meaningful use in combination with ``SESSION_START`` and ``SESSION_END``, to indicate things like a timer being called ``15 minutes`` after the session starts. - ``repeat`` which must be a ``datetime.timedelta`` instance Indicates if after a first call, further calls will be scheduled within the same session at the scheduled `repeat` delta Once the timer goes over the end of the session, it is reset to the original value for ``when`` - ``weekdays``: a **sorted** iterable with integers indicating on which days (iso codes, Monday is 1, Sunday is 7) the timers can be actually invoked If not specified, the timer will be active on all days - ``weekcarry`` (default: ``False``). If ``True`` and the weekday was not seen (ex: trading holiday), the timer will be executed on the next day (even if in a new week) - ``monthdays``: a **sorted** iterable with integers indicating on which days of the month a timer has to be executed. For example, always on day *15* of the month If not specified, the timer will be active on all days - ``monthcarry`` (default: ``True``). If the day was not seen (weekend, trading holiday), the timer will be executed on the next available day. - ``allow`` (default: ``None``). A callback which receives a `datetime.date`` instance and returns ``True`` if the date is allowed for timers or else returns ``False`` - ``tzdata`` which can be either ``None`` (default), a ``pytz`` instance or a ``data feed`` instance. ``None``: ``when`` is interpreted at face value (which translates to handling it as if it is UTC even if it's not) ``pytz`` instance: ``when`` will be interpreted as being specified in the local time specified by the timezone instance. ``data feed`` instance: ``when`` will be interpreted as being specified in the local time specified by the ``tz`` parameter of the data feed instance. **Note**: If ``when`` is either ``SESSION_START`` or ``SESSION_END`` and ``tzdata`` is ``None``, the first *data feed* in the system (aka ``self.data0``) will be used as the reference to find out the session times. - ``strats`` (default: ``False``) call also the ``notify_timer`` of strategies - ``cheat`` (default ``False``) if ``True`` the timer will be called before the broker has a chance to evaluate the orders. This opens the chance to issue orders based on opening price, for example, right before the session starts - ``*args``: any extra args will be passed to ``notify_timer`` - ``**kwargs``: any extra kwargs will be passed to ``notify_timer`` Return Value: - The created timer """ # NOTE: *args (extra notify_timer args) are forwarded positionally after # the named timer kwargs. mypy flags a potential owner/when collision, # but _add_timer collects these into its own *args; ignore the false # positive rather than reorder (which would change call semantics). return self._add_timer( # type: ignore[misc] owner=self, when=when, offset=offset, repeat=repeat, weekdays=weekdays, weekcarry=weekcarry, monthdays=monthdays, monthcarry=monthcarry, allow=allow, tzdata=tzdata, strats=strats, cheat=cheat, *args, **kwargs, )
[docs] def addtz(self, tz): """This can also be done with the parameter ``tz`` Adds a global timezone for strategies. The argument ``tz`` can be - ``None``: in this case the datetime displayed by strategies will be in UTC, which has always been the standard behavior - ``pytz`` instance. It will be used as such to convert UTC times to the chosen timezone - ``string``. Instantiating a ``pytz`` instance will be attempted. - ``integer``. Use, for the strategy, the same timezone as the corresponding ``data`` in the ``self.datas`` iterable (``0`` would use the timezone from ``data0``) """ self.p.tz = tz
[docs] def addcalendar(self, cal): """Adds a global trading calendar to the system. Individual data feeds may have separate calendars which override the global one ``cal`` can be an instance of ``TradingCalendar`` a string or an instance of ``pandas_market_calendars``. A string will be instantiated as a ``PandasMarketCalendar`` (which needs the module ``pandas_market_calendar`` installed in the system). If a subclass of `TradingCalendarBase` is passed (not an instance), it will be instantiated """ # Handle string or pandas calendar with valid_days attribute if isinstance(cal, string_types) or hasattr(cal, "valid_days"): cal = PandasMarketCalendar(calendar=cal) # Handle TradingCalendarBase subclass or instance else: try: if issubclass(cal, TradingCalendarBase): cal = cal() except TypeError: # already an instance pass self._tradingcal = cal
[docs] def add_signal(self, sigtype, sigcls, *sigargs, **sigkwargs): """Add a signal to be used with SignalStrategy.""" self.signals.append((sigtype, sigcls, sigargs, sigkwargs))
[docs] def signal_strategy(self, stratcls, *args, **kwargs): """Set a SignalStrategy subclass to receive signals.""" self._signal_strat = (stratcls, args, kwargs)
[docs] def signal_concurrent(self, onoff): """Allow concurrent orders when signals are pending.""" self._signal_concurrent = onoff
[docs] def signal_accumulate(self, onoff): """If signals are added to the system and the `accumulate` value is set to True, entering the market when already in the market, will be allowed to increase a position""" self._signal_accumulate = onoff
[docs] def addstore(self, store): """Add a Store instance to the system.""" if store not in self.stores: self.stores.append(store)
def _maybe_add_store(self, candidate): """Register a store exposed by a broker or data feed.""" store = getattr(candidate, "store", None) or getattr(candidate, "_store", None) if store is not None: self.addstore(store)
[docs] def addwriter(self, wrtcls, *args, **kwargs): """Adds an ``Writer`` class to the mix. Instantiation will be done at ``run`` time in cerebro""" self.writers.append((wrtcls, args, kwargs))
[docs] def addsizer(self, sizercls, *args, **kwargs): """Adds a ``Sizer`` class (and args) which is the default sizer for any strategy added to cerebro """ self.sizers[None] = (sizercls, args, kwargs)
[docs] def addsizer_byidx(self, idx, sizercls, *args, **kwargs): """Adds a ``Sizer`` class by idx. This idx is a reference compatible to the one returned by ``addstrategy``. Only the strategy referenced by ``idx`` will receive this size """ self.sizers[idx] = (sizercls, args, kwargs)
[docs] def addindicator(self, indcls, *args, **kwargs): """Add an Indicator class to be instantiated at run time.""" self.indicators.append((indcls, args, kwargs))
[docs] def addanalyzer(self, ancls: type, *args, **kwargs) -> None: """Add an Analyzer class to be instantiated at run time.""" self.analyzers.append((ancls, args, kwargs))
[docs] def addobserver(self, obscls: type, *args, **kwargs) -> None: """ Adds an ``Observer`` class to the mix. Instantiation will be done at ``run`` time """ self.observers.append((False, obscls, args, kwargs))
[docs] def addobservermulti(self, obscls, *args, **kwargs): """ It will be added once per "data" in the system. A use case is a buy/sell observer that observes individual data. A counter-example is the CashValue, which observes system-wide values """ self.observers.append((True, obscls, args, kwargs))
[docs] def addstorecb(self, callback): """Adds a callback to get messages which would be handled by the notify_store method The signature of the callback must support the following: - callback(msg, *args, *kwargs) The actual ``msg``, ``*args`` and ``**kwargs`` received are implementation defined (depend entirely on the *data/broker/store*) but in general one should expect them to be *printable* to allow for reception and experimentation. """ self.storecbs.append(callback)
def _notify_store(self, msg, *args, **kwargs): """Internal method to dispatch store notifications.""" for callback in self.storecbs: callback(msg, *args, **kwargs) self.notify_store(msg, *args, **kwargs)
[docs] def notify_store(self, msg, *args, **kwargs): """Receive store notifications in cerebro This method can be overridden in ``Cerebro`` subclasses The actual ``msg``, ``*args`` and ``**kwargs`` received are implementation defined (depend entirely on the *data/broker/store*) but in general one should expect them to be *printable* to allow for reception and experimentation. """
def _storenotify(self): """Process and dispatch store notifications to strategies.""" for store in self.stores: for notif in store.get_notifications(): msg, args, kwargs = notif self._notify_store(msg, *args, **kwargs) for strat in self.runningstrats: strat.notify_store(msg, *args, **kwargs) if hasattr(strat, "_notify_store_to_observers"): strat._notify_store_to_observers(msg, *args, **kwargs)
[docs] def adddatacb(self, callback): """Adds a callback to get messages which would be handled by the notify_data method The signature of the callback must support the following: - callback(data, status, *args, *kwargs) The actual ``*args`` and ``**kwargs`` received are implementation defined (depend entirely on the *data/broker/store*), but in general one should expect them to be *printable* to allow for reception and experimentation. """ self.datacbs.append(callback)
def _datanotify(self): """Process and dispatch data notifications to strategies.""" for data in self.datas: for notif in data.get_notifications(): status, args, kwargs = notif self._notify_data(data, status, *args, **kwargs) for strat in self.runningstrats: strat.notify_data(data, status, *args, **kwargs) if hasattr(strat, "_notify_data_to_observers"): strat._notify_data_to_observers(data, status, *args, **kwargs) def _notify_data(self, data, status, *args, **kwargs): """Internal method to dispatch data notifications.""" for callback in self.datacbs: callback(data, status, *args, **kwargs) self.notify_data(data, status, *args, **kwargs)
[docs] def notify_data(self, data, status, *args, **kwargs): """Receive data notifications in cerebro This method can be overridden in ``Cerebro`` subclasses The actual ``*args`` and ``**kwargs`` received are implementation defined (depend entirely on the *data/broker/store*), but in general one should expect them to be *printable* to allow for reception and experimentation. """
[docs] def dispatch_channel_event(self, event): """Dispatch a channel event to all running strategies. Routes tick, orderbook, funding, and bar events from the channel system (StreamingEventQueue / LiveEventQueue) to the appropriate ``notify_*`` callbacks on each strategy. Args: event: Event wrapper with ``.data`` and ``.channel_type`` attrs. """ data = event.data channel_type = event.channel_type for strat in self.runningstrats: strat._event_count += 1 if channel_type == "tick": strat._tick_count += 1 strat._last_tick[getattr(data, "symbol", "")] = data strat.notify_tick(data) strat._notify_tick_to_observers(data) elif channel_type == "orderbook": strat._last_ob[getattr(data, "symbol", "")] = data strat.notify_orderbook(data) elif channel_type == "funding": strat._last_funding[getattr(data, "symbol", "")] = data strat.notify_funding(data) elif channel_type == "bar": strat.notify_bar(data) strat._notify_bar_to_observers(data)
def _start_channel_strategy(self, strat): """Start a channel-mode strategy without assuming bar datas exist.""" if getattr(strat, "datas", None): strat._start() return for analyzer in itertools.chain(strat.analyzers, strat._slave_analyzers): analyzer._start() for observer in strat._get_all_observers(): observer._start() strat.start() def _advance_channel_strategy_clock(self, strat, event): """Advance no-data channel strategies so observers can run per event.""" if getattr(strat, "datas", None): return try: strat.forward() except Exception: logger.debug("Channel strategy forward() failed", exc_info=True) timestamp = getattr(event, "timestamp", None) if timestamp is None: return try: event_dt = datetime.datetime.fromtimestamp(float(timestamp), UTC) event_num = date2num(event_dt) strat.lines.datetime[0] = event_num strat._last_valid_datetime = event_num placeholder_map = getattr(strat, "placeholder_data", None) if isinstance(placeholder_map, dict): symbol = getattr(getattr(event, "data", None), "symbol", None) placeholder = placeholder_map.get(str(symbol)) if symbol is not None else None if placeholder is not None: try: placeholder._len = max(int(getattr(placeholder, "_len", 0)), len(strat)) except Exception: logger.debug("Channel placeholder length update failed", exc_info=True) try: placeholder.datetime[0] = event_num except Exception: logger.debug("Channel placeholder datetime update failed", exc_info=True) try: last_price = getattr(event.data, "price", None) if last_price is None: last_price = getattr(event.data, "close", None) if last_price is not None: placeholder.close[0] = float(last_price) except Exception: logger.debug("Channel placeholder price update failed", exc_info=True) except Exception: logger.debug("Channel strategy datetime update failed", exc_info=True) def _step_channel_strategy(self, strat): """Run channel-mode analyzers and observers once per event.""" if getattr(strat, "datas", None): return for analyzer in itertools.chain(strat.analyzers, strat._slave_analyzers): analyzer._next() for observer in strat._get_all_observers(): observer._next() def _stop_channel_strategy(self, strat): """Stop a channel-mode strategy without requiring bar datas.""" if getattr(strat, "datas", None): strat._stop() return strat.stop() for analyzer in itertools.chain(strat.analyzers, strat._slave_analyzers): analyzer._stop() for observer in strat._get_all_observers(): try: if hasattr(observer, "stop"): observer.stop() except Exception: logger.warning( "Observer %s.stop() raised an exception", type(observer).__name__, exc_info=True, ) # ------------------------------------------------------------------ # Channel mode implementation (called from run(channel=...)) # ------------------------------------------------------------------ def _run_channel(self, channel, **kwargs): """Internal: run strategies in channel event mode. ``channel`` may be: * An iterable of ``Event`` objects – events are processed in a loop, dispatched to broker and strategies. * ``True`` – strategies are instantiated and returned immediately without entering an event loop (for external async drivers). """ # Override params pkeys = self.params._getkeys() for key, val in kwargs.items(): if key in pkeys: setattr(self.params, key, val) # Channel-mode brokers emit simulated order notifications; force the # quick-notify path so strategy/observer callbacks receive them. self.p.quicknotify = True # --- strategy instantiation (simplified, no bar-data required) --- self._init_stcount() runstrats: list = [] self.runningstrats = runstrats # Start broker self._broker.start() self._instantiate_channel_strategies(runstrats) self._wire_channel_strategies(runstrats) # If channel is just True, return strategies for external event loops if channel is True: self.runstrats = [runstrats] return runstrats # --- channel event loop --- for event in channel: if self._event_stop: break for strat in runstrats: self._advance_channel_strategy_clock(strat, event) # 1. Let the broker process the raw event data ch = event.channel_type evdata = event.data if ch == "tick" and hasattr(self._broker, "process_tick"): self._broker.process_tick(evdata) elif ch == "orderbook" and hasattr(self._broker, "process_orderbook"): self._broker.process_orderbook(evdata) elif ch == "bar" and hasattr(self._broker, "process_bar"): self._broker.process_bar(evdata) # 2. Deliver broker order-fill notifications to strategies while True: order = self._broker.get_notification() if order is None: break owner = getattr(order, "owner", None) if owner is None: owner = getattr(getattr(order, "p", None), "owner", None) if owner is None and runstrats: owner = runstrats[0] if owner is not None: owner._addnotification(order, quicknotify=True) # 3. Dispatch channel event to strategies self.dispatch_channel_event(event) # 4. Advance analyzers/observers that rely on next()-style hooks for strat in runstrats: self._step_channel_strategy(strat) # --- teardown --- for strat in runstrats: self._stop_channel_strategy(strat) self._broker.stop() self.runstrats = [runstrats] return runstrats def _instantiate_channel_strategies(self, runstrats): """Instantiate strategy classes for channel mode and append to ``runstrats``. Extracted from ``_run_channel`` (instantiation phase); behavior unchanged. Honors ``StrategySkipError``, ``oldsync``, ``tradehistory`` and broker-provided context exactly as before. """ # Instantiate each strategy class added via addstrategy() iterstrats = itertools.product(*self.strats) for iterstrat in iterstrats: for stratcls, sargs, skwargs in iterstrat: try: with OwnerContext.set_owner(self): if hasattr(stratcls, "_create_strategy_safely"): strat = stratcls._create_strategy_safely(*sargs, **skwargs) else: strat = stratcls(*sargs, **skwargs) except errors.StrategySkipError: continue # user requested skip, same as standard run() path if self.p.oldsync: strat._oldsync = True if self.p.tradehistory: strat.set_tradehistory() runstrats.append(strat) context_getter = getattr(self._broker, "get_context", None) if callable(context_getter): context = context_getter() for strat in runstrats: strat.context = context def _wire_channel_strategies(self, runstrats): """Attach observers, analyzers and sizers to channel strategies and start them. Extracted from ``_run_channel`` (setup phase); behavior unchanged. """ # Channel mode still needs explicit observers/analyzers initialization. defaultsizer = self.sizers.get(None, (None, None, None)) for idx, strat in enumerate(runstrats): for multi, obscls, obsargs, obskwargs in self.observers: strat._addobserver(multi, obscls, *obsargs, **obskwargs) for ancls, anargs, ankwargs in self.analyzers: strat._addanalyzer(ancls, *anargs, **ankwargs) sizer, sargs, skwargs = self.sizers.get(idx, defaultsizer) if sizer is not None: strat._addsizer(sizer, *sargs, **skwargs) self._start_channel_strategy(strat)
[docs] def adddata(self, data, name: str = None): """ Adds a ``Data Feed`` instance to the mix. If ``name`` is not None, it will be put into ``data._name`` which is meant for decoration/plotting purposes. """ # Set data name if provided if name is not None: data._name = name data.name = name # Assign unique ID to each data feed data._id = next(self._dataid) # Set data's environment to this cerebro data.setenvironment(self) # Add to data list self.datas.append(data) # Store in name lookup dictionary self.datasbyname[data._name] = data # Get feed from data feed = data.getfeed() # Add feed if not already present if feed and feed not in self.feeds: self.feeds.append(feed) self._maybe_add_store(data) # Set live mode if data is live if data.islive(): self._dolive = True return data
[docs] def chaindata(self, *args, **kwargs): """ Chains several data feeds into one If ``name`` is passed as named argument and not `None`, it will be put into ``data._name`` which is meant for decoration/plotting purposes. If `None`, then the name of the first data will be used """ dname = kwargs.pop("name", None) if dname is None: dname = args[0]._dataname d = feeds.Chainer(dataname=dname, *args) self.adddata(d, name=dname) return d
[docs] def rolloverdata(self, *args, **kwargs): """Chains several data feeds into one If ``name`` is passed as named argument and is not None, it will be put into ``data._name`` which is meant for decoration/plotting purposes. If `None`, then the name of the first data will be used Any other kwargs will be passed to the RollOver class """ dname = kwargs.pop("name", None) if dname is None: dname = args[0]._dataname d = feeds.RollOver(dataname=dname, *args, **kwargs) self.adddata(d, name=dname) return d
[docs] def replaydata(self, dataname, name=None, **kwargs): """ Adds a ``Data Feed`` to be replayed by the system If ``name`` is not None, it will be put into ``data._name`` which is meant for decoration/plotting purposes. Any other kwargs like ``timeframe``, ``compression``, ``todate`` which are supported by the replay filter will be passed transparently """ if any(dataname is x for x in self.datas): dataname = dataname.clone() dataname.replay(**kwargs) self.adddata(dataname, name=name) self._doreplay = True return dataname
[docs] def resampledata(self, dataname, name=None, **kwargs): """ Adds a ``Data Feed`` to be resample by the system If ``name`` is not None, it will be put into ``data._name`` which is meant for decoration/plotting purposes. Any other kwargs like ``timeframe``, ``compression``, ``todate`` which are supported by the resample filter will be passed transparently """ if any(dataname is x for x in self.datas): dataname = dataname.clone() dataname.resample(**kwargs) self.adddata(dataname, name=name) self._doreplay = True return dataname
[docs] def optcallback(self, cb): """ Adds a *callback* to the list of callbacks that will be called with the optimizations when each of the strategies has been run The signature: cb(strategy) """ self.optcbs.append(cb)
[docs] def optstrategy(self, strategy, *args, **kwargs): """ Adds a ``Strategy`` class to the mix for optimization. Instantiation will happen during ``run`` time. args and kwargs MUST BE iterables that hold the values to check. Example: if a Strategy accepts a parameter `period`, for optimization purposes, the call to ``optstrategy`` looks like: - cerebro.optstrategy(MyStrategy, period=(15, 25)) This will execute an optimization for values 15 and 25. Whereas - cerebro.optstrategy(MyStrategy, period=range(15, 25)) will execute MyStrategy with ``period`` values 15 -> 25 (25 not included, because ranges are semi-open in Python) If a parameter is passed but shall not be optimized, the call looks like: - cerebro.optstrategy(MyStrategy, period=(15,)) Notice that `period` is still passed as an iterable ... of just one element ``backtrader`` will anyhow try to identify situations like: - cerebro.optstrategy(MyStrategy, period=15) and will create an internal pseudo-iterable if possible """ self._dooptimize = True args = self.iterize(args) optargs = itertools.product(*args) optkeys = list(kwargs) vals = self.iterize(kwargs.values()) optvals = itertools.product(*vals) okwargs1 = map(zip, itertools.repeat(optkeys), optvals) optkwargs = map(dict, okwargs1) it = itertools.product([strategy], optargs, optkwargs) self.strats.append(it)
[docs] def addstrategy(self, strategy: type, *args, **kwargs) -> int: """ Adds a ``Strategy`` class to the mix for a single pass run. Instantiation will happen during ``run`` time. Args and kwargs will be passed to the strategy as they are during instantiation. Returns the index with which addition of other objects (like sizers) can be referenced """ self.strats.append([(strategy, args, kwargs)]) return len(self.strats) - 1
[docs] def setbroker(self, broker): """ Sets a specific ``broker`` instance for this strategy, replacing the one inherited from cerebro. """ self._broker = broker broker.cerebro = self self._maybe_add_store(broker) return broker
[docs] def getbroker(self): """ Returns the broker instance. This is also available as a ``property`` by the name ``broker`` """ return self._broker
broker = property(getbroker, setbroker)
[docs] def plot( self, plotter=None, numfigs=1, iplot=True, start=None, end=None, width=16, height=9, dpi=300, tight=True, use=None, backend="matplotlib", **kwargs, ): """ Plots the strategies inside cerebro If ``plotter`` is None, a default ``Plot`` instance is created and ``kwargs`` are passed to it during instantiation. ``numfigs`` split the plot in the indicated number of charts reducing chart density if wished ``iplot``: if ``True`` and running in a ``notebook`` the charts will be displayed inline ``use``: set it to the name of the desired matplotlib backend. It will take precedence over ``iplot`` ``backend``: plotting backend to use. Options: - 'matplotlib': traditional matplotlib plotting (default) - 'plotly': interactive Plotly charts (better for large data) ``start``: An index to the datetime line array of the strategy or a ``datetime.date``, ``datetime.datetime`` instance indicating the start of the plot ``end``: An index to the datetime line array of the strategy or a ``datetime.date``, ``datetime.datetime`` instance indicating the end of the plot ``width``: in inches of the saved figure ``height``: in inches of the saved figure ``dpi``: quality in dots per inches of the saved figure ``tight``: only save actual content and not the frame of the figure """ if self._exactbars > 0: return None # For plotly backend, ensure Transactions analyzer exists for buy/sell signals if backend == "plotly": for stratlist in self.runstrats: for strat in stratlist: # Check if Transactions analyzer already exists has_txn = any(a.__class__.__name__ == "Transactions" for a in strat.analyzers) if not has_txn: # Add Transactions analyzer retroactively is not possible # So we'll rely on broker.orders instead pass if not plotter: from . import plot if backend == "plotly": plotter = plot.PlotlyPlot(**kwargs) elif self.p.oldsync: plotter = plot.Plot_OldSync(**kwargs) else: plotter = plot.Plot(**kwargs) # pfillers = {self.datas[i]: self._plotfillers[i] # for i, x in enumerate(self._plotfillers)} # pfillers2 = {self.datas[i]: self._plotfillers2[i] # for i, x in enumerate(self._plotfillers2)} figs = [] for stratlist in self.runstrats: for si, strat in enumerate(stratlist): rfig = plotter.plot( strat, figid=si * 100, numfigs=numfigs, iplot=iplot, start=start, end=end, use=use, ) # pfillers=pfillers2) figs.append(rfig) plotter.show() return figs
# Module passed to cerebro for multiprocessing during optimization
[docs] def __call__(self, iterstrat): """ Used during optimization to pass the cerebro over the multiprocessing module without complaints """ predata = self.p.optdatas and self._dopreload and self._dorunonce return self.runstrategies(iterstrat, predata=predata)
# Delete runstrats when pickling
[docs] def __getstate__(self): """ Used during optimization to prevent optimization result `runstrats` from being pickled to subprocesses """ rv = vars(self).copy() if "runstrats" in rv: del rv["runstrats"] return rv
# When called from within a strategy or elsewhere, stops execution quickly
[docs] def runstop(self): """If invoked from inside a strategy or anywhere else, including other threads, the execution will stop as soon as possible.""" self._event_stop = True # signal a stop has been requested
# Core method for backtesting. Any passed kwargs affect cerebro standard parameters. # If no data added, will stop immediately. Return value differs based on optimization. def _resolve_run_flags(self): """Resolve runonce/preload/exactbars/replay/live flags and build writers. Extracted from run() to keep that method readable. Sets the private execution-mode flags on self and populates self.runwriters / self.writers_csv. No behavior change. """ # Check if _dorunonce, _dopreload, _exactbars self._dorunonce = self.p.runonce self._dopreload = self.p.preload self._exactbars = int(self.p.exactbars) # If _exactbars is not 0, _dorunonce must be False; if _dopreload is True and _exactbars < 1, set _dopreload to True if self._exactbars: self._dorunonce = False # something is saving memory, no runonce self._dopreload = self._dopreload and self._exactbars < 1 # If _doreplay is True or any data has replaying attribute True, set _doreplay to True self._doreplay = self._doreplay or any(x.replaying for x in self.datas) # If _doreplay, need to set _dopreload to False if self._doreplay: # preloading is not supported with replay. full timeframe bars # are constructed in realtime self._dopreload = False # If _dolive or live, need to set _dorunonce and _dopreload to False if self._dolive or self.p.live: # in this case, both preload and runonce must be off self._dorunonce = False self._dopreload = False # Writer list self.runwriters = [] # Add the system default writer if requested if self.p.writer is True: wr = WriterFile() self.runwriters.append(wr) # Instantiate any other writers for wrcls, wrargs, wrkwargs in self.writers: wr = wrcls(*wrargs, **wrkwargs) self.runwriters.append(wr) # Write down if any writer wants the full csv output self.writers_csv = any(map(lambda x: x.p.csv, self.runwriters))
[docs] def run(self, **kwargs) -> list: """The core method to perform backtesting. Any ``kwargs`` passed to it will affect the value of the standard parameters ``Cerebro`` was instantiated with. If `cerebro` has no data **and** no ``channel`` is given, the method will immediately bail out. Extra keyword arguments ----------------------- channel : iterable or True, optional When provided the engine runs in **channel mode** instead of the traditional bar-based mode. * *iterable* – an ``Event`` stream (``StreamingEventQueue``, ``LiveEventQueue``, or any iterable yielding ``Event`` objects). Events are dispatched to the broker and then to every strategy via their ``notify_*`` callbacks. * ``True`` – strategies are instantiated and returned immediately **without** entering an event loop. This is useful when an external async loop drives the data (e.g. external market-data watchers calling ``strategy.notify_tick()`` directly). Call ``cerebro.runstop()`` when done to tear down brokers and strategies. It has different return values: - For No Optimization: a list contanining instances of the Strategy classes added with ``addstrategy`` - For Optimization: a list of lists which contain instances of the Strategy classes added with ``addstrategy`` """ self._event_stop = False # Stop is requested # --- channel mode --------------------------------------------------- channel = kwargs.pop("channel", None) if channel is not None: # _run_channel is dynamically typed; run() advertises -> list. return self._run_channel(channel, **kwargs) # type: ignore[no-any-return] # If no data, return empty list immediately if not self.datas: return [] # nothing can be run # Override standard parameters with passed kwargs pkeys = self.params._getkeys() for key, val in kwargs.items(): if key in pkeys: setattr(self.params, key, val) # Manage activate/deactivate object cache # Manage object cache linebuffer.LineActions.cleancache() # clean cache indicator.Indicator.cleancache() # clean cache linebuffer.LineActions.usecache(self.p.objcache) indicator.Indicator.usecache(self.p.objcache) # Resolve runonce/preload/exactbars/replay/live execution flags + writers self._resolve_run_flags() # Running strategy list self.runstrats = [] # If signals is not None, handle signalstrategy related issues if self.signals: # allow processing of signals signalst, sargs, skwargs = self._signal_strat if signalst is None: # Try to see if the 1st regular strategy is a signal strategy try: signalst, sargs, skwargs = self.strats.pop(0) except IndexError: pass # Nothing there else: if not isinstance(signalst, SignalStrategy): # no signal ... reinsert at the beginning self.strats.insert(0, (signalst, sargs, skwargs)) signalst = None # flag as not present if signalst is None: # recheck # Still None, create a default one signalst, sargs, skwargs = SignalStrategy, (), {} # sargs/skwargs always come from a (args, kwargs) pair or the # tuple()/dict() defaults above; normalize for safe unpacking. sargs = sargs or () skwargs = skwargs or {} # Add the signal strategy self.addstrategy( signalst, *sargs, _accumulate=self._signal_accumulate, _concurrent=self._signal_concurrent, signals=self.signals, **skwargs, ) # If strategy list is empty, add strategy if not self.strats: # Datas are present, add a strategy self.addstrategy(Strategy) # Iterate strategies iterstrats = itertools.product(*self.strats) # If not optimization parameters, or using 1 cpu core if not self._dooptimize or self.p.maxcpus == 1: # If no optimmization is wished ... or 1 core is to be used # let's skip process "spawning" # Iterate through strategies for iterstrat in iterstrats: # Run strategy runstrat = self.runstrategies(iterstrat) # Add running strategy to running strategy list self.runstrats.append(runstrat) # If optimization parameters if self._dooptimize: # Iterate all optcbs to return stopped strategy results for cb in self.optcbs: cb(runstrat) # callback receives finished strategy # If optimization parameters else: # If optdatas is True, and _dopreload, and _dorunonce if self.p.optdatas and self._dopreload and self._dorunonce: # Iterate each data, reset, if _exactbars < 1, extend data # Start data # If data _dopreload, call preload on data for data in self.datas: data.reset() if self._exactbars < 1: # datas can be a full length data.extend(size=self.params.lookahead) data._start() data.preload() # Start process pool pool = multiprocessing.Pool(self.p.maxcpus or None) for r in pool.imap(self, iterstrats): self.runstrats.append(r) for cb in self.optcbs: cb(r) # callback receives finished strategy # Close process pool pool.close() # If optdatas is True, and _dopreload, and _dorunonce, iterate data and stop data if self.p.optdatas and self._dopreload and self._dorunonce: for data in self.datas: data.stop() # If not optimization parameters if not self._dooptimize: # avoid a list of list for regular cases return self.runstrats[0] # type: ignore[no-any-return] return self.runstrats
# Initialize count def _init_stcount(self): self.stcount = itertools.count(0) # Call next count def _next_stid(self): return next(self.stcount) def _prepare_run(self, predata=False): """Start components and (optionally) preload data before strategies run. Extracted from runstrategies() to keep that method readable. Starts stores, applies cheat-on-open/fund/order-history settings, starts the broker and feeds, writes CSV writer headers, and resets/preloads each data feed unless ``predata`` is True. """ # Iterate stores and start for store in self.stores: store.start() # If cheat_on_open and broker_coo, set broker accordingly if self.p.cheat_on_open and self.p.broker_coo: # try to activate in broker if hasattr(self._broker, "set_coo"): self._broker.set_coo(True) # If fund history is not None, need to set fund history if self._fhistory is not None: self._broker.set_fund_history(self._fhistory) # Iterate order history for orders, onotify in self._ohistory: self._broker.add_order_history(orders, onotify) # Broker start self._broker.start() # Feed start for feed in self.feeds: feed.start() # If need to save writer data if self.writers_csv: # headers wheaders = [] # Iterate data, if data csv attribute is True, get headers that need saving for data in self.datas: if data.csv: wheaders.extend(data.getwriterheaders()) # Save writer headers for writer in self.runwriters: if writer.p.csv: writer.addheaders(wheaders) # If no predata, need to pre-process data, similar to run method preprocessing if not predata: for data in self.datas: data.reset() if self._exactbars < 1: # datas can be a full length data.extend(size=self.params.lookahead) data._start() if self._dopreload: data.preload() # Run strategy
[docs] def runstrategies(self, iterstrat, predata=False): """ Internal method invoked by ``run``` to run a set of strategies """ self._init_stcount() # Initialize running strategy as empty list self.runningstrats = runstrats = [] # Start stores/broker/feeds, apply fund + order history, write headers # and (optionally) preload data. Extracted for readability. self._prepare_run(predata) # Loop through strategies for stratcls, sargs, skwargs in iterstrat: # Add data to strategy parameters sargs = self.datas + list(sargs) # Instantiate strategy with OwnerContext so findowner() can find Cerebro try: # Use OwnerContext so Strategy.__new__ can find Cerebro via findowner() with OwnerContext.set_owner(self): # Use safe strategy creation to handle parameter filtering if hasattr(stratcls, "_create_strategy_safely"): strat = stratcls._create_strategy_safely(*sargs, **skwargs) else: # Fallback to direct instantiation strat = stratcls(*sargs, **skwargs) except errors.StrategySkipError: continue # do not add strategy to the mix # Old data synchronization method if self.p.oldsync: strat._oldsync = True # tell strategy to use old clock update # Whether to save trade history data if self.p.tradehistory: strat.set_tradehistory() # Add strategy runstrats.append(strat) # Get timezone info, if tz is integer, get tz at that index; otherwise use tzparse tz = self.p.tz if isinstance(tz, integer_types): tz = self.datas[tz]._tz else: tz = tzparse(tz) # If runstrats is not empty list if runstrats: # loop separated for clarity # Get default sizer defaultsizer = self.sizers.get(None, (None, None, None)) # For each strategy for idx, strat in enumerate(runstrats): # If stdstats is True, add several observers if self.p.stdstats: # Add observer broker strat._addobserver(False, observers.Broker) # Add observers.BuySell if self.p.oldbuysell: strat._addobserver(True, observers.BuySell) else: strat._addobserver(True, observers.BuySell, barplot=True) # Add observer trade if self.p.oldtrades or len(self.datas) == 1: strat._addobserver(False, observers.Trades) else: strat._addobserver(False, observers.DataTrades) # Add observers and their parameters to strategy for multi, obscls, obsargs, obskwargs in self.observers: strat._addobserver(multi, obscls, *obsargs, **obskwargs) # Add indicators to strategy for indcls, indargs, indkwargs in self.indicators: strat._addindicator(indcls, *indargs, **indkwargs) # Add analyzers to strategy for ancls, anargs, ankwargs in self.analyzers: strat._addanalyzer(ancls, *anargs, **ankwargs) # Get specific sizer, if sizer is not None, add to strategy sizer, sargs, skwargs = self.sizers.get(idx, defaultsizer) if sizer is not None: strat._addsizer(sizer, *sargs, **skwargs) # Set timezone strat._settz(tz) # Strategy start strat._start() # For running writers, if csv parameter is True, save strategy data to writer for writer in self.runwriters: if writer.p.csv: writer.addheaders(strat.getwriterheaders()) # If predata is False, data not preloaded if not predata: # Loop each strategy, call qbuffer to cache data for strat in runstrats: strat.qbuffer(self._exactbars, replaying=self._doreplay) # Loop each writer, start writer for writer in self.runwriters: writer.start() # Prepare timers self._timers = [] self._timerscheat = [] # Loop timers for timer in self._pretimers: # preprocess tzdata if needed # Start timer timer.start(self.datas[0]) # If timer parameter cheat is True, add timer to self._timerscheat, otherwise add to self._timers if timer.params.cheat: self._timerscheat.append(timer) else: self._timers.append(timer) # Run the main loop; keep cleanup deterministic, but never turn a # strategy/runtime exception into a successful empty backtest. run_exception = None try: # If _dopreload and _dorunonce are True if self._dopreload and self._dorunonce: # If old data alignment and sync method, use _runonce_old, otherwise use _runonce if self.p.oldsync: self._runonce_old(runstrats) else: self._runonce(runstrats) # If _dopreload and _dorunonce are not both True else: # If old data alignment and sync method, use _runnext_old, otherwise use _runnext if self.p.oldsync: self._runnext_old(runstrats) else: self._runnext(runstrats) except Exception as exc: run_exception = exc logger.exception("Unhandled exception in run loop, cleaning up before re-raising") finally: # Iterate strategies and stop running (always runs) for strat in runstrats: strat._stop() # Stop broker self._broker.stop() # If predata is False, iterate data and stop each data if not predata: for data in self.datas: data.stop() # Iterate each feed and stop feed for feed in self.feeds: feed.stop() # Iterate each store and stop store for store in self.stores: store.stop() # Stop writer self.stop_writers(runstrats) if run_exception is not None: raise run_exception # If doing parameter optimization and optreturn is True, build lightweight # OptReturn results (detached from data) instead of full strategy objects. if self._dooptimize and self.p.optreturn: return self._build_optreturn_results(runstrats) return runstrats
def _build_optreturn_results(self, runstrats): """Build OptReturn results for an optimization run. Detaches analyzers from their strategy/data references (so the result is lightweight and picklable across process boundaries) and wraps each strategy's params + analyzers in an OptReturn. """ results = [] for strat in runstrats: for a in strat.analyzers: a.strategy = None a._parent = None # OPTIMIZED: Use __dict__ instead of dir() for better performance for attrname in list(a.__dict__.keys()): if attrname.startswith("data"): setattr(a, attrname, None) oreturn = OptReturn(strat.params, analyzers=strat.analyzers, strategycls=type(strat)) results.append(oreturn) return results # Stop writer
[docs] def stop_writers(self, runstrats): """Stop all writers and write final information. Args: runstrats: List of strategy instances that were run. Collects information from data feeds and strategies, writes the information to all registered writers, and stops them. """ # Cerebro info cerebroinfo = OrderedDict() # Data info datainfos = OrderedDict() # Get info for each data, save to datainfos, then save to cerebroinfo for i, data in enumerate(self.datas): datainfos["Data%d" % i] = data.getwriterinfo() cerebroinfo["Datas"] = datainfos # Get strategy info and save to stratinfos and cerebroinfo stratinfos = {} for strat in runstrats: stname = strat.__class__.__name__ stratinfos[stname] = strat.getwriterinfo() cerebroinfo["Strategies"] = stratinfos # Write cerebroinfo to file for writer in self.runwriters: writer.writedict({"Cerebro": cerebroinfo}) writer.stop()
# Notify broker info def _brokernotify(self): """ Internal method which kicks the broker and delivers any broker notification to the strategy """ # Call broker's next self._broker.next() while True: # Get order info to notify, if order is None break loop, otherwise get order's owner. # If owner is None, default to first strategy order = self._broker.get_notification() if order is None: break owner = order.owner if owner is None: owner = self.runningstrats[0] # default # Notify order info through first strategy owner._addnotification(order, quicknotify=self.p.quicknotify) # Old runnext method, similar to runnext def _runnext_old(self, runstrats): """ Actual implementation of run in full next mode. All objects have its `next` method invoked on each data arrival """ data0 = self.datas[0] d0ret = True while d0ret or d0ret is None: lastret = False # Notify anything from the store even before moving datas # because datas may not move due to an error reported by the store self._storenotify() if self._event_stop: # stop if requested return self._datanotify() if self._event_stop: # stop if requested return d0ret = data0.next() if d0ret: for data in self.datas[1:]: if not data.next(datamaster=data0): # no delivery data._check(forcedata=data0) # check forcing output data.next(datamaster=data0) # retry elif d0ret is None: # meant for things like live feeds which may not produce a bar # at the moment but need the loop to run for notifications and # getting resample and others to produce timely bars data0._check() for data in self.datas[1:]: data._check() else: lastret = data0._last() for data in self.datas[1:]: lastret += data._last(datamaster=data0) if not lastret: # Only go extra round if something was changed by "lasts" break # Datas may have generated a new notification after next self._datanotify() if self._event_stop: # stop if requested return self._brokernotify() if self._event_stop: # stop if requested return if d0ret or lastret: # bars produced by data or filters for strat in runstrats: strat._next() if self._event_stop: # stop if requested return self._next_writers(runstrats) # Last notification chance before stopping self._datanotify() if self._event_stop: # stop if requested return self._storenotify() if self._event_stop: # stop if requested return # Old runonce method, similar to runonce def _runonce_old(self, runstrats): """ Actual implementation of run in vector mode. Strategies are still invoked on a pseudo-event mode in which `next` is called for each data arrival """ for strat in runstrats: strat._once() # The default once for strategies does nothing and therefore # has not moved forward all datas/indicators/observers that # were homed before calling once, Hence no "need" to do it # here again, because pointers are at 0 data0 = self.datas[0] datas = self.datas[1:] for i in range(data0.buflen()): self._storenotify() if self._event_stop: # stop if requested return self._datanotify() if self._event_stop: # stop if requested return data0.advance() for data in datas: data.advance(datamaster=data0) self._brokernotify() if self._event_stop: # stop if requested return for strat in runstrats: # data0.datetime[0] for compat. w/ new strategy's oncepost strat._oncepost(data0.datetime[0]) if self._event_stop: # stop if requested return self._next_writers(runstrats) self._datanotify() if self._event_stop: # stop if requested return self._storenotify() if self._event_stop: # stop if requested return # Run writer's next def _next_writers(self, runstrats): if not self.runwriters: return if self.writers_csv: wvalues = [] for data in self.datas: if data.csv: wvalues.extend(data.getwritervalues()) for strat in runstrats: wvalues.extend(strat.getwritervalues()) for writer in self.runwriters: if writer.p.csv: writer.addvalues(wvalues) writer.next() # Disable runonce def _disable_runonce(self): """API for lineiterators to disable runonce (see HeikinAshi)""" self._dorunonce = False # runnext method, core of the framework, event-driven core for data execution def _runnext(self, runstrats): """Actual implementation of run in full next mode. All objects have their ``next`` method invoked on each data arrival. The loop has four phases per iteration: 1. **Notification**: store and data notifications dispatched. 2. **Feed advance**: each data feed is advanced; ``d0ret`` computed. 3. **Time alignment**: feeds aligned to master datetime ``dt0``; slower feeds rewound, faster feeds tick-filled. 4. **Strategy dispatch**: timers fired, broker notified, strategies receive ``_next()`` / ``_next_open()``. """ try: # Sort data by time period datas = sorted(self.datas, key=lambda x: (x._timeframe, x._compression)) # Other data datas1 = datas[1:] # Main data data0 = datas[0] d0ret = True # index for resample only, not replay rsonly = [i for i, x in enumerate(datas) if x.resampling and not x.replaying] # Check if only doing resample onlyresample = len(datas) == len(rsonly) # Check if no data needs resample noresample = not rsonly # Number of cloned data clonecount = sum(d._clone for d in datas) # Number of data ldatas = len(datas) # Number of non-cloned data ldatas_noclones = ldatas - clonecount # Default dt0 at max time dt0 = date2num(datetime.datetime.max) - 2 # default at max # Note: 'while True' (not 'while d0ret or d0ret is None') is intentional: # when d0ret becomes False, the else branch still runs _last() on feeds # and only breaks if no feed produces additional data. while True: # if any has live data in the buffer, no data will wait anything # If any live data exists, newqcheck is False newqcheck = not any(d.haslivedata() for d in datas) # If live data exists if not newqcheck: # If no data has reached the live status or all, wait for # the next incoming data # livecount is the number of live data livecount = sum(d._laststatus == d.LIVE for d in datas) # Override qcheck for mixed live/historical: wait only when # no feeds are LIVE or ALL non-clone feeds are LIVE. # When only some feeds are LIVE, skip wait for faster iteration. newqcheck = not livecount or livecount == ldatas_noclones lastret = False # Notify anything from the store even before moving datas # because datas may not move due to an error reported by the store # Notify store related info self._storenotify() if self._event_stop: # stop if requested return # Notify data related info self._datanotify() if self._event_stop: # stop if requested return # record starting time and tell feeds to discount the elapsed time # from the qcheck value # Record start time and notify feed to subtract elapsed time from qcheck drets = [] if newqcheck and any(d.p.qcheck for d in datas): qstart = datetime.datetime.now(UTC) for d in datas: qlapse = datetime.datetime.now(UTC) - qstart d.do_qcheck(newqcheck, qlapse.total_seconds()) d_next = d.next(ticks=False) drets.append(d_next) else: for d in datas: d.do_qcheck(False, 0.0) d_next = d.next(ticks=False) drets.append(d_next) # Iterate drets, if d0ret is False and any dret is None, d0ret is None d0ret = any(dret for dret in drets) if not d0ret and any(dret is None for dret in drets): d0ret = None # If d0ret is not None if d0ret: # Get time dts = [] for i, ret in enumerate(drets): dts.append(datas[i].datetime[0] if ret else None) # Get index to minimum datetime # Get minimum time if onlyresample or noresample: dt0 = min(d for d in dts if d is not None) else: dt0 = min( (d for i, d in enumerate(dts) if d is not None and i not in rsonly) ) # Guard: dt0 < 1 means ordinal date before 0001-01-01 # (invalid/sentinel value from uninitialized data) if dt0 < 1: logger.warning( "Invalid datetime value dt0=%s detected in _runnext, aborting run loop", dt0, ) return # Get master data and time dmaster = datas[dts.index(dt0)] # and timemaster self._dtmaster = dmaster.num2date(dt0) self._udtmaster = num2date(dt0) # Try to get something for those that didn't return # Loop through drets for i, ret in enumerate(drets): # If ret is not None, continue to next ret if ret: # dts already contains a valid datetime for this i continue # try to get data by checking with a master # Get data and try to set time for dts d = datas[i] d._check(forcedata=dmaster) # check to force output if d.next(datamaster=dmaster, ticks=False): # retry dts[i] = d.datetime[0] # good -> store # make sure only those at dmaster level end up delivering # Iterate dts for i, dti in enumerate(dts): # If dti is not None if dti is not None: # Get data di = datas[i] if dti > dt0: di.rewind() # cannot deliver yet # If not replay elif not di.replaying: # Replay forces tick fill, else force here di._tick_fill(force=True) # If d0ret is None, iterate each data and call _check() elif d0ret is None: # meant for things like live feeds which may not produce a bar # at the moment but need the loop to run for notifications and # getting resample and others to produce timely bars for data in datas: data._check() # If other case else: lastret = data0._last() for data in datas1: lastret += data._last(datamaster=data0) if not lastret: # Only go extra round if something was changed by "lasts" break # Datas may have generated a new notification after next # Notify data info self._datanotify() if self._event_stop: # stop if requested return # Check timer and iterate strategies, call _next_open() to run if d0ret or lastret: # if any bar, check timers before broker self._check_timers(runstrats, dt0, cheat=True) if self.p.cheat_on_open: for strat in runstrats: strat._next_open() if self._event_stop: # stop if requested return # Notify broker (only when data is available to avoid IndexError) if d0ret or lastret: self._brokernotify() if self._event_stop: # stop if requested return # Notify timer and iterate strategies to run if d0ret or lastret: # bars produced by data or filters self._check_timers(runstrats, dt0, cheat=False) for strat in runstrats: strat._next() if self._event_stop: # stop if requested return self._next_writers(runstrats) # Last notification chance before stopping # Notify data info self._datanotify() if self._event_stop: # stop if requested return # Notify store info self._storenotify() if self._event_stop: # stop if requested return except Exception: logger.exception("Unhandled exception in _runnext") raise # runonce def _runonce(self, runstrats): """ Actual implementation of run in vector mode. Strategies are still invoked on a pseudo-event mode in which `next` is called for each data arrival """ # Iterate strategies, call _once and reset for strat in runstrats: strat._once() strat.reset() # strat called next by next - reset lines # The default once for strategies does nothing and therefore # has not moved forward all datas/indicators/observers that # were homed before calling once, Hence no "need" to do it # here again, because pointers are at 0 # Sort data from small period to large period datas = sorted(self.datas, key=lambda x: (x._timeframe, x._compression)) while True: self._storenotify() if self._event_stop: # stop if requested return self._datanotify() if self._event_stop: # stop if requested return # Check the next incoming date in the datas # For each data call advance_peek(), get minimum time as the first one dts = [d.advance_peek() for d in datas] dt0 = min(dts) if dt0 == float("inf"): break # no data delivers anything # Timemaster if needed be # dmaster = datas[dts.index(dt0)] # and timemaster # For each data time, if time <= minimum time, advance data, otherwise ignore for i, dti in enumerate(dts): if dti <= dt0: datas[i].advance() # self._plotfillers2[i].append(slen) # mark as fill else: # self._plotfillers[i].append(slen) pass # Check timer self._check_timers(runstrats, dt0, cheat=True) # If cheat_on_open, call _oncepost_open() for each strategy if self.p.cheat_on_open: for strat in runstrats: strat._oncepost_open() # If stop was called, stop if self._event_stop: # stop if requested return # Call _brokernotify() self._brokernotify() # If stop was called, stop if self._event_stop: # stop if requested return # Check timer self._check_timers(runstrats, dt0, cheat=False) for strat in runstrats: strat._oncepost(dt0) if self._event_stop: # stop if requested return self._next_writers(runstrats) # Check timer def _check_timers(self, runstrats, dt0, cheat=False): # If cheat is False, timers equals self._timers, otherwise equals self._timerscheat timers = self._timers if not cheat else self._timerscheat # For timer in timers for t in timers: # Use timer.check(dt0), if returns True, enter below, otherwise check next timer if not t.check(dt0): continue # CRITICAL FIX: Remove 'when' from kwargs to avoid conflict with position argument # when is already passed as t.lastwhen (2nd argument) timer_kwargs = {k: v for k, v in t.kwargs.items() if k != "when"} # Notify timer t.params.owner.notify_timer(t, t.lastwhen, *t.args, **timer_kwargs) # If strategy needs to use timer (t.params.strats is True), iterate strategies and call notify_timer if t.params.strats: for strat in runstrats: strat.notify_timer(t, t.lastwhen, *t.args, **timer_kwargs)
[docs] def add_report_analyzers(self, riskfree_rate=0.01): """Automatically add analyzers required for reporting. Adds the following analyzers: - SharpeRatio: Sharpe ratio - DrawDown: Drawdown analysis - TradeAnalyzer: Trade analysis - SQN: System Quality Number - AnnualReturn: Annual returns Args: riskfree_rate: Risk-free rate, default 0.01 (1%) """ from . import analyzers self.addanalyzer( analyzers.SharpeRatio, _name="sharperatio", riskfreerate=riskfree_rate, timeframe=TimeFrame.Months, ) self.addanalyzer(analyzers.DrawDown, _name="drawdown") self.addanalyzer(analyzers.TradeAnalyzer, _name="tradeanalyzer") self.addanalyzer(analyzers.SQN, _name="sqn") self.addanalyzer(analyzers.AnnualReturn, _name="annualreturn") self.addanalyzer(analyzers.TimeReturn, _name="timereturn", timeframe=TimeFrame.Days)
[docs] def generate_report( self, output_path, format="html", template="default", user=None, memo=None, **kwargs ): """Generate backtest report. Args: output_path: Output file path format: Report format ('html', 'pdf', 'json') template: Template name or path (only for HTML/PDF) user: Username memo: Remarks/notes **kwargs: Additional parameters Returns: str: Output file path Raises: RuntimeError: If strategy has not been run yet Example: cerebro = bt.Cerebro() cerebro.addstrategy(MyStrategy) cerebro.adddata(data) cerebro.run() cerebro.generate_report('report.html') """ if not self.runstrats: raise RuntimeError("No strategy has been run. Call cerebro.run() first.") # Get the first strategy strategy = self.runstrats[0][0] from .reports import ReportGenerator report = ReportGenerator(strategy, template=template) format_lower = format.lower() if format_lower == "html": return report.generate_html(output_path, user=user, memo=memo, **kwargs) if format_lower == "pdf": return report.generate_pdf(output_path, user=user, memo=memo, **kwargs) if format_lower == "json": return report.generate_json(output_path, **kwargs) raise ValueError(f"Unsupported format: {format}. Use 'html', 'pdf', or 'json'.")