Source code for backtrader.indicators.contrib.stochastic_histogram_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import Indicator

__all__ = [
    "StochasticHistogramIndicator",
]


[docs] class StochasticHistogramIndicator(Indicator): """Stochastic-based indicator with a smoothed main line, signal line, and color state. The indicator outputs: - ``main``: smoothed %K values. - ``signal``: smoothed trigger line derived from main. - ``hist_base``: fixed base reference line (50.0). - ``color_state``: 0 for overbought, 1 for neutral, 2 for oversold. """ lines = ("main", "signal", "hist_base", "color_state") params = ( ("k_period", 5), ("d_period", 3), ("slowing", 3), ("ma_method", "sma"), ("high_level", 60), ("low_level", 40), ) def __init__(self): """Set the minimum required bars from K, slowing, and D periods.""" self.addminperiod(int(self.p.k_period) + int(self.p.slowing) + int(self.p.d_period) + 2) def _fast_k_at(self, i, high_array, low_array, close_array): period = max(1, int(self.p.k_period)) if i - period + 1 < 0: return float("nan") lowest = min(float(low_array[idx]) for idx in range(i - period + 1, i + 1)) highest = max(float(high_array[idx]) for idx in range(i - period + 1, i + 1)) denom = highest - lowest return 100.0 * (float(close_array[i]) - lowest) / denom if denom else 50.0 def _ma_value(self, values, period, previous=None): if not values: return float("nan") mode = str(self.p.ma_method).strip().lower() value = float(values[-1]) if mode in {"ema", "mode_ema"}: if previous is None or previous != previous: return value alpha = 2.0 / (period + 1.0) return previous + alpha * (value - previous) if mode in {"smma", "mode_smma"}: if previous is None or previous != previous: return value return ((period - 1.0) * previous + value) / period if mode in {"lwma", "wma", "mode_lwma"}: weights = list(range(1, len(values) + 1)) return sum(v * w for v, w in zip(values, weights)) / sum(weights) return sum(values) / len(values)
[docs] def next(self): """Compute indicator values for one new bar in Backtrader streaming mode.""" k_period = max(1, int(self.p.k_period)) slowing = max(1, int(self.p.slowing)) d_period = max(1, int(self.p.d_period)) fast_values = [] for ago in range(slowing - 1, -1, -1): if len(self.data) <= ago + k_period - 1: return low_values = [float(self.data.low[-ago - shift]) for shift in range(k_period)] high_values = [float(self.data.high[-ago - shift]) for shift in range(k_period)] lowest = min(low_values) highest = max(high_values) denom = highest - lowest fast_values.append( 100.0 * (float(self.data.close[-ago]) - lowest) / denom if denom else 50.0 ) prev_main = float(self.lines.main[-1]) if len(self) > 1 else None prev_signal = float(self.lines.signal[-1]) if len(self) > 1 else None main = self._ma_value(fast_values, slowing, prev_main) self.lines.main[0] = main signal_values = [ float(self.lines.main[-ago]) for ago in range(min(len(self), d_period) - 1, 0, -1) ] signal_values.append(main) self.lines.signal[0] = self._ma_value(signal_values, d_period, prev_signal) color = 1.0 if main > float(self.p.high_level): color = 0.0 elif main < float(self.p.low_level): color = 2.0 self.lines.color_state[0] = color self.lines.hist_base[0] = 50.0
[docs] def once(self, start, end): """Compute indicator arrays for run-once mode from ``start`` to ``end`` indices.""" high_array = self.data.high.array low_array = self.data.low.array close_array = self.data.close.array main_line = self.lines.main.array signal_line = self.lines.signal.array hist_base_line = self.lines.hist_base.array color_state_line = self.lines.color_state.array for line in (main_line, signal_line, hist_base_line, color_state_line): while len(line) < end: line.append(float("nan")) slowing = max(1, int(self.p.slowing)) d_period = max(1, int(self.p.d_period)) actual_end = min(end, len(high_array), len(low_array), len(close_array)) fast_values = [ self._fast_k_at(i, high_array, low_array, close_array) for i in range(actual_end) ] main_values = [] signal_values_all = [] prev_main = None prev_signal = None for i in range(actual_end): main_start = max(0, i - slowing + 1) main = self._ma_value(fast_values[main_start : i + 1], slowing, prev_main) main_values.append(main) prev_main = main signal_start = max(0, i - d_period + 1) signal = self._ma_value(main_values[signal_start : i + 1], d_period, prev_signal) signal_values_all.append(signal) prev_signal = signal if i < start: continue main_line[i] = main signal_line[i] = signal_values_all[i] color = 1.0 if main > float(self.p.high_level): color = 0.0 elif main < float(self.p.low_level): color = 2.0 color_state_line[i] = color hist_base_line[i] = 50.0