Source code for backtrader.indicators.contrib.adx_cross_hull_style_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import (
    AverageTrueRange,
    ExponentialMovingAverage,
    Indicator,
    MinusDirectionalIndicator,
    PlusDirectionalIndicator,
    SimpleMovingAverage,
    SmoothedMovingAverage,
    WeightedMovingAverage,
)

__all__ = [
    "ADXCrossHullStyleIndicator",
    "UltraXMAIndicator",
]


def resolve_ma_class(name):
    """Map an MT5 moving-average method name to a backtrader indicator class.

    Args:
        name: MT5 MA method name (e.g. ``'sma'``, ``'ema'``, ``'jjma'``).

    Returns:
        The backtrader moving-average indicator class for the method, defaulting
        to WeightedMovingAverage for unrecognized names.
    """
    mode = str(name).lower()
    if mode in {"mode_sma", "sma"}:
        return SimpleMovingAverage
    if mode in {
        "mode_ema",
        "ema",
        "mode_jjma",
        "jjma",
        "mode_jurx",
        "jurx",
        "mode_parma",
        "parma",
        "mode_t3",
        "t3",
        "mode_vidya",
        "vidya",
        "mode_ama",
        "ama",
    }:
        return ExponentialMovingAverage
    if mode in {"mode_smma", "smma"}:
        return SmoothedMovingAverage
    return WeightedMovingAverage


def resolve_price_line(data, mode):
    """Build an applied-price line from a data feed per MT5 price modes.

    Args:
        data: The backtrader data feed providing OHLC lines.
        mode: MT5 applied-price mode (e.g. ``'price_close'``, ``'price_typical'``).

    Returns:
        A backtrader line expression for the selected applied price, defaulting
        to the close line.
    """
    price_mode = str(mode).lower()
    if price_mode in {"price_open", "open"}:
        return data.open
    if price_mode in {"price_high", "high"}:
        return data.high
    if price_mode in {"price_low", "low"}:
        return data.low
    if price_mode in {"price_median", "median"}:
        return (data.high + data.low) / 2.0
    if price_mode in {"price_typical", "typical"}:
        return (data.high + data.low + data.close) / 3.0
    if price_mode in {"price_weighted", "weighted"}:
        return (data.high + data.low + data.close + data.close) / 4.0
    if price_mode in {"price_simple", "simple"}:
        return (data.open + data.close) / 2.0
    if price_mode in {"price_quarter", "quarter"}:
        return (data.high + data.low + data.open + data.close) / 4.0
    return data.close


