Source code for backtrader.indicators.contrib.shared_strategy_indicators

#!/usr/bin/env python
"""Repeated functional-test indicators migrated to contrib.

These indicators are re-exported by ``backtrader.indicators`` and are
therefore available as ``Xxx``. Some historical same-name test
classes had incompatible line contracts; those variants use explicit names.
"""

import math
from collections import deque

from .. import (
    EMA,
    AverageDirectionalMovementIndex,
    AverageTrueRange,
    ExponentialMovingAverage,
    Highest,
    If,
    Indicator,
    Lowest,
    MinusDirectionalIndicator,
    ParabolicSAR,
    PlusDirectionalIndicator,
    SimpleMovingAverage,
    SmoothedMovingAverage,
    StandardDeviation,
    StochasticFull,
    WeightedMovingAverage,
)

__all__ = [
    "SkyscraperFixIndicator",
    "SkyscraperFixDuplexIndicator",
    "SkyscraperFixColorAMLIndicator",
    "AppliedPriceCCI",
    "ColorAMLIndicator",
    "ColorAMLMeanReversionIndicator",
    "X2MACandleApprox",
    "XPeriodCandleColor",
    "XPeriodCandleSystemColor",
    "AcceleratorOscillator",
    "AIAcceleratorOscillator",
    "AdaptiveMarketLevel",
    "AmlIndicator",
    "FunctionalAwesomeOscillator",
    "AIAwesomeOscillator",
    "BlauErgodicMDI",
    "BlauErgodicMDIClassic",
    "BrakeExpIndicator",
    "FlatTrendIndicator",
    "FlatTrendDistanceIndicator",
    "IinMASignalIndicator",
    "KDJ",
    "LaguerreIndicator",
    "LaguerreColorIndicator",
    "RelativeVigorIndex",
    "SmoothedRelativeVigorIndex",
    "SafeCCI",
    "SafeCCIWithFactor",
    "SilverTrendSignalProxy",
    "SilverTrendDirectionSignalProxy",
]


def _price_series(data, mode):
    key = str(mode).lower()
    if key in ("1", "close", "price_close"):
        return data.close
    if key in ("2", "open", "price_open"):
        return data.open
    if key in ("3", "high", "price_high"):
        return data.high
    if key in ("4", "low", "price_low"):
        return data.low
    if key in ("5", "median", "price_median"):
        return (data.high + data.low) / 2.0
    if key in ("6", "typical", "price_typical"):
        return (data.high + data.low + data.close) / 3.0
    if key in ("7", "weighted", "price_weighted"):
        return (data.high + data.low + data.close + data.close) / 4.0
    if key in ("8", "simple", "price_simpl"):
        return (data.open + data.close) / 2.0
    if key in ("9", "quarter", "price_quarter"):
        return (data.high + data.low + data.open + data.close) / 4.0
    return data.close


def resolve_ma_class(name):
    mode = str(name).lower()
    if mode in {"mode_sma", "sma"}:
        return SimpleMovingAverage
    if mode in {
        "mode_ema",
        "ema",
        "mode_jjma",
        "jjma",
        "mode_jurx",
        "jurx",
        "mode_parma",
        "parma",
        "mode_t3",
        "t3",
        "mode_vidya",
        "vidya",
        "mode_ama",
        "ama",
    }:
        return ExponentialMovingAverage
    if mode in {"mode_smma", "smma"}:
        return SmoothedMovingAverage
    return WeightedMovingAverage


