Source code for backtrader.indicators.contrib.kwan_nrp_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
import math
from .. import (
RSI,
Indicator,
MomentumOscillator,
Stochastic,
)
__all__ = [
"KwanNrpIndicator",
]
[docs]
class KwanNrpIndicator(Indicator):
"""Compute smoothed KWAN_NRP value and direction direction states."""
lines = (
"kwan",
"direction",
)
params = (
("k_period", 5),
("d_period", 3),
("slowing", 3),
("rsi_period", 14),
("momentum_period", 14),
("x_length", 3),
)
def __init__(self):
"""Prepare stochastic, RSI, and momentum components for indicator output."""
# --- Stochastic %D (signal line) ---
stoch = Stochastic(
self.data,
period=self.p.k_period,
period_dfast=self.p.slowing,
period_dslow=self.p.d_period,
)
self.stoch_d = stoch.percD
# --- RSI ---
self.rsi = RSI(
self.data.close,
period=self.p.rsi_period,
)
# --- Momentum Oscillator = 100 * close / close[-period] ---
self.mom_osc = MomentumOscillator(
self.data.close,
period=self.p.momentum_period,
)
# --- Raw KWAN oscillator ---
# kwan_raw = stoch_d * rsi / mom_osc
# Guard against mom_osc == 0 in next() and protect period alignment.
self.addminperiod(
max(
self.p.k_period + self.p.slowing + self.p.d_period,
self.p.rsi_period,
self.p.momentum_period,
)
+ self.p.x_length
+ 2
)
# Internal raw value buffer.
self._raw_buf = []
[docs]
def next(self):
"""Update KWAN value and directional signal line."""
mom = self.mom_osc[0]
if mom == 0 or not math.isfinite(mom):
kwan_raw = 100.0
else:
kwan_raw = self.stoch_d[0] * self.rsi[0] / mom
self._raw_buf.append(kwan_raw)
# Smooth raw values with a simple moving average of the last XLength bars.
xl = self.p.x_length
if len(self._raw_buf) >= xl:
smoothed = sum(self._raw_buf[-xl:]) / xl
else:
smoothed = kwan_raw
self.lines.kwan[0] = smoothed
# Determine direction: rise / fall / flat versus previous smoothed value.
if len(self._raw_buf) < xl + 1:
self.lines.direction[0] = 1.0
return
prev_smoothed_vals = self._raw_buf[-(xl + 1) : -1]
if len(prev_smoothed_vals) >= xl:
prev_smoothed = sum(prev_smoothed_vals[-xl:]) / xl
else:
prev_smoothed = smoothed
if smoothed > prev_smoothed:
self.lines.direction[0] = 0.0 # rising
elif smoothed < prev_smoothed:
self.lines.direction[0] = 2.0 # falling
else:
self.lines.direction[0] = 1.0