Source code for backtrader.indicators.contrib.t3_alarm_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import (
    ExponentialMovingAverage,
    Indicator,
    SimpleMovingAverage,
    SmoothedMovingAverage,
    WeightedMovingAverage,
)

__all__ = [
    "T3AlarmIndicator",
]


class T3AlarmIndicator(Indicator):
    """Double-smoothed MA slope indicator emitting direction and reversal alarms.

    The selected price is smoothed with a moving average, and the result is
    smoothed again with the same moving-average class and period.  The slope
    of that second average (current value versus the value one bar earlier,
    both optionally shifted back by ``ma_shift`` bars) defines the direction.

    Lines:
        ma2: The moving average applied twice to the selected price.
        direction: ``1.0`` while ``ma2`` rises, ``-1.0`` while it falls.  A
            flat (or non-comparable, e.g. NaN) slope repeats the previous
            direction, which starts at ``0.0``.
        buy_sig: ``1.0`` on the bar where direction flips from -1 to 1,
            otherwise ``0.0``.
        sell_sig: ``1.0`` on the bar where direction flips from 1 to -1,
            otherwise ``0.0``.

    Params:
        ma_period: Period used for both smoothing passes.
        ma_shift: Number of bars to look back when sampling ``ma2``.
            Negative values are treated as ``0``.
        ma_method: One of ``"sma"``, ``"ema"``, ``"smma"``, ``"lwma"`` or
            ``"wma"`` (case-insensitive); anything else selects the EMA.
        ma_price: One of ``"open"``, ``"high"``, ``"low"``, ``"median"``,
            ``"typical"`` or ``"weighted"`` (case-insensitive); anything else
            selects the close.
    """

    lines = ("ma2", "direction", "buy_sig", "sell_sig")
    params = (
        ("ma_period", 19),
        ("ma_shift", 0),
        ("ma_method", "ema"),
        ("ma_price", "close"),
    )

    def __init__(self):
        """Build the twice-applied moving average and reset the prior direction."""
        price = self._price_line(self.p.ma_price)
        ma_cls = self._ma_class(self.p.ma_method)
        ma1 = ma_cls(price, period=self.p.ma_period)
        self.lines.ma2 = ma_cls(ma1, period=self.p.ma_period)
        # Clamp once here so next() and once() agree: a negative shift would
        # otherwise make the "previous" sample point at the current bar or
        # beyond it (and, in once(), past the end of the buffer).
        self._ma_shift = max(int(self.p.ma_shift), 0)
        self._prev_direction = 0

    @staticmethod
    def _slope_direction(curr, prev, fallback):
        """Return 1 if ``curr > prev``, -1 if ``curr < prev``, else ``fallback``."""
        if curr > prev:
            return 1
        if curr < prev:
            return -1
        # Equal values and NaN comparisons both land here.
        return fallback

    def next(self):
        """Update the direction line and raise buy/sell alarms on slope flips."""
        shift = self._ma_shift
        ma2 = self.lines.ma2
        prev_dir = self._prev_direction

        # shift is never negative, so ma2[-shift] is the current bar when
        # shift == 0 and ma2[-(shift + 1)] is always one bar older than it.
        direction = self._slope_direction(ma2[-shift], ma2[-(shift + 1)], prev_dir)
        self._prev_direction = direction

        self.lines.direction[0] = float(direction)
        self.lines.buy_sig[0] = 1.0 if (direction == 1 and prev_dir == -1) else 0.0
        self.lines.sell_sig[0] = 1.0 if (direction == -1 and prev_dir == 1) else 0.0

    def once(self, start, end):
        """Vectorised batch evaluation of direction and alarm lines.

        Args:
            start: Inclusive start index of the range to fill.
            end: Exclusive end index of the range to fill.
        """
        ma2 = self.lines.ma2.array
        direction_line = self.lines.direction.array
        buy_line = self.lines.buy_sig.array
        sell_line = self.lines.sell_sig.array

        # Make sure every output buffer can be indexed up to ``end``.
        nan = float("nan")
        for line in (direction_line, buy_line, sell_line):
            while len(line) < end:
                line.append(nan)

        shift = self._ma_shift
        prev_direction = 0
        actual_end = min(end, len(ma2))

        for i in range(start, actual_end):
            curr_idx = i - shift
            prev_idx = curr_idx - 1
            prev_dir = prev_direction

            if prev_idx < 0:
                # Not enough history yet (this also covers curr_idx < 0);
                # a negative index here would silently wrap to the buffer end.
                direction = prev_dir
            else:
                direction = self._slope_direction(
                    ma2[curr_idx], ma2[prev_idx], prev_dir
                )

            prev_direction = direction
            direction_line[i] = float(direction)
            buy_line[i] = 1.0 if (direction == 1 and prev_dir == -1) else 0.0
            sell_line[i] = 1.0 if (direction == -1 and prev_dir == 1) else 0.0

        # Keep the per-bar state in sync with the last evaluated bar.
        self._prev_direction = prev_direction

    def _ma_class(self, method):
        """Map a moving-average method name to its class (EMA when unknown)."""
        name = str(method).lower()
        mapping = {
            "sma": SimpleMovingAverage,
            "ema": ExponentialMovingAverage,
            "smma": SmoothedMovingAverage,
            "lwma": WeightedMovingAverage,
            "wma": WeightedMovingAverage,
        }
        return mapping.get(name, ExponentialMovingAverage)

    def _price_line(self, price_name):
        """Return the data line (or line expression) named by ``price_name``.

        Unrecognised names fall back to the close line.
        """
        name = str(price_name).lower()
        data = self.data
        if name == "open":
            return data.open
        if name == "high":
            return data.high
        if name == "low":
            return data.low
        if name == "median":
            return (data.high + data.low) / 2.0
        if name == "typical":
            return (data.high + data.low + data.close) / 3.0
        if name == "weighted":
            # Close is counted twice in the weighted price.
            return (data.high + data.low + data.close + data.close) / 4.0
        return data.close