[docs] class ADXCrossHullStyleIndicator(Indicator): """Hull-style ADX cross indicator emitting up/down directional signal levels. Builds smoothed +DI/-DI values from a full and a half-length directional index and marks an ``up`` level on a bullish DI cross or a ``down`` level on a bearish DI cross, each offset from price by a fraction of ATR. """ lines = ("up", "down") params = (("adx_period", 14),) def __init__(self): """Construct the directional-index and ATR components and set min period.""" period = max(2, int(self.p.adx_period)) self._plus1 = PlusDirectionalIndicator(self.data, period=period) self._plus2 = PlusDirectionalIndicator(self.data, period=max(2, period // 2)) self._minus1 = MinusDirectionalIndicator(self.data, period=period) self._minus2 = MinusDirectionalIndicator(self.data, period=max(2, period // 2)) self._atr = AverageTrueRange(self.data, period=10) self.addminperiod(period + 12)
[docs] def next(self): """Compute the per-bar up/down signal levels from smoothed DI crosses.""" b4plusdi = 2.0 * float(self._plus2[-1]) - float(self._plus1[-1]) nowplusdi = 2.0 * float(self._plus2[0]) - float(self._plus1[0]) b4minusdi = 2.0 * float(self._minus2[-1]) - float(self._minus1[-1]) nowminusdi = 2.0 * float(self._minus2[0]) - float(self._minus1[0]) self.lines.up[0] = 0.0 self.lines.down[0] = 0.0 if b4plusdi < b4minusdi and nowplusdi > nowminusdi: self.lines.up[0] = float(self.data.low[0]) - 0.25 * float(self._atr[0]) if b4plusdi > b4minusdi and nowplusdi < nowminusdi: self.lines.down[0] = float(self.data.high[0]) + 0.25 * float(self._atr[0])
[docs] def once(self, start, end): """Vectorized batch computation of up/down signal levels over a range. Args: start: First index in the array range to compute. end: One past the last index in the array range to compute. """ plus1 = self._plus1.array plus2 = self._plus2.array minus1 = self._minus1.array minus2 = self._minus2.array atr = self._atr.array low = self.data.low.array high = self.data.high.array up_line = self.lines.up.array down_line = self.lines.down.array for line in (up_line, down_line): while len(line) < end: line.append(float("nan")) actual_end = min( end, len(plus1), len(plus2), len(minus1), len(minus2), len(atr), len(low), len(high) ) for i in range(start, actual_end): up_line[i] = 0.0 down_line[i] = 0.0 if i <= 0: continue b4plusdi = 2.0 * float(plus2[i - 1]) - float(plus1[i - 1]) nowplusdi = 2.0 * float(plus2[i]) - float(plus1[i]) b4minusdi = 2.0 * float(minus2[i - 1]) - float(minus1[i - 1]) nowminusdi = 2.0 * float(minus2[i]) - float(minus1[i]) if b4plusdi < b4minusdi and nowplusdi > nowminusdi: up_line[i] = float(low[i]) - 0.25 * float(atr[i]) if b4plusdi > b4minusdi and nowplusdi < nowminusdi: down_line[i] = float(high[i]) + 0.25 * float(atr[i])
[docs] class UltraXMAIndicator(Indicator): """UltraXMA breadth indicator counting rising vs falling fanned moving averages. Computes a fan of moving averages over increasing periods, counts how many are rising (bulls) versus falling (bears) each bar, and smooths those counts with an exponential factor to produce trend-strength breadth lines. """ lines = ("bulls", "bears") params = ( ("w_method", "jjma"), ("start_length", 3), ("wphase", 100), ("step", 5), ("steps_total", 10), ("smooth_method", "jjma"), ("smooth_length", 3), ("smooth_phase", 100), ("ipc", "price_close"), ) def __init__(self): """Build the fan of moving averages and breadth smoothing, set min period.""" price_line = resolve_price_line(self.data, self.p.ipc) ma_cls = resolve_ma_class(self.p.w_method) smooth_cls = resolve_ma_class(self.p.smooth_method) self._periods = [ int(self.p.start_length + i * self.p.step) for i in range(int(self.p.steps_total) + 1) ] self._ma_lines = [ma_cls(price_line, period=max(1, p)) for p in self._periods] self._bull_smooth = smooth_cls(self.lines.bulls, period=max(1, int(self.p.smooth_length))) self._bear_smooth = smooth_cls(self.lines.bears, period=max(1, int(self.p.smooth_length))) self.addminperiod(max(self._periods) + int(self.p.smooth_length) + 5)
[docs] def next(self): """Compute and smooth the per-bar bullish/bearish moving-average counts.""" upsch = 0.0 dnsch = 0.0 for ma_line in self._ma_lines: if float(ma_line[0]) > float(ma_line[-1]): upsch += 1.0 else: dnsch += 1.0 period = max(1, int(self.p.smooth_length)) alpha = 2.0 / (period + 1.0) prev_bulls = float(self.lines.bulls[-1]) if len(self) > 0 else upsch prev_bears = float(self.lines.bears[-1]) if len(self) > 0 else dnsch if prev_bulls != prev_bulls: prev_bulls = upsch if prev_bears != prev_bears: prev_bears = dnsch self.lines.bulls[0] = alpha * upsch + (1.0 - alpha) * prev_bulls if len(self) > 0 else upsch self.lines.bears[0] = alpha * dnsch + (1.0 - alpha) * prev_bears if len(self) > 0 else dnsch
[docs] def once(self, start, end): """Vectorized batch computation of smoothed breadth counts over a range. Args: start: First index in the array range to compute. end: One past the last index in the array range to compute. """ ma_arrays = [ma_line.array for ma_line in self._ma_lines] bulls_line = self.lines.bulls.array bears_line = self.lines.bears.array for line in (bulls_line, bears_line): while len(line) < end: line.append(float("nan")) period = max(1, int(self.p.smooth_length)) alpha = 2.0 / (period + 1.0) prev_bulls = None prev_bears = None actual_end = min([end] + [len(array) for array in ma_arrays]) for i in range(start, actual_end): upsch = 0.0 dnsch = 0.0 for ma_array in ma_arrays: if i > 0 and float(ma_array[i]) > float(ma_array[i - 1]): upsch += 1.0 else: dnsch += 1.0 bulls = upsch if prev_bulls is None else alpha * upsch + (1.0 - alpha) * prev_bulls bears = dnsch if prev_bears is None else alpha * dnsch + (1.0 - alpha) * prev_bears bulls_line[i] = bulls bears_line[i] = bears prev_bulls = bulls prev_bears = bears