# Source code for backtrader.indicators.contrib.ultra_wpr_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

import math
from collections import deque

from .. import (
    Indicator,
    WilliamsR,
)

__all__ = [
    "UltraWPRIndicator",
]


class CountSmoother:
    """Stateful, one-sample-at-a-time moving average.

    The method name selects the averaging rule applied by ``update``:

    * ``sma`` / ``mode_sma``: plain mean of the rolling window.
    * ``lwma`` / ``mode_lwma`` / ``jjma`` / ``mode_jjma``: linearly weighted
      mean of the rolling window (the newest sample gets the largest weight).
    * ``smma`` / ``mode_smma``: recursive smoothed average.
    * anything else: exponential average with ``alpha = 2 / (period + 1)``.
    """

    # Method-name groups; names are compared after lower-casing.
    _SIMPLE = frozenset(("sma", "mode_sma"))
    _LINEAR = frozenset(("lwma", "mode_lwma", "jjma", "mode_jjma"))
    _SMOOTHED = frozenset(("smma", "mode_smma"))

    def __init__(self, method, period):
        """Set up an empty smoother.

        Args:
            method: Name of the averaging rule; stringified and lower-cased.
            period: Window length; converted to ``int`` and raised to 1 if
                smaller.
        """
        length = int(period)
        self.method = str(method).lower()
        self.period = length if length >= 1 else 1
        # Rolling window, used only by the window-based (SMA/LWMA) rules.
        self.values = deque(maxlen=self.period)
        # Last output, used only by the recursive (SMMA/EMA) rules.
        self.state = None

    def update(self, value):
        """Fold one more sample into the average.

        Args:
            value: New raw sample; converted with ``float``.

        Returns:
            The smoothed value once ``value`` has been incorporated.
        """
        sample = float(value)
        kind = self.method

        # Window-based rules: push the sample, then average what is buffered.
        is_simple = kind in self._SIMPLE
        if is_simple or kind in self._LINEAR:
            window = self.values
            window.append(sample)
            count = len(window)
            if is_simple:
                return sum(window) / count
            ramp = range(1, count + 1)  # oldest sample -> 1, newest -> count
            weighted_total = sum(item * weight for item, weight in zip(window, ramp))
            return weighted_total / sum(ramp)

        # Recursive rules: the first sample seeds the state unchanged.
        previous = self.state
        if previous is None:
            self.state = sample
        elif kind in self._SMOOTHED:
            self.state = ((self.period - 1) * previous + sample) / self.period
        else:
            alpha = 2.0 / (self.period + 1.0)
            self.state = previous + alpha * (sample - previous)
        return self.state


