Source code for backtrader.indicators.contrib.kdj_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import Indicator

__all__ = [
    "KDJIndicator",
]


[docs] class KDJIndicator(Indicator): """Indicator class implementing the custom KDJ oscillator. Lines: kdc (Line): Output difference line (%K - %D). rsv (Line): Raw Stochastic Value line. k (Line): Smoothed %K line. d (Line): Smoothed %D line. """ lines = ("kdc", "rsv", "k", "d") params = ( ("m1", 3), ("m2", 6), ("kdj_period", 30), ) def __init__(self): """Initialize the custom KDJ indicator and establish minimum warmup period.""" self.addminperiod(int(self.p.kdj_period) + int(self.p.m2) + 2)
[docs] def next(self): """Calculate RSV, %K, %D, and %K-%D values on each new bar.""" kdj_period = int(self.p.kdj_period) m1 = int(self.p.m1) m2 = int(self.p.m2) highs = [float(self.data.high[-i]) for i in range(kdj_period)] lows = [float(self.data.low[-i]) for i in range(kdj_period)] max_high = max(highs) min_low = min(lows) if max_high - min_low != 0.0: self.lines.rsv[0] = (float(self.data.close[0]) - min_low) / (max_high - min_low) * 100.0 else: self.lines.rsv[0] = 1.0 rsv_values = [] for i in range(m1): value = float(self.lines.rsv[-i]) if len(self) > i else 50.0 rsv_values.append(value) self.lines.k[0] = sum(rsv_values) / float(m1) k_values = [] for i in range(m2): value = float(self.lines.k[-i]) if len(self) > i else 50.0 k_values.append(value) self.lines.d[0] = sum(k_values) / float(m2) self.lines.kdc[0] = float(self.lines.k[0]) - float(self.lines.d[0])