[docs] class SkyscraperFixIndicator(Indicator): """Channel-like adaptive indicator emitting buy/sell buffers and color state.""" lines = ("up_buffer", "dn_buffer", "buy_buffer", "sell_buffer", "color_state") params = ( ("length", 10), ("kv", 0.9), ("percentage", 0.0), ("use_high_low", True), ("atr_period", 15), ("point_size", 0.01), ) def __init__(self): """Initialize internal ATR state and channel persistence variables.""" self.addminperiod(max(self.p.length, self.p.atr_period) + 3) self.atr = AverageTrueRange(self.data, period=self.p.atr_period) self.atr_high = Highest(self.atr, period=self.p.length) self.atr_low = Lowest(self.atr, period=self.p.length) self._prev_smin = None self._prev_smax = None self._prev_trend = 0 @staticmethod def _nan(): return float("nan") @staticmethod def _valid(value): return value is not None and math.isfinite(value)
[docs] def next(self): """Calculate up/down channel levels, pending buffers, and current color.""" up = self._nan() dn = self._nan() buy = self._nan() sell = self._nan() color = ( self.lines.color_state[-1] if len(self) > 1 and math.isfinite(self.lines.color_state[-1]) else 1.0 ) if self._prev_smin is None: close = float(self.data.close[0]) self._prev_smin = close self._prev_smax = close self._prev_trend = 0 self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color return atrmax = float(self.atr_high[0]) atrmin = float(self.atr_low[0]) if not math.isfinite(atrmax) or not math.isfinite(atrmin): self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color return step = int(0.5 * self.p.kv * (atrmax + atrmin) / self.p.point_size) xstep = step * self.p.point_size x2step = 2.0 * xstep close = float(self.data.close[0]) high = float(self.data.high[0]) low = float(self.data.low[0]) if self.p.use_high_low: smax0 = low + x2step smin0 = high - x2step else: smax0 = close + x2step smin0 = close - x2step trend0 = self._prev_trend if close > self._prev_smax: trend0 = 1 if close < self._prev_smin: trend0 = -1 if trend0 > 0: smin0 = max(smin0, self._prev_smin) up = smin0 color = 0.0 else: smax0 = min(smax0, self._prev_smax) dn = smax0 color = 1.0 prev_up = self.lines.up_buffer[-1] if len(self) > 1 else self._nan() prev_dn = self.lines.dn_buffer[-1] if len(self) > 1 else self._nan() if self._valid(prev_dn) and self._valid(up): buy = up if self._valid(prev_up) and self._valid(dn): sell = dn self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color self._prev_smin = smin0 self._prev_smax = smax0 self._prev_trend = trend0
[docs] class SkyscraperFixDuplexIndicator(Indicator): """Indicator for directional buffer-based skyline reversals.""" lines = ("up_buffer", "dn_buffer", "buy_buffer", "sell_buffer") params = ( ("length", 10), ("kv", 0.9), ("percentage", 0.0), ("use_high_low", True), ("atr_period", 15), ("point_size", 0.01), ) def __init__(self): """Build ATR state and initialize rolling trend context.""" self.addminperiod(max(self.p.length, self.p.atr_period) + 2) self.atr = AverageTrueRange(self.data, period=self.p.atr_period) self.atr_high = Highest(self.atr, period=self.p.length) self.atr_low = Lowest(self.atr, period=self.p.length) self._prev_smin = None self._prev_smax = None self._prev_trend = 0 @staticmethod def _nan(): return float("nan") @staticmethod def _valid(value): return value is not None and math.isfinite(value)
[docs] def next(self): """Update the skyscraper buffers for the current bar.""" up = self._nan() dn = self._nan() buy = self._nan() sell = self._nan() if self._prev_smin is None: self._prev_smin = float(self.data.close[0]) self._prev_smax = float(self.data.close[0]) self._prev_trend = 0 self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell return atrmax = float(self.atr_high[0]) atrmin = float(self.atr_low[0]) if not math.isfinite(atrmax) or not math.isfinite(atrmin): self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell return step = int(0.5 * self.p.kv * (atrmax + atrmin) / self.p.point_size) xstep = step * self.p.point_size x2step = 2.0 * xstep close = float(self.data.close[0]) high = float(self.data.high[0]) low = float(self.data.low[0]) if self.p.use_high_low: smax0 = low + x2step smin0 = high - x2step else: smax0 = close + x2step smin0 = close - x2step trend0 = self._prev_trend if close > self._prev_smax: trend0 = 1 if close < self._prev_smin: trend0 = -1 if trend0 > 0: smin0 = max(smin0, self._prev_smin) up = smin0 else: smax0 = min(smax0, self._prev_smax) dn = smax0 prev_up = self.lines.up_buffer[-1] if len(self) > 1 else self._nan() prev_dn = self.lines.dn_buffer[-1] if len(self) > 1 else self._nan() if self._valid(prev_dn) and self._valid(up): buy = up if self._valid(prev_up) and self._valid(dn): sell = dn self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self._prev_smin = smin0 self._prev_smax = smax0 self._prev_trend = trend0
[docs] class SkyscraperFixColorAMLIndicator(Indicator): """Skyscraper fix channel indicator producing buy/sell buffers and color state.""" lines = ("up_buffer", "dn_buffer", "buy_buffer", "sell_buffer", "color_state") params = ( ("length", 10), ("kv", 0.9), ("percentage", 0.0), ("use_high_low", True), ("atr_period", 15), ("point_size", 0.01), ) def __init__(self): """Initialize ATR-derived channel state and lookback counters.""" self.addminperiod(max(self.p.length, self.p.atr_period) + 3) self.atr = AverageTrueRange(self.data, period=self.p.atr_period) self.atr_high = Highest(self.atr, period=self.p.length) self.atr_low = Lowest(self.atr, period=self.p.length) self._prev_smin = None self._prev_smax = None self._prev_trend = 0 @staticmethod def _nan(): return float("nan") @staticmethod def _valid(value): return value is not None and math.isfinite(value)
[docs] def next(self): """Compute current channel extrema and potential reversal buffers.""" up = self._nan() dn = self._nan() buy = self._nan() sell = self._nan() color = ( self.lines.color_state[-1] if len(self) > 1 and math.isfinite(self.lines.color_state[-1]) else 1.0 ) if self._prev_smin is None: close = float(self.data.close[0]) self._prev_smin = close self._prev_smax = close self._prev_trend = 0 self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color return atrmax = float(self.atr_high[0]) atrmin = float(self.atr_low[0]) if not math.isfinite(atrmax) or not math.isfinite(atrmin): self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color return step = int(0.5 * self.p.kv * (atrmax + atrmin) / self.p.point_size) x2step = 2.0 * step * self.p.point_size close = float(self.data.close[0]) high = float(self.data.high[0]) low = float(self.data.low[0]) if self.p.use_high_low: smax0 = low + x2step smin0 = high - x2step else: smax0 = close + x2step smin0 = close - x2step trend0 = self._prev_trend if close > self._prev_smax: trend0 = 1 if close < self._prev_smin: trend0 = -1 if trend0 > 0: smin0 = max(smin0, self._prev_smin) up = smin0 color = 0.0 else: smax0 = min(smax0, self._prev_smax) dn = smax0 color = 1.0 prev_up = self.lines.up_buffer[-1] if len(self) > 1 else self._nan() prev_dn = self.lines.dn_buffer[-1] if len(self) > 1 else self._nan() if self._valid(prev_dn) and self._valid(up): buy = up if self._valid(prev_up) and self._valid(dn): sell = dn self.lines.up_buffer[0] = up self.lines.dn_buffer[0] = dn self.lines.buy_buffer[0] = buy self.lines.sell_buffer[0] = sell self.lines.color_state[0] = color self._prev_smin = smin0 self._prev_smax = smax0 self._prev_trend = trend0
[docs] class AppliedPriceCCI(Indicator): """Commodity Channel Index computed on an arbitrary applied-price line.""" lines = ("cci",) params = ( ("period", 14), ("factor", 0.015), ) def __init__(self): """Set the warm-up period to one bar beyond the CCI lookback.""" self.addminperiod(int(self.p.period) + 1)
[docs] def next(self): """Emit the CCI value: price deviation from its mean over mean abs deviation.""" period = int(self.p.period) prices = [float(self.data[-i]) for i in range(period)] mean_price = sum(prices) / period mean_dev = sum(abs(price - mean_price) for price in prices) / period denom = float(self.p.factor) * mean_dev if denom == 0: self.lines.cci[0] = 0.0 return self.lines.cci[0] = (float(self.data[0]) - mean_price) / denom
[docs] class ColorAMLIndicator(Indicator): """Fractal-driven adaptive moving line with trend color transitions.""" lines = ("aml", "color_state") params = ( ("fractal", 6), ("lag", 7), ("shift", 0), ("point_size", 0.01), ) def __init__(self): """Initialize smoothing buffers and state for AML computation.""" self.addminperiod(2 * self.p.fractal + self.p.lag + 5) self._smooth_history = [] self._prev_aml = None self._prev_color = 1.0 @staticmethod def _window_max(line, start_ago, size): values = [float(line[-(start_ago + idx)]) for idx in range(size)] return max(values) @staticmethod def _window_min(line, start_ago, size): values = [float(line[-(start_ago + idx)]) for idx in range(size)] return min(values)
[docs] def next(self): """Update AML line value and color state for the current candle.""" if len(self.data) < 2 * self.p.fractal + self.p.lag + 2: self.lines.aml[0] = float("nan") self.lines.color_state[0] = self._prev_color return r1 = ( self._window_max(self.data.high, 0, self.p.fractal) - self._window_min(self.data.low, 0, self.p.fractal) ) / float(self.p.fractal) r2 = ( self._window_max(self.data.high, self.p.fractal, self.p.fractal) - self._window_min(self.data.low, self.p.fractal, self.p.fractal) ) / float(self.p.fractal) r3 = ( self._window_max(self.data.high, 0, 2 * self.p.fractal) - self._window_min(self.data.low, 0, 2 * self.p.fractal) ) / float(2 * self.p.fractal) dim = 0.0 if r1 + r2 > 0 and r3 > 0: dim = (math.log(r1 + r2) - math.log(r3)) * 1.44269504088896 alpha = math.exp(-self.p.lag * (dim - 1.0)) alpha = min(alpha, 1.0) alpha = max(alpha, 0.01) price = ( float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.open[0]) + 2.0 * float(self.data.close[0]) ) / 6.0 prev_smooth = self._smooth_history[-1] if self._smooth_history else price smooth = alpha * price + (1.0 - alpha) * prev_smooth self._smooth_history.append(smooth) prev_aml = self._prev_aml if self._prev_aml is not None else smooth lag_smooth = ( self._smooth_history[-(self.p.lag + 1)] if len(self._smooth_history) > self.p.lag else smooth ) if abs(smooth - lag_smooth) >= self.p.lag * self.p.lag * self.p.point_size: aml = smooth else: aml = prev_aml color = self._prev_color if aml > prev_aml: color = 2.0 if aml < prev_aml: color = 0.0 self.lines.aml[0] = aml self.lines.color_state[0] = color self._prev_aml = aml self._prev_color = color
[docs] class ColorAMLMeanReversionIndicator(Indicator): """Adaptive moving-lowpass indicator for color-state trend capture.""" lines = ("aml", "color_state") params = ( ("fractal", 6), ("lag", 7), ("shift", 0), ("point_size", 0.01), ) def __init__(self): """Prepare smoothing buffers and history for AML color-state generation.""" self.addminperiod(2 * self.p.fractal + self.p.lag + 5) self._smooth_history = [] self._prev_aml = None self._prev_color = 1.0 @staticmethod def _window_max(line, start_ago, size): values = [float(line[-(start_ago + idx)]) for idx in range(size)] return max(values) @staticmethod def _window_min(line, start_ago, size): values = [float(line[-(start_ago + idx)]) for idx in range(size)] return min(values)
[docs] def next(self): """Update smooth and color values for the active bar.""" if len(self.data) < 2 * self.p.fractal + self.p.lag + 2: self.lines.aml[0] = float("nan") self.lines.color_state[0] = self._prev_color return r1 = ( self._window_max(self.data.high, 0, self.p.fractal) - self._window_min(self.data.low, 0, self.p.fractal) ) / float(self.p.fractal) r2 = ( self._window_max(self.data.high, self.p.fractal, self.p.fractal) - self._window_min(self.data.low, self.p.fractal, self.p.fractal) ) / float(self.p.fractal) r3 = ( self._window_max(self.data.high, 0, 2 * self.p.fractal) - self._window_min(self.data.low, 0, 2 * self.p.fractal) ) / float(2 * self.p.fractal) dim = 0.0 if r1 + r2 > 0 and r3 > 0: dim = (math.log(r1 + r2) - math.log(r3)) * 1.44269504088896 alpha = math.exp(-self.p.lag * (dim - 1.0)) alpha = min(alpha, 1.0) alpha = max(alpha, 0.01) price = ( float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.open[0]) + 2.0 * float(self.data.close[0]) ) / 6.0 prev_smooth = self._smooth_history[-1] if self._smooth_history else price smooth = alpha * price + (1.0 - alpha) * prev_smooth self._smooth_history.append(smooth) prev_aml = self._prev_aml if self._prev_aml is not None else smooth lag_smooth = ( self._smooth_history[-(self.p.lag + 1)] if len(self._smooth_history) > self.p.lag else smooth ) aml = ( smooth if abs(smooth - lag_smooth) >= self.p.lag * self.p.lag * self.p.point_size else prev_aml ) color = self._prev_color if aml > prev_aml: color = 2.0 if aml < prev_aml: color = 0.0 self.lines.aml[0] = aml self.lines.color_state[0] = color self._prev_aml = aml self._prev_color = color
[docs] class X2MACandleApprox(Indicator): """Two-stage moving approximation of candle structure and color.""" lines = ("open_value", "high_value", "low_value", "close_value", "color_state") params = ( ("length1", 12), ("phase1", 15), ("length2", 5), ("phase2", 15), ("gap", 10.0), ) def __init__(self): """Initialize rolling queues and two-stage smoothing states.""" self._length1 = max(1, int(self.p.length1)) self._length2 = max(2, int(self.p.length2)) self._phase2 = max(-100, min(100, int(self.p.phase2))) base_alpha = 2.0 / (self._length2 + 1.0) self._alpha = max(0.01, min(0.95, base_alpha * (1.0 + self._phase2 / 200.0))) self._phase_gain = self._phase2 / 200.0 self._queues = { "open": deque(maxlen=self._length1), "high": deque(maxlen=self._length1), "low": deque(maxlen=self._length1), "close": deque(maxlen=self._length1), } self._states = { "open": {"ema1": None, "ema2": None}, "high": {"ema1": None, "ema2": None}, "low": {"ema1": None, "ema2": None}, "close": {"ema1": None, "ema2": None}, } self.addminperiod(self._length1 + self._length2) @staticmethod def _finite(value): return value is not None and math.isfinite(value) def _sma(self, key, value): queue = self._queues[key] queue.append(float(value)) if len(queue) < self._length1: return None return sum(queue) / len(queue) def _smooth(self, key, value): state = self._states[key] if state["ema1"] is None: state["ema1"] = value state["ema2"] = value else: state["ema1"] = state["ema1"] + self._alpha * (value - state["ema1"]) state["ema2"] = state["ema2"] + self._alpha * (state["ema1"] - state["ema2"]) return state["ema1"] + self._phase_gain * (state["ema1"] - state["ema2"]) def _stage_value(self, key, line): sma_value = self._sma(key, line[0]) if sma_value is None: return None return self._smooth(key, sma_value)
[docs] def next(self): """Update approximated open/high/low/close and color from historical window.""" open_value = self._stage_value("open", self.data.open) high_value = self._stage_value("high", self.data.high) low_value = self._stage_value("low", self.data.low) close_value = self._stage_value("close", self.data.close) if not all(self._finite(v) for v in (open_value, high_value, low_value, close_value)): self.lines.open_value[0] = float("nan") self.lines.high_value[0] = float("nan") self.lines.low_value[0] = float("nan") self.lines.close_value[0] = float("nan") self.lines.color_state[0] = float("nan") return max_value = max(open_value, close_value, high_value, low_value) min_value = min(open_value, close_value, high_value, low_value) adjusted_open = open_value if len(self) > 1 and abs(float(self.data.open[0]) - float(self.data.close[0])) <= float( self.p.gap ): prev_close = float(self.lines.close_value[-1]) if self._finite(prev_close): adjusted_open = prev_close color_state = ( 2.0 if adjusted_open < close_value else 0.0 if adjusted_open > close_value else 1.0 ) self.lines.open_value[0] = adjusted_open self.lines.high_value[0] = max_value self.lines.low_value[0] = min_value self.lines.close_value[0] = close_value self.lines.color_state[0] = color_state
[docs] class XPeriodCandleColor(Indicator): """Smoothing-based period-candle indicator producing color index.""" lines = ("color_idx", "xopen", "xclose", "xhigh", "xlow") params = ( ("cperiod", 5), ("ma_length", 3), ) def __init__(self): """Build smoothed OHLC components and set minimum bars.""" self.smooth_open = SimpleMovingAverage(self.data.open, period=self.p.ma_length) self.smooth_high = SimpleMovingAverage(self.data.high, period=self.p.ma_length) self.smooth_low = SimpleMovingAverage(self.data.low, period=self.p.ma_length) self.smooth_close = SimpleMovingAverage(self.data.close, period=self.p.ma_length) self.addminperiod(self.p.ma_length + self.p.cperiod)
[docs] def next(self): """Compute synthetic candle and color value for the current bar.""" lookback = max(1, int(self.p.cperiod)) start = -(lookback - 1) xopen = float(self.smooth_open[start]) xclose = float(self.smooth_close[0]) highs = [float(self.smooth_high[-i]) for i in range(lookback)] lows = [float(self.smooth_low[-i]) for i in range(lookback)] self.lines.xopen[0] = xopen self.lines.xclose[0] = xclose self.lines.xhigh[0] = max(highs) self.lines.xlow[0] = min(lows) self.lines.color_idx[0] = 0.0 if xopen <= xclose else 2.0
[docs] class XPeriodCandleSystemColor(Indicator): """SMA-smoothed candle color indicator with Bollinger Band breakout detection.""" lines = ("color_idx", "upper", "lower", "xopen", "xclose") params = ( ("period", 5), ("bb_length", 20), ("bands_deviation", 1.001), ) def __init__(self): """Initialize SMA smoothing of OHLC and Bollinger Band components.""" self.smooth_open = SimpleMovingAverage(self.data.open, period=self.p.period) self.smooth_high = SimpleMovingAverage(self.data.high, period=self.p.period) self.smooth_low = SimpleMovingAverage(self.data.low, period=self.p.period) self.smooth_close = SimpleMovingAverage(self.data.close, period=self.p.period) self.mid = SimpleMovingAverage(self.smooth_close, period=self.p.bb_length) self.std = StandardDeviation(self.smooth_close, period=self.p.bb_length)
[docs] def next(self): """Assign color index based on smoothed candle direction and Bollinger Band position.""" xopen = float(self.smooth_open[0]) xclose = float(self.smooth_close[0]) upper = float(self.mid[0] + self.std[0] * self.p.bands_deviation) lower = float(self.mid[0] - self.std[0] * self.p.bands_deviation) color = 2.0 if xopen <= xclose: color = 1.0 elif xopen > xclose: color = 3.0 if xopen <= xclose and xclose > upper: color = 0.0 if xopen > xclose and xclose < lower: color = 4.0 self.lines.xopen[0] = xopen self.lines.xclose[0] = xclose self.lines.upper[0] = upper self.lines.lower[0] = lower self.lines.color_idx[0] = color
[docs] class AcceleratorOscillator(Indicator): """Compute accelerator oscillator using short and long SMA of median price.""" lines = ("ac",) params = () def __init__(self): """Build the oscillator line from medians and SMA smoothing.""" median = (self.data.high + self.data.low) / 2.0 ao = SimpleMovingAverage(median, period=5) - SimpleMovingAverage(median, period=34) self.lines.ac = ao - SimpleMovingAverage(ao, period=5)
[docs] class AIAcceleratorOscillator(Indicator): """Accelerator Oscillator indicator computed from Awesome Oscillator.""" lines = ("ac",) def __init__(self): """Create an AO smoothed by a 5-period SMA.""" ao = AIAwesomeOscillator(self.data) ao_sma = SimpleMovingAverage(ao.ao, period=5) self.lines.ac = ao.ao - ao_sma
[docs] class AdaptiveMarketLevel(Indicator): """Adaptive Market Level indicator using fractal dimension and adaptive smoothing. The AML line adapts its smoothing alpha based on the measured fractal dimension of the price range, providing faster response in trending markets and slower response in mean-reverting regimes. """ lines = ("aml",) params = ( ("fractal", 70), ("lag", 18), ("shift", 0), ("point", 0.01), ) def __init__(self): """Initialize history deques and minimum period for the indicator.""" self._smooth_history = [] self._aml_history = [] self._min_period = max(int(self.p.fractal) * 2 + int(self.p.lag), 1) def _range(self, count, start): highs = [] lows = [] for idx in range(start, start + count): ago = -idx if idx else 0 highs.append(float(self.data.high[ago])) lows.append(float(self.data.low[ago])) return max(highs) - min(lows)
[docs] def next(self): """Compute AML value using fractal-range adaptive smoothing.""" fractal = int(self.p.fractal) lag = int(self.p.lag) if len(self.data) < self._min_period: self.lines.aml[0] = float(self.data.close[0]) return r1 = self._range(fractal, 0) / fractal r2 = self._range(fractal, fractal) / fractal r3 = self._range(fractal * 2, 0) / (fractal * 2) dim = 0.0 if r1 + r2 > 0 and r3 > 0: dim = (math.log(r1 + r2) - math.log(r3)) * 1.44269504088896 alpha = math.exp(-lag * (dim - 1.0)) alpha = min(max(alpha, 0.01), 1.0) price = ( float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.open[0]) + 2.0 * float(self.data.close[0]) ) / 6.0 prev_smooth = self._smooth_history[-1] if self._smooth_history else 0.0 smooth = alpha * price + (1.0 - alpha) * prev_smooth lagged_smooth = self._smooth_history[-lag] if len(self._smooth_history) >= lag else 0.0 prev_aml = self._aml_history[-1] if self._aml_history else smooth threshold = lag * lag * float(self.p.point) aml = smooth if abs(smooth - lagged_smooth) >= threshold else prev_aml self._smooth_history.append(smooth) self._aml_history.append(aml) self.lines.aml[0] = aml
[docs] class AmlIndicator(Indicator): """Adaptive Market Level indicator for backtrader (on-chart version). Uses the same fractal-range adaptive smoothing logic as AdaptiveMarketLevel but implemented as a Backtrader indicator with minperiod management. """ lines = ("aml",) params = ( ("fractal", 70), ("lag", 18), ("shift", 0), ("point", 0.01), ) def __init__(self): """Initialize smoothing deque and minperiod based on fractal and lag.""" lag = max(1, int(self.p.lag)) fractal = max(1, int(self.p.fractal)) self._smooth = deque(maxlen=lag + 1) self.addminperiod(max(fractal * 2 + 2, lag + 2)) def _range(self, start, count): highs = [float(self.data.high[-(start + i)]) for i in range(count)] lows = [float(self.data.low[-(start + i)]) for i in range(count)] return max(highs) - min(lows)
[docs] def next(self): """Compute AML value using fractal-range adaptive smoothing.""" fractal = max(1, int(self.p.fractal)) lag = max(1, int(self.p.lag)) price = ( float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.open[0]) + 2.0 * float(self.data.close[0]) ) / 6.0 if len(self.data) < fractal * 2 + 1: self._smooth.append(price) self.lines.aml[0] = float(self.lines.aml[-1]) if len(self) > 1 else price return r1 = self._range(0, fractal) / fractal r2 = self._range(fractal, fractal) / fractal r3 = self._range(0, fractal * 2) / (fractal * 2) dim = 0.0 if r1 + r2 > 0 and r3 > 0: dim = (math.log(r1 + r2) - math.log(r3)) / math.log(2.0) alpha = math.exp(-lag * (dim - 1.0)) alpha = min(1.0, max(0.01, alpha)) prev_smooth = self._smooth[-1] if self._smooth else 0.0 smooth = alpha * price + (1.0 - alpha) * prev_smooth lagged_smooth = self._smooth[0] if len(self._smooth) == self._smooth.maxlen else 0.0 self._smooth.append(smooth) if abs(smooth - lagged_smooth) >= lag * lag * float(self.p.point): self.lines.aml[0] = smooth else: self.lines.aml[0] = float(self.lines.aml[-1]) if len(self) > 1 else smooth
[docs] class FunctionalAwesomeOscillator(Indicator): """Awesome Oscillator: fast minus slow SMA of the median price.""" lines = ("ao",) params = ( ("fast", 5), ("slow", 34), ) def __init__(self): """Build the fast and slow median-price moving averages.""" median_price = (self.data.high + self.data.low) / 2.0 self._fast = SimpleMovingAverage(median_price, period=self.p.fast) self._slow = SimpleMovingAverage(median_price, period=self.p.slow)
[docs] def next(self): """Emit the fast/slow SMA difference for the current bar.""" self.lines.ao[0] = float(self._fast[0]) - float(self._slow[0])
[docs] class AIAwesomeOscillator(Indicator): """Awesome Oscillator indicator using two SMAs on the price midpoint.""" lines = ("ao",) params = ( ("fast", 5), ("slow", 34), ) def __init__(self): """Create fast and slow moving averages of the midpoint.""" median = (self.data.high + self.data.low) / 2.0 fast_ma = SimpleMovingAverage(median, period=self.p.fast) slow_ma = SimpleMovingAverage(median, period=self.p.slow) self.lines.ao = fast_ma - slow_ma
[docs] class BlauErgodicMDI(Indicator): """Calculate layered EMA histograms used as the Blau Ergodic MDI signal.""" lines = ("up", "dn", "hist", "color_idx") params = ( ("xlength", 20), ("xlength1", 5), ("xlength2", 5), ("xlength3", 5), ) def __init__(self): """Initialize recursive EMA stages and expose indicator lines.""" price = ExponentialMovingAverage(self.data.close, period=max(2, self.p.xlength)) xprice = ExponentialMovingAverage(price, period=max(2, self.p.xlength1)) dif = price - xprice xdif = ExponentialMovingAverage(dif, period=max(2, self.p.xlength1)) xxdif = ExponentialMovingAverage(xdif, period=max(2, self.p.xlength2)) xxxdif = ExponentialMovingAverage(xxdif, period=max(2, self.p.xlength3)) self.lines.hist = xxdif self.lines.up = xxdif self.lines.dn = xxxdif self.addminperiod(self.p.xlength + self.p.xlength1 + self.p.xlength2 + self.p.xlength3 + 2)
[docs] class BlauErgodicMDIClassic(Indicator): """Ergodic MDI indicator with up/down/histogram smoothing channels.""" lines = ("up", "down", "hist") params = ( ("xlength", 20), ("xlength1", 5), ("xlength2", 3), ("xlength3", 8), ("ipc", "close"), ) def __init__(self): """Build normalized price deviation and EMA-smoothed up/down/histogram lines.""" price = _price_series(self.data, self.p.ipc) xprice = EMA(price, period=int(self.p.xlength)) dif = (price - xprice) / 0.01 xdif = EMA(dif, period=int(self.p.xlength1)) xxdif = EMA(xdif, period=int(self.p.xlength2)) xxxdif = EMA(xxdif, period=int(self.p.xlength3)) self.l.hist = xxdif self.l.up = xxdif self.l.down = xxxdif
[docs] class BrakeExpIndicator(Indicator): """Exponential trailing-stop indicator with trend and flip lines. Maintains an exponential-curve stop that rises while long (and falls while short) from a begin price; when price breaks the stop the direction flips. Exposes the active stop on ``up_trend``/``down_trend`` lines and direction-flip cues on ``buy_signal``/``sell_signal`` lines. """ lines = ("up_trend", "down_trend", "buy_signal", "sell_signal") params = ( ("a", 3.0), ("b", 1.0), ) def __init__(self): """Set the minimum period and initialize the exponential-stop state.""" self.addminperiod(5) self._is_long = True self._max_price = float("-inf") self._min_price = float("inf") self._begin_bar = 0 self._begin_price = None
[docs] def next(self): """Advance the exponential stop and emit trend and flip lines. Extends the stop along the exponential curve, flips direction (resetting the begin price and extremes) when price breaks the stop, and sets the ``up_trend``/``down_trend`` lines plus ``buy_signal``/``sell_signal`` flip cues for the bar. """ if self._begin_price is None: self._begin_price = float(self.data.low[0]) self._max_price = max(self._max_price, float(self.data.high[0])) self._min_price = min(self._min_price, float(self.data.low[0])) bars_since_begin = max(0, len(self.data) - 1 - self._begin_bar) a = float(self.p.a) * 0.1 b = float(self.p.b) * 0.00001 exp_val = (math.exp(bars_since_begin * a) - 1.0) * b value = self._begin_price + exp_val if self._is_long else self._begin_price - exp_val if self._is_long and value > float(self.data.low[0]): self._is_long = False self._begin_price = self._max_price self._begin_bar = len(self.data) - 1 value = self._begin_price self._max_price = float("-inf") self._min_price = float("inf") elif (not self._is_long) and value < float(self.data.high[0]): self._is_long = True self._begin_price = self._min_price self._begin_bar = len(self.data) - 1 value = self._begin_price self._max_price = float("-inf") self._min_price = float("inf") prev_up = float(self.lines.up_trend[-1]) if len(self) > 0 else 0.0 prev_dn = float(self.lines.down_trend[-1]) if len(self) > 0 else 0.0 if self._is_long: self.lines.up_trend[0] = value self.lines.down_trend[0] = 0.0 else: self.lines.up_trend[0] = 0.0 self.lines.down_trend[0] = value self.lines.buy_signal[0] = ( self.lines.down_trend[0] if prev_up > 0.0 and float(self.lines.down_trend[0]) > 0.0 else 0.0 ) self.lines.sell_signal[0] = ( self.lines.up_trend[0] if prev_dn > 0.0 and float(self.lines.up_trend[0]) > 0.0 else 0.0 )
[docs] class FlatTrendIndicator(Indicator): """Flat-trend regime indicator combining ADX/DI and Parabolic SAR. Emits four binary lines (``buy``, ``sell``, ``end_buy``, ``end_sell``) that classify each bar's trend state from the SAR position relative to price and the dominance of the positive over the negative directional indicator. """ lines = ("sell", "buy", "end_sell", "end_buy") def __init__(self): """Build the ADX, +DI, -DI and Parabolic SAR sub-indicators. Also sets the minimum period to 20 bars so the directional and SAR components have enough history before producing signals. """ self.adx = AverageDirectionalMovementIndex(self.data) self.di_plus = PlusDirectionalIndicator(self.data) self.di_minus = MinusDirectionalIndicator(self.data) self.sar = ParabolicSAR(self.data) self.addminperiod(20)
[docs] def next(self): """Classify the current bar into a buy/sell/end-of-trend state. Sets exactly one of the four output lines to 1.0 based on whether the SAR sits below price (uptrend context) and whether +DI exceeds -DI. """ sell = buy = end_sell = end_buy = 0.0 if self.sar[0] < self.data.close[0]: if self.di_plus[0] > self.di_minus[0]: buy = 1.0 else: end_buy = 1.0 else: if self.di_plus[0] > self.di_minus[0]: end_sell = 1.0 else: sell = 1.0 self.lines.sell[0] = sell self.lines.buy[0] = buy self.lines.end_sell[0] = end_sell self.lines.end_buy[0] = end_buy
[docs] class FlatTrendDistanceIndicator(Indicator): """Volatility-regime classifier from smoothed ATR and standard-deviation slopes. Compares the slopes of smoothed ATR and smoothed standard deviation to emit a ``state`` line flagging rising, falling, or flat volatility. """ lines = ("state",) params = ( ("stdev_period", 20), ("stdev_method", "lwma"), ("stdev_length", 5), ("stdev_phase", 15), ("atr_period", 20), ("atr_method", "lwma"), ("atr_length", 5), ("atr_phase", 15), ) def __init__(self): """Construct smoothed ATR and standard-deviation components, set min period.""" self._atr = AverageTrueRange(self.data, period=max(1, int(self.p.atr_period))) self._std = StandardDeviation(self.data.close, period=max(1, int(self.p.stdev_period))) atr_ma = resolve_ma_class(self.p.atr_method) std_ma = resolve_ma_class(self.p.stdev_method) self._xatr = atr_ma(self._atr, period=max(1, int(self.p.atr_length))) self._xstd = std_ma(self._std, period=max(1, int(self.p.stdev_length))) self.addminperiod( max( int(self.p.atr_period) + int(self.p.atr_length), int(self.p.stdev_period) + int(self.p.stdev_length), ) + 3 )
[docs] def next(self): """Classify the current volatility regime from ATR/stdev slope direction.""" prev_xatr = float(self._xatr[-1]) prev_xstd = float(self._xstd[-1]) xatr = float(self._xatr[0]) xstd = float(self._xstd[0]) res = 0 if prev_xatr > xatr and prev_xstd > xstd: res = 1 if prev_xatr < xatr and prev_xstd < xstd: res = 2 self.lines.state[0] = res + 1
[docs] class IinMASignalIndicator(Indicator): """Cross-period MA signal indicator producing buy/sell trigger levels.""" lines = ("buy_signal", "sell_signal") params = ( ("fast_period", 10), ("fast_ma", "EMA"), ("slow_period", 22), ("slow_ma", "SMA"), ("atr_period", 10), ) def __init__(self): """Initialize fast/slow MAs and internal trend state.""" ma_map = { "SMA": SimpleMovingAverage, "EMA": ExponentialMovingAverage, "SMMA": SmoothedMovingAverage, "WMA": WeightedMovingAverage, } fast_cls = ma_map.get(str(self.p.fast_ma).upper(), ExponentialMovingAverage) slow_cls = ma_map.get(str(self.p.slow_ma).upper(), SimpleMovingAverage) self.fast_ma = fast_cls(self.data.close, period=self.p.fast_period) self.slow_ma = slow_cls(self.data.close, period=self.p.slow_period) self._trend = 0 self.addminperiod(max(self.p.fast_period, self.p.slow_period) + self.p.atr_period + 3)
[docs] def next(self): """Detect MA transitions and write conditional trigger levels.""" buy_signal = 0.0 sell_signal = 0.0 fast_now = float(self.fast_ma[0]) fast_prev = float(self.fast_ma[-1]) slow_now = float(self.slow_ma[0]) slow_prev = float(self.slow_ma[-1]) avg_range = 0.0 for idx in range(self.p.atr_period): avg_range += abs(float(self.data.high[-idx]) - float(self.data.low[-idx])) avg_range /= float(self.p.atr_period) if self._trend <= 0 and fast_now > slow_now and fast_prev < slow_prev: buy_signal = float(self.data.low[0]) - avg_range * 0.5 self._trend = 1 if self._trend >= 0 and fast_now < slow_now and fast_prev > slow_prev: sell_signal = float(self.data.high[0]) + avg_range * 0.5 self._trend = -1 self.lines.buy_signal[0] = buy_signal self.lines.sell_signal[0] = sell_signal
[docs] class KDJ(Indicator): """KDJ (Stochastic) Technical Indicator. The KDJ indicator is a momentum oscillator that compares a specific closing price of a security to a range of its prices over a certain period of time. It consists of three lines: K, D, and J, where K and D are similar to the Stochastic oscillator, and J is a derivative line. The indicator is calculated using the StochasticFull indicator as the base, with J calculated as: J = 3*K - 2*D. Refactoring Note: Uses the next() method instead of line binding (self.l.K = self.kd.percD) because line binding has idx synchronization issues in the current architecture. Attributes: lines: Tuple containing ('K', 'D', 'J') - the three output lines. params: Tuple containing configuration parameters: - period (int): Lookback period for Stochastic calculation (default: 9). - period_dfast (int): Fast %D smoothing period (default: 3). - period_dslow (int): Slow %D smoothing period (default: 3). kd (StochasticFull): Internal StochasticFull indicator instance. """ lines = ("K", "D", "J") params = ( ("period", 9), ("period_dfast", 3), ("period_dslow", 3), ) def __init__(self): """Initialize the KDJ indicator with a StochasticFull base. Creates a StochasticFull indicator with the configured parameters to serve as the foundation for K, D, and J line calculations. """ self.kd = StochasticFull( self.data, period=self.p.period, period_dfast=self.p.period_dfast, period_dslow=self.p.period_dslow, )
[docs] def next(self): """Calculate KDJ values for the current bar. Updates the K, D, and J lines based on the underlying StochasticFull indicator values. The J line is derived from K and D using the formula: J = 3*K - 2*D. """ self.l.K[0] = self.kd.percD[0] self.l.D[0] = self.kd.percDSlow[0] self.l.J[0] = self.l.K[0] * 3 - self.l.D[0] * 2
[docs] class LaguerreIndicator(Indicator): """Laguerre RSI-style oscillator over a four-stage Laguerre filter.""" lines = ("laguerre",) params = (("gamma", 0.7),) def __init__(self): """Set the minimum period and initialize Laguerre filter state.""" self.addminperiod(2) self._l0 = None self._l1 = None self._l2 = None self._l3 = None
[docs] def next(self): """Advance the Laguerre filter and emit the oscillator value.""" price = float(self.data.close[0]) gamma = self.p.gamma if self._l0 is None: self._l0 = price self._l1 = price self._l2 = price self._l3 = price l0_prev = self._l0 l1_prev = self._l1 l2_prev = self._l2 l3_prev = self._l3 l0 = (1.0 - gamma) * price + gamma * l0_prev l1 = -gamma * l0 + l0_prev + gamma * l1_prev l2 = -gamma * l1 + l1_prev + gamma * l2_prev l3 = -gamma * l2 + l2_prev + gamma * l3_prev cu = 0.0 cd = 0.0 if l0 >= l1: cu += l0 - l1 else: cd += l1 - l0 if l1 >= l2: cu += l1 - l2 else: cd += l2 - l1 if l2 >= l3: cu += l2 - l3 else: cd += l3 - l2 self.lines.laguerre[0] = cu / (cu + cd) if (cu + cd) else 0.0 self._l0 = l0 self._l1 = l1 self._l2 = l2 self._l3 = l3
[docs] class LaguerreColorIndicator(Indicator): """Ehlers Laguerre RSI oscillator with high/low colour-state transitions.""" lines = ("value", "color_state") params = ( ("gamma", 0.7), ("high_level", 85), ("middle_level", 50), ("low_level", 15), ) def __init__(self): """Initialize the Laguerre filter stages and minimum period.""" self._l0 = 0.0 self._l1 = 0.0 self._l2 = 0.0 self._l3 = 0.0 self._initialized = False self.addminperiod(3) def _zone(self, value): if value > float(self.p.high_level): return "high" if value > float(self.p.middle_level): return "high_mid" if value < float(self.p.low_level): return "low" return "low_mid" def _color_from_state(self, curr_zone, prev_zone, prev_color): if curr_zone == "high": return 1.0 if curr_zone == "high_mid": if prev_zone == "high": return 2.0 if prev_zone == "high_mid": return prev_color return 1.0 if curr_zone == "low_mid": if prev_zone in ("high", "high_mid"): return 2.0 if prev_zone == "low_mid": return prev_color return 1.0 if curr_zone == "low": return 2.0 return prev_color
[docs] def next(self): """Advance the Laguerre filter and update the value/colour lines.""" price = float(self.data.close[0]) gamma = float(self.p.gamma) prev_l0, prev_l1, prev_l2, prev_l3 = self._l0, self._l1, self._l2, self._l3 if not self._initialized: self._l0 = price self._l1 = price self._l2 = price self._l3 = price self._initialized = True else: self._l0 = (1.0 - gamma) * price + gamma * prev_l0 self._l1 = -gamma * self._l0 + prev_l0 + gamma * prev_l1 self._l2 = -gamma * self._l1 + prev_l1 + gamma * prev_l2 self._l3 = -gamma * self._l2 + prev_l2 + gamma * prev_l3 cu = 0.0 cd = 0.0 pairs = ((self._l0, self._l1), (self._l1, self._l2), (self._l2, self._l3)) for a, b in pairs: if a >= b: cu += a - b else: cd += b - a value = 0.0 if (cu + cd) > 1e-12: value = 100.0 * cu / (cu + cd) prev_value = float(self.lines.value[-1]) if len(self) > 1 else value prev_color = float(self.lines.color_state[-1]) if len(self) > 1 else 1.0 curr_zone = self._zone(value) prev_zone = self._zone(prev_value) color = self._color_from_state(curr_zone, prev_zone, prev_color) self.lines.value[0] = value self.lines.color_state[0] = color
[docs] class RelativeVigorIndex(Indicator): """Relative Vigor Index (RVI) with its 4-point symmetric signal line.""" lines = ("rvi", "signal") params = (("period", 44),) def __init__(self): """Reserve enough warm-up bars for the period plus the 4-bar weighting.""" self.addminperiod(self.p.period + 6)
[docs] def next(self): """Compute the RVI ratio and its weighted signal value for the current bar.""" numerator_sum = 0.0 denominator_sum = 0.0 for shift in range(self.p.period): close0 = float(self.data.close[-shift]) open0 = float(self.data.open[-shift]) close1 = float(self.data.close[-shift - 1]) open1 = float(self.data.open[-shift - 1]) close2 = float(self.data.close[-shift - 2]) open2 = float(self.data.open[-shift - 2]) close3 = float(self.data.close[-shift - 3]) open3 = float(self.data.open[-shift - 3]) high0 = float(self.data.high[-shift]) low0 = float(self.data.low[-shift]) high1 = float(self.data.high[-shift - 1]) low1 = float(self.data.low[-shift - 1]) high2 = float(self.data.high[-shift - 2]) low2 = float(self.data.low[-shift - 2]) high3 = float(self.data.high[-shift - 3]) low3 = float(self.data.low[-shift - 3]) numerator_sum += ( (close0 - open0) + 2.0 * (close1 - open1) + 2.0 * (close2 - open2) + (close3 - open3) ) / 6.0 denominator_sum += ( (high0 - low0) + 2.0 * (high1 - low1) + 2.0 * (high2 - low2) + (high3 - low3) ) / 6.0 rvi_value = numerator_sum / denominator_sum if denominator_sum else 0.0 self.lines.rvi[0] = rvi_value if len(self) >= 4: values = [ float(self.lines.rvi[0]), float(self.lines.rvi[-1]), float(self.lines.rvi[-2]), float(self.lines.rvi[-3]), ] if all(math.isfinite(value) for value in values): self.lines.signal[0] = ( values[0] + 2.0 * values[1] + 2.0 * values[2] + values[3] ) / 6.0 else: self.lines.signal[0] = rvi_value else: self.lines.signal[0] = rvi_value
[docs] class SmoothedRelativeVigorIndex(Indicator): """Relative Vigor Index indicator with smoothed signal line.""" lines = ("rvi", "signal") params = (("period", 13),) def __init__(self): """Compute weighted numerator/denominator and moving-average filtered lines.""" weighted_num = ( (self.data.close - self.data.open) + 2.0 * (self.data.close(-1) - self.data.open(-1)) + 2.0 * (self.data.close(-2) - self.data.open(-2)) + (self.data.close(-3) - self.data.open(-3)) ) / 6.0 weighted_den = ( (self.data.high - self.data.low) + 2.0 * (self.data.high(-1) - self.data.low(-1)) + 2.0 * (self.data.high(-2) - self.data.low(-2)) + (self.data.high(-3) - self.data.low(-3)) ) / 6.0 num_ma = SimpleMovingAverage(weighted_num, period=self.p.period) den_ma = SimpleMovingAverage(weighted_den, period=self.p.period) self.lines.rvi = If(den_ma != 0, num_ma / den_ma, 0.0) self.lines.signal = ( self.lines.rvi + 2.0 * self.lines.rvi(-1) + 2.0 * self.lines.rvi(-2) + self.lines.rvi(-3) ) / 6.0
[docs] class SafeCCI(Indicator): """Safe Commodity Channel Index indicator with guarded zero-variance handling.""" lines = ("cci",) params = (("period", 14),) def __init__(self): """Initialize CCI period warm-up requirement.""" self.addminperiod(self.p.period + 3)
[docs] def next(self): """Compute CCI value for the current bar with mean deviation protection.""" typical_prices = [] for idx in range(self.p.period): typical_prices.append( ( float(self.data.high[-idx]) + float(self.data.low[-idx]) + float(self.data.close[-idx]) ) / 3.0 ) tp_now = typical_prices[0] tp_sma = sum(typical_prices) / float(len(typical_prices)) mean_dev = sum(abs(tp - tp_sma) for tp in typical_prices) / float(len(typical_prices)) if mean_dev <= 1e-12: self.lines.cci[0] = 0.0 return self.lines.cci[0] = (tp_now - tp_sma) / (0.015 * mean_dev)
[docs] class SafeCCIWithFactor(Indicator): """CCI indicator with mean deviation, returning 0.0 when denominator is zero.""" lines = ("cci",) params = ( ("period", 27), ("factor", 0.015), ) def __init__(self): """Initialise SafeCCI and set minimum period to `period`.""" self.addminperiod(self.p.period)
[docs] def next(self): """Compute CCI from rolling typical-price SMA and mean deviation.""" period = self.p.period typical_prices = [ (float(self.data.high[-i]) + float(self.data.low[-i]) + float(self.data.close[-i])) / 3.0 for i in range(period) ] sma = sum(typical_prices) / period mean_dev = sum(abs(tp - sma) for tp in typical_prices) / period current_tp = typical_prices[0] denominator = self.p.factor * mean_dev self.lines.cci[0] = 0.0 if denominator == 0.0 else (current_tp - sma) / denominator
[docs] class SilverTrendSignalProxy(Indicator): """SilverTrend buy/sell signal proxy based on a moving-average crossover. Emits a non-zero ``buy`` (or ``sell``) value when price crosses above (or below) a risk-scaled simple moving average, mirroring the EA's signal lines. """ lines = ("buy", "sell") params = (("risk", 3),) def __init__(self): """Build the risk-scaled moving average and set the minimum period.""" self.period = max(3, int(self.p.risk) * 2 + 1) self.ma = SimpleMovingAverage(self.data.close, period=self.period) self.addminperiod(self.period + 3)
[docs] def next(self): """Set buy/sell signal lines from the price/MA crossover this bar.""" buy = 0.0 sell = 0.0 close0 = float(self.data.close[0]) close1 = float(self.data.close[-1]) ma0 = float(self.ma[0]) ma1 = float(self.ma[-1]) if close1 <= ma1 and close0 > ma0: buy = close0 elif close1 >= ma1 and close0 < ma0: sell = close0 self.lines.buy[0] = buy self.lines.sell[0] = sell
[docs] class SilverTrendDirectionSignalProxy(Indicator): """Proxy indicator emitting +1/-1 on SMA crossover direction flips.""" lines = ("signal",) params = (("risk", 3),) def __init__(self): """Set up the SMA and minimum period from the risk parameter.""" self.period = max(3, int(self.p.risk) * 2 + 1) self.ma = SimpleMovingAverage(self.data.close, period=self.period) self.addminperiod(self.period + 2)
[docs] def next(self): """Carry the prior signal forward, flipping it on a fresh MA crossover.""" signal = float(self.lines.signal[-1]) if len(self) > 0 else 0.0 if not math.isfinite(signal): signal = 0.0 close_prev = float(self.data.close[-1]) close_now = float(self.data.close[0]) ma_prev = float(self.ma[-1]) ma_now = float(self.ma[0]) if close_prev <= ma_prev and close_now > ma_now: signal = 1.0 elif close_prev >= ma_prev and close_now < ma_now: signal = -1.0 self.lines.signal[0] = signal