class UltraWPRIndicator(Indicator):
    """Ultra Williams %R oscillator emitting bulls and bears strength lines.

    One Williams %R series is passed through a fan of ``CountSmoother``
    instances whose lengths are ``start_length + xstep * i`` for
    ``i = 0 .. xsteps_total``. On each bar the indicator counts how many of
    those smoothed values rose versus the previous processed bar (``bulls``)
    and how many did not (``bears``), then smooths both counts.

    Params:
        wpr_period: Period passed to ``WilliamsR``.
        w_method: Smoothing method name for the fan of smoothers.
        start_length: Length of the shortest fan smoother.
        w_phase: Not read by this implementation.
        xstep: Length increment between consecutive fan smoothers.
        xsteps_total: Number of increments; the fan has ``xsteps_total + 1``
            smoothers.
        smooth_method: Smoothing method name for the bulls/bears counts.
        smooth_length: Smoothing length for the bulls/bears counts.
        smooth_phase: Not read by this implementation.
    """

    lines = (
        "bulls",
        "bears",
    )
    params = (
        ("wpr_period", 13),
        ("w_method", "jjma"),
        ("start_length", 3),
        ("w_phase", 100),
        ("xstep", 5),
        ("xsteps_total", 10),
        ("smooth_method", "jjma"),
        ("smooth_length", 3),
        ("smooth_phase", 100),
    )

    def __init__(self):
        """Build the WPR source, per-length smoothers, and output smoothers."""
        self._periods = [
            int(self.p.start_length + self.p.xstep * i)
            for i in range(int(self.p.xsteps_total) + 1)
        ]
        # Every fan member uses the same data and the same period, and only
        # element 0 is ever read, so build the indicator once. The list keeps
        # its previous length (one entry per fan period) for compatibility.
        wpr = WilliamsR(self.data, period=self.p.wpr_period)
        self._wpr_indicators = [wpr] * len(self._periods)
        self._smoothers = [
            CountSmoother(self.p.w_method, period) for period in self._periods
        ]
        self._prev_values = [None for _ in self._periods]
        self._bull_smoother = CountSmoother(self.p.smooth_method, self.p.smooth_length)
        self._bear_smoother = CountSmoother(self.p.smooth_method, self.p.smooth_length)
        self.addminperiod(
            self.p.wpr_period + max(self._periods) + self.p.smooth_length + 5
        )

    @staticmethod
    def _count_directions(smoothers, prev_values, base_wpr):
        """Feed one WPR sample to every fan smoother and tally directions.

        Args:
            smoothers: Fan of ``CountSmoother`` objects (updated in place).
            prev_values: Smoothed values from the previous processed bar, or
                ``None`` entries when there is no previous bar yet.
            base_wpr: Current Williams %R value.

        Returns:
            Tuple ``(upsch, dnsch, current_values)``: the number of smoothers
            whose value rose, the number whose value did not rise, and the
            list of freshly smoothed values.
        """
        upsch = 0.0
        dnsch = 0.0
        current_values = []
        for smoother, prev in zip(smoothers, prev_values):
            value = smoother.update(base_wpr)
            current_values.append(value)
            if prev is None:
                # No earlier value to compare with: counts towards neither side.
                continue
            if value > prev:
                upsch += 1.0
            else:
                # Unchanged values are counted together with falling ones.
                dnsch += 1.0
        return upsch, dnsch, current_values

    def next(self):
        """Count rising vs falling smoothed WPR lines and emit bulls/bears."""
        base_wpr = float(self._wpr_indicators[0][0])
        if math.isnan(base_wpr):
            # Propagate NaN and leave all smoother state untouched.
            self.lines.bulls[0] = float("nan")
            self.lines.bears[0] = float("nan")
            return
        upsch, dnsch, current_values = self._count_directions(
            self._smoothers, self._prev_values, base_wpr
        )
        self.lines.bulls[0] = self._bull_smoother.update(upsch)
        self.lines.bears[0] = self._bear_smoother.update(dnsch)
        self._prev_values = current_values

    def once(self, start, end):
        """Vectorized batch computation of bulls/bears over a bar range.

        Each call starts from fresh smoothers and stores the final smoother
        state on the instance when it finishes.

        Args:
            start: Index of the first bar to compute (inclusive).
            end: Index just past the last bar to compute (exclusive).
        """
        # NOTE(review): because state is rebuilt on every call, splitting a
        # range across several ``once`` calls restarts the smoothing at each
        # ``start`` - confirm this matches how the framework invokes ``once``.
        base_wpr_array = self._wpr_indicators[0].lines[0].array
        bulls_line = self.lines.bulls.array
        bears_line = self.lines.bears.array
        # Pad the output buffers so that indexed writes below are in range.
        for line in (bulls_line, bears_line):
            while len(line) < end:
                line.append(float("nan"))

        smoothers = [CountSmoother(self.p.w_method, period) for period in self._periods]
        prev_values = [None for _ in self._periods]
        bull_smoother = CountSmoother(self.p.smooth_method, self.p.smooth_length)
        bear_smoother = CountSmoother(self.p.smooth_method, self.p.smooth_length)

        # Never read past the end of the WPR buffer.
        actual_end = min(end, len(base_wpr_array))
        for i in range(start, actual_end):
            base_wpr = float(base_wpr_array[i])
            if math.isnan(base_wpr):
                bulls_line[i] = float("nan")
                bears_line[i] = float("nan")
                continue
            upsch, dnsch, current_values = self._count_directions(
                smoothers, prev_values, base_wpr
            )
            bulls_line[i] = bull_smoother.update(upsch)
            bears_line[i] = bear_smoother.update(dnsch)
            prev_values = current_values

        self._smoothers = smoothers
        self._prev_values = prev_values
        self._bull_smoother = bull_smoother
        self._bear_smoother = bear_smoother