Source code for backtrader.indicators.contrib.trend_arrows_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

import math

from .. import Indicator

__all__ = [
    "TrendArrowsIndicator",
]


[docs] class TrendArrowsIndicator(Indicator): """Reconstructs trend_arrows indicator. Computes AverageHigh (avg of highest highs over iPeriod sub-windows) and AverageLow (avg of lowest lows over iPeriod sub-windows). TrendUp = LL when close > HH; TrendDown = HH when close < LL; else continues previous trend. SignUp when TrendUp appears fresh; SignDown when TrendDown appears fresh. Buffers: 0=TrendUp, 1=TrendDown, 2=SignUp(buy), 3=SignDown(sell). """ lines = ("trend_up", "trend_down", "sign_up", "sign_down") params = ( ("iperiod", 15), ("ifullperiods", 1), ) def __init__(self): """Initialize cached period lengths for trend arrow reconstruction.""" self._ip = int(self.p.iperiod) self._ifp = int(self.p.ifullperiods) self._window = self._ip + self._ifp self.addminperiod(self._window + 2)
[docs] def next(self): """Compute trend and signal buffers for the current bar. The method: 1. Splits the lookback window into `iperiod` sub-windows and averages highs/lows. 2. Generates TrendUp/TrendDown buffers based on close relative to aggregated levels. 3. Emits SignUp/SignDown only on fresh transitions from inactive to active. """ ip = self._ip window = self._window # Compute AverageHigh: average of highest highs over ip sub-windows segment_size = max(window // ip, 1) hh_sum = 0.0 ll_sum = 0.0 count = 0 for seg in range(ip): start = seg * segment_size end = min(start + segment_size, window) if start >= len(self.data): break seg_high = -1e30 seg_low = 1e30 for k in range(start, min(end, len(self.data))): h = float(self.data.high[-k]) if k > 0 else float(self.data.high[0]) low_price = float(self.data.low[-k]) if k > 0 else float(self.data.low[0]) if h > seg_high: seg_high = h if low_price < seg_low: seg_low = low_price hh_sum += seg_high ll_sum += seg_low count += 1 hh = hh_sum / count if count else float(self.data.high[0]) ll = ll_sum / count if count else float(self.data.low[0]) close_val = float(self.data.close[0]) prev_tu = float(self.lines.trend_up[-1]) if len(self.lines.trend_up) > 1 else 0.0 prev_td = float(self.lines.trend_down[-1]) if len(self.lines.trend_down) > 1 else 0.0 if math.isnan(prev_tu): prev_tu = 0.0 if math.isnan(prev_td): prev_td = 0.0 tu = 0.0 td = 0.0 if close_val > hh: tu = ll elif close_val < ll: td = hh else: if prev_td != 0.0: td = hh if prev_tu != 0.0: tu = ll su = 0.0 sd = 0.0 if prev_tu == 0.0 and tu != 0.0: su = tu if prev_td == 0.0 and td != 0.0: sd = td self.lines.trend_up[0] = tu self.lines.trend_down[0] = td self.lines.sign_up[0] = su self.lines.sign_down[0] = sd