Source code for backtrader.indicators.contrib.kwan_rdp_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

import math

from .. import Indicator

__all__ = [
    "KwanRdpIndicator",
]


[docs] class KwanRdpIndicator(Indicator): """Custom Kwan RDP technical indicator. Lines: kwan (LineSeries): Smoothed combination of DeMarker, MFI, and Momentum. direction (LineSeries): Directional momentum flag (0 = bullish, 1 = flat, 2 = bearish). """ lines = ( "kwan", "direction", ) params = ( ("demarker_period", 14), ("mfi_period", 14), ("volume_type", "TICK"), ("momentum_period", 14), ("momentum_price", "CLOSE"), ("xma_method", "JJMA"), ("x_length", 7), ("x_phase", 100), ) def __init__(self): """Initialize indicator state: rolling buffers and minperiod.""" self.addminperiod( max(self.p.demarker_period, self.p.mfi_period, self.p.momentum_period) + self.p.x_length + 5 ) self._high_buf = [] self._low_buf = [] self._close_buf = [] self._typical_buf = [] self._money_flow_buf = [] self._raw_buf = [] self._smooth_prev = None self._smooth_buf = [] def _select_price(self, mode): """Return the selected price value (open/high/low/median/typical/weighted/close).""" mode = str(mode).upper() if mode == "OPEN": return float(self.data.open[0]) if mode == "HIGH": return float(self.data.high[0]) if mode == "LOW": return float(self.data.low[0]) if mode == "MEDIAN": return (float(self.data.high[0]) + float(self.data.low[0])) / 2.0 if mode == "TYPICAL": return ( float(self.data.high[0]) + float(self.data.low[0]) + float(self.data.close[0]) ) / 3.0 if mode == "WEIGHTED": return ( float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.close[0]) ) / 4.0 return float(self.data.close[0]) def _calc_demarker(self): """Compute DeMarker oscillator from rolling high/low buffers.""" p = int(self.p.demarker_period) if len(self._high_buf) <= p or len(self._low_buf) <= p: return None demax = [] demin = [] for i in range(len(self._high_buf) - p, len(self._high_buf)): high_diff = self._high_buf[i] - self._high_buf[i - 1] low_diff = self._low_buf[i - 1] - self._low_buf[i] demax.append(max(high_diff, 0.0)) demin.append(max(low_diff, 0.0)) smax = sum(demax) smin = sum(demin) denom = smax + smin if denom == 0: return 0.5 return smax / denom def _calc_mfi(self): """Compute Money Flow Index from typical price and volume buffers.""" p = int(self.p.mfi_period) if len(self._typical_buf) <= p or len(self._money_flow_buf) <= p: return None pos_flow = 0.0 neg_flow = 0.0 start = len(self._typical_buf) - p for i in range(start, len(self._typical_buf)): prev_tp = self._typical_buf[i - 1] curr_tp = self._typical_buf[i] curr_flow = self._money_flow_buf[i] if curr_tp > prev_tp: pos_flow += curr_flow elif curr_tp < prev_tp: neg_flow += curr_flow if neg_flow == 0: return 100.0 money_ratio = pos_flow / neg_flow return 100.0 - (100.0 / (1.0 + money_ratio)) def _calc_momentum(self): """Compute momentum as percentage change of selected price over period.""" p = int(self.p.momentum_period) if len(self._close_buf) <= p: return None prev_price = self._close_buf[-(p + 1)] curr_price = self._select_price(self.p.momentum_price) if prev_price == 0: return None return 100.0 * curr_price / prev_price def _smooth_value(self, raw_value): """Smooth raw_value using SMA or phase-adjusted exponential (JJMA-like).""" method = str(self.p.xma_method).upper() if method in ("MODE_SMA_", "SMA"): period = max(1, int(self.p.x_length)) if len(self._raw_buf) < period: return raw_value return sum(self._raw_buf[-period:]) / float(period) length = max(1, int(self.p.x_length)) phase = max(-100, min(100, int(self.p.x_phase))) alpha = 2.0 / (length + 1.0) alpha *= 1.0 + 0.35 * (phase / 100.0) alpha = max(0.01, min(0.99, alpha)) if self._smooth_prev is None or not math.isfinite(self._smooth_prev): smooth = raw_value else: smooth = self._smooth_prev + alpha * (raw_value - self._smooth_prev) self._smooth_prev = smooth return smooth
[docs] def next(self): """Compute Kwan RDP indicator: composite of DeMarker * MFI / momentum, smoothed.""" high = float(self.data.high[0]) low = float(self.data.low[0]) close = float(self.data.close[0]) volume = float(self.data.volume[0]) if math.isfinite(float(self.data.volume[0])) else 0.0 self._high_buf.append(high) self._low_buf.append(low) self._close_buf.append(close) typical = (high + low + close) / 3.0 self._typical_buf.append(typical) self._money_flow_buf.append(typical * volume) demarker = self._calc_demarker() mfi = self._calc_mfi() momentum = self._calc_momentum() if demarker is None or mfi is None or momentum is None: self.lines.kwan[0] = 0.0 self.lines.direction[0] = 1.0 return if momentum == 0 or not math.isfinite(momentum): raw_value = 100.0 else: raw_value = 100.0 * demarker * mfi / momentum self._raw_buf.append(raw_value) smooth = self._smooth_value(raw_value) self._smooth_buf.append(smooth) self.lines.kwan[0] = smooth if len(self._smooth_buf) < 2: self.lines.direction[0] = 1.0 return prev_smooth = self._smooth_buf[-2] if smooth > prev_smooth: self.lines.direction[0] = 0.0 elif smooth < prev_smooth: self.lines.direction[0] = 2.0 else: self.lines.direction[0] = 1.0