Source code for backtrader.indicators.contrib.rkd_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
from .. import Indicator
__all__ = [
"RKDIndicator",
]
[docs]
class RKDIndicator(Indicator):
"""Compute a custom RKD line set from RSV/K/D values."""
lines = ("rsv", "k", "d")
params = (
("kd_period", 30),
("m1", 3),
("m2", 6),
)
def __init__(self):
"""Define indicator warm-up length before valid line updates."""
self.addminperiod(max(int(self.p.kd_period), int(self.p.m1), int(self.p.m2)) + 2)
[docs]
def next(self):
"""Update RSV, K, and D for the latest bar."""
kd_period = int(self.p.kd_period)
m1 = int(self.p.m1)
m2 = int(self.p.m2)
highs = [float(self.data.high[-i]) for i in range(kd_period)]
lows = [float(self.data.low[-i]) for i in range(kd_period)]
max_high = max(highs)
min_low = min(lows)
denom = max_high - min_low
if denom == 0:
rsv = 0.0
else:
rsv = (float(self.data.close[0]) - min_low) / denom * 100.0
self.lines.rsv[0] = rsv
if len(self) < m1:
self.lines.k[0] = 0.0
else:
self.lines.k[0] = sum(float(self.lines.rsv[-i]) for i in range(m1)) / m1
if len(self) < m2:
self.lines.d[0] = 0.0
else:
self.lines.d[0] = sum(float(self.lines.k[-i]) for i in range(m2)) / m2