Source code for backtrader.indicators.contrib.step_manrtr_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import Indicator

__all__ = [
    "StepMANRTRIndicator",
]


[docs] class StepMANRTRIndicator(Indicator): """Reconstructs StepMA_NRTR indicator. StepSizeCalc: Volty-based step size from ATR-like calculation. StepMACalc: Trend-following MA with NRTR ratchet. 4 buffers: UpBuffer(0), DnBuffer(1), BuySignal(2), SellSignal(3). Buy when trend flips from down to up. Sell on reverse. """ lines = ("trend_up", "trend_down", "buy_signal", "sell_signal") params = ( ("length", 10), ("kv", 1.0), ("step_size", 0), ("percentage", 0), ("switch", 1), ) def __init__(self): """Initialize rolling state and derived parameters for indicator updates.""" self._length = int(self.p.length) self._kv = float(self.p.kv) self._step_size = int(self.p.step_size) self._percentage = float(self.p.percentage) self._switch = int(self.p.switch) # 0=Close, 1=HighLow self._trend0 = 0 self._trend1 = 0 self._trend1_ = 0 self._smax1 = 0.0 self._smin1 = 0.0 self._first = True self.addminperiod(self._length + 3) def _step_size_calc(self, bar_idx): length = self._length kv = self._kv if self._step_size > 0: return self._step_size # Volty calculation: average of high-low ranges total = 0.0 for i in range(length): h = float(self.data.high[-i]) low_price = float(self.data.low[-i]) total += h - low_price avg = total / length if length > 0 else 0 # Convert to points-like value step = avg * kv / self.data.close[0] * 10000 if self.data.close[0] != 0 else 0 return max(step, 1) def _step_ma_calc(self): step = self._step_size_calc(0) point = float(self.data.close[0]) / 10000.0 if float(self.data.close[0]) > 10 else 0.0001 size_p = step * point size_2p = size_p * 2 cur_high = float(self.data.high[0]) cur_low = float(self.data.low[0]) cur_close = float(self.data.close[0]) if self._first: self._trend1 = 0 self._smax1 = cur_low + size_2p self._smin1 = cur_high - size_2p self._first = False if self._switch: # HighLow mode smax0 = cur_high - size_2p smin0 = cur_low + size_2p else: smax0 = cur_close + size_2p smin0 = cur_close - size_2p self._trend0 = self._trend1 if cur_close > self._smax1: self._trend0 = 1 if cur_close < self._smin1: self._trend0 = -1 if self._trend0 > 0: if smin0 < self._smin1: smin0 = self._smin1 result = smin0 + size_p else: if smax0 > self._smax1: smax0 = self._smax1 result = smax0 - size_p self._trend1_ = self._trend1 self._smax1 = smax0 self._smin1 = smin0 self._trend1 = self._trend0 return result, size_p
[docs] def next(self): """Compute trend-up/down lines and buy/sell trigger levels for this bar.""" result, size_p = self._step_ma_calc() ratio = self._percentage / 100.0 if self._percentage > 0 else 0 step = self._step_size_calc(0) if step > 0: result += ratio / step tu = 0.0 td = 0.0 bs = 0.0 ss = 0.0 point = float(self.data.close[0]) / 10000.0 if float(self.data.close[0]) > 10 else 0.0001 if self._trend0 > 0: tu = result - step * point if self._trend1_ < 0: bs = tu if self._trend0 < 0: td = result + step * point if self._trend1_ > 0: ss = td self.lines.trend_up[0] = tu self.lines.trend_down[0] = td self.lines.buy_signal[0] = bs self.lines.sell_signal[0] = ss