Source code for backtrader.analyzers.logreturnsrolling
#!/usr/bin/env python
"""LogReturnsRolling Analyzer Module - Rolling log returns calculation.
This module provides the LogReturnsRolling analyzer for calculating
rolling log returns over a specified timeframe.
Classes:
LogReturnsRolling: Analyzer that calculates rolling log returns.
Example:
>>> cerebro = bt.Cerebro()
>>> cerebro.addanalyzer(bt.analyzers.LogReturnsRolling)
"""
import collections
import math
from ..analyzer import TimeFrameAnalyzerBase
from ..utils.log_message import get_logger
logger = get_logger(__name__)
__all__ = ["LogReturnsRolling"]
[docs]
class LogReturnsRolling(TimeFrameAnalyzerBase):
"""This analyzer calculates rolling returns for a given timeframe and
compression
Params:
- ``timeframe`` (default: ``None``)
If ``None`` the ``timeframe`` of the first data in the system will be
used
Pass ``TimeFrame.NoTimeFrame`` to consider the entire dataset with no
time constraints
- ``compression`` (default: ``None``)
Only used for sub-day timeframes to, for example, work on an hourly
timeframe by specifying "TimeFrame.Minutes" and 60 as compression
If `None`, then the compression of the first data in the system will be
used
- ``data`` (default: ``None``)
Reference asset to track instead of the portfolio value.
.note: this data must have been added to a ``cerebro`` instance with
``addata``, ``resampledata`` or ``replaydata``
- ``firstopen`` (default: ``True``)
When tracking the returns of `data` the following is done when
crossing a timeframe boundary, for example, ``Years``:
- Last ``close`` the previous year is used as the reference price to
see the return in the current year
The problem is the first calculation, because the data has** no
previous** closing price.As such, and when this parameter is `True`,
the *opening* price will be used for the first calculation.
This requires the data feed to have an ``open`` price (for ``close``
the standard [0] notations will be used without a reference to a field
price)
Else the initial close will be used.
- ``fund`` (default: ``None``)
If `None`, the actual mode of the broker (fundmode - True/False) will
be autodetected to decide if the returns are based on the total net
asset value or on the fund value. See ``set_fundmode`` in the broker
documentation
Set it to ``True`` or ``False`` for a specific behavior
Methods:
- Get_analysis
Returns a dictionary with returns as values and the datetime points for
each return as keys
"""
# Parameters
params = (
("data", None),
("firstopen", True),
("fund", None),
)
# Start
[docs]
def __init__(self, *args, **kwargs):
"""Initialize the LogReturnsRolling analyzer.
Args:
*args: Positional arguments.
**kwargs: Keyword arguments for analyzer parameters.
"""
# Call parent class __init__ method to support timeframe and compression parameters
super().__init__(*args, **kwargs)
self._value = None
self._lastvalue = None
self._values = None
self._fundmode = None
[docs]
def start(self):
"""Initialize the analyzer at the start of the backtest.
Sets the fund mode and initializes the rolling value queue
with size controlled by compression parameter.
"""
super().start()
if self.p.fund is None:
self._fundmode = self.strategy.broker.fundmode
else:
self._fundmode = self.p.fund
# The special part is that self._values is set as a queue, where self.compression parameter controls how many elements the queue saves
# Note: use self.compression (set in _start from data) not self.p.compression (which may be None)
self._values = collections.deque([float("Nan")] * self.compression, maxlen=self.compression)
if self.p.data is None:
# keep the initial portfolio value if not tracing data
if not self._fundmode:
self._lastvalue = self.strategy.broker.getvalue()
else:
self._lastvalue = self.strategy.broker.fundvalue
[docs]
def notify_fund(self, cash, value, fundvalue, shares):
"""Update current value from fund notification.
Args:
cash: Current cash amount.
value: Current portfolio value.
fundvalue: Current fund value.
shares: Number of fund shares.
"""
if not self._fundmode:
self._value = value if self.p.data is None else self.p.data[0]
else:
self._value = fundvalue if self.p.data is None else self.p.data[0]
# Called once in a new timeframe
[docs]
def on_dt_over(self):
"""Handle timeframe boundary crossing.
Updates the rolling value queue when entering a new period.
"""
# next is called in a new timeframe period
if self.p.data is None or len(self.p.data) > 1:
# Not tracking a data feed or data feed has data already
vst = self._lastvalue # update value_start to last
else:
# The 1st tick has no previous reference, use the opening price
vst = self.p.data.open[0] if self.p.firstopen else self.p.data[0]
self._values.append(vst) # push values backwards (and out)
[docs]
def next(self):
"""Calculate and store the rolling log return for the current period.
Calculates log(current_value / oldest_value) from the rolling window.
"""
# Calculate the return
super().next()
# When the strategy is running, if there are too many losses, self._value / self._values[0] might be 0, avoid this situation
try:
start_value = self._values[0]
ratio = self._value / start_value
if isinstance(ratio, complex) or not math.isfinite(ratio) or ratio <= 0:
raise ValueError(f"invalid log return ratio: {ratio}")
log_return = math.log(ratio)
if not math.isfinite(log_return):
raise ValueError(f"invalid log return value: {log_return}")
self.rets[self.dtkey] = log_return
except (TypeError, ValueError, ZeroDivisionError, OverflowError) as e:
logger.debug("Log return calculation failed: %s", e)
self.rets[self.dtkey] = 0.0
self._lastvalue = self._value # keep last value