#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
import math
from .. import Indicator
__all__ = [
"KwanCccIndicator",
]
[docs]
class KwanCccIndicator(Indicator):
"""Custom Kwan CCC technical indicator.
Lines:
kwan (LineSeries): Smoothed combination of Chaikin, CCI, and Momentum.
direction (LineSeries): Directional momentum flag (0 = bullish, 1 = flat, 2 = bearish).
"""
lines = (
"kwan",
"direction",
)
params = (
("fast_ma_period", 3),
("slow_ma_period", 10),
("ma_method", "LWMA"),
("cci_period", 14),
("cci_price", "MEDIAN"),
("momentum_period", 7),
("momentum_price", "CLOSE"),
("xma_method", "JJMA"),
("x_length", 7),
("x_phase", 100),
)
def __init__(self):
"""Initialize indicator variables, buffer lists, and min periods."""
self.addminperiod(
max(self.p.slow_ma_period, self.p.cci_period, self.p.momentum_period)
+ self.p.x_length
+ 5
)
self._adl_buf = []
self._chaikin_buf = []
self._cci_price_buf = []
self._momentum_price_buf = []
self._raw_buf = []
self._smooth_prev = None
self._smooth_buf = []
def _select_price(self, mode):
mode = str(mode).upper()
if mode == "OPEN":
return float(self.data.open[0])
if mode == "HIGH":
return float(self.data.high[0])
if mode == "LOW":
return float(self.data.low[0])
if mode == "MEDIAN":
return (float(self.data.high[0]) + float(self.data.low[0])) / 2.0
if mode == "TYPICAL":
return (
float(self.data.high[0]) + float(self.data.low[0]) + float(self.data.close[0])
) / 3.0
if mode == "WEIGHTED":
return (
float(self.data.high[0]) + float(self.data.low[0]) + 2.0 * float(self.data.close[0])
) / 4.0
return float(self.data.close[0])
@staticmethod
def _sma(values, period):
if len(values) < period or period <= 0:
return None
window = values[-period:]
return sum(window) / float(period)
@staticmethod
def _lwma(values, period):
if len(values) < period or period <= 0:
return None
window = values[-period:]
weights = list(range(1, period + 1))
denom = sum(weights)
return sum(v * w for v, w in zip(window, weights)) / float(denom)
def _ma(self, values, period, method):
method = str(method).upper()
if method in ("MODE_LWMA", "LWMA"):
return self._lwma(values, period)
return self._sma(values, period)
def _calc_cci(self):
period = int(self.p.cci_period)
if len(self._cci_price_buf) < period or period <= 0:
return None
window = self._cci_price_buf[-period:]
sma = sum(window) / float(period)
mean_dev = sum(abs(v - sma) for v in window) / float(period)
if mean_dev == 0:
return 0.0
return (window[-1] - sma) / (0.015 * mean_dev)
def _calc_momentum(self):
period = int(self.p.momentum_period)
if len(self._momentum_price_buf) <= period or period <= 0:
return None
prev_price = self._momentum_price_buf[-(period + 1)]
curr_price = self._momentum_price_buf[-1]
if prev_price == 0:
return None
return 100.0 * curr_price / prev_price
def _smooth_value(self, raw_value):
method = str(self.p.xma_method).upper()
if method in ("MODE_SMA_", "SMA"):
period = max(1, int(self.p.x_length))
if len(self._raw_buf) < period:
return raw_value
return sum(self._raw_buf[-period:]) / float(period)
length = max(1, int(self.p.x_length))
phase = max(-100, min(100, int(self.p.x_phase)))
alpha = 2.0 / (length + 1.0)
alpha *= 1.0 + 0.35 * (phase / 100.0)
alpha = max(0.01, min(0.99, alpha))
if self._smooth_prev is None or not math.isfinite(self._smooth_prev):
smooth = raw_value
else:
smooth = self._smooth_prev + alpha * (raw_value - self._smooth_prev)
self._smooth_prev = smooth
return smooth
[docs]
def next(self):
"""Compute the Kwan CCC metric and directional momentum flags on each bar."""
high = float(self.data.high[0])
low = float(self.data.low[0])
close = float(self.data.close[0])
volume = float(self.data.volume[0]) if math.isfinite(float(self.data.volume[0])) else 0.0
if high != low:
mf_mult = ((close - low) - (high - close)) / (high - low)
else:
mf_mult = 0.0
adl_prev = self._adl_buf[-1] if self._adl_buf else 0.0
adl = adl_prev + mf_mult * volume
self._adl_buf.append(adl)
chaikin_fast = self._ma(self._adl_buf, int(self.p.fast_ma_period), self.p.ma_method)
chaikin_slow = self._ma(self._adl_buf, int(self.p.slow_ma_period), self.p.ma_method)
if chaikin_fast is None or chaikin_slow is None:
self.lines.kwan[0] = 0.0
self.lines.direction[0] = 1.0
return
chaikin = chaikin_fast - chaikin_slow
self._chaikin_buf.append(chaikin)
self._cci_price_buf.append(self._select_price(self.p.cci_price))
self._momentum_price_buf.append(self._select_price(self.p.momentum_price))
cci = self._calc_cci()
momentum = self._calc_momentum()
if cci is None or momentum is None:
self.lines.kwan[0] = 0.0
self.lines.direction[0] = 1.0
return
if momentum == 0 or not math.isfinite(momentum):
raw_value = 100.0
else:
raw_value = chaikin * cci / momentum
self._raw_buf.append(raw_value)
smooth = self._smooth_value(raw_value)
self._smooth_buf.append(smooth)
self.lines.kwan[0] = smooth
if len(self._smooth_buf) < 2:
self.lines.direction[0] = 1.0
return
prev_smooth = self._smooth_buf[-2]
if smooth > prev_smooth:
self.lines.direction[0] = 0.0
elif smooth < prev_smooth:
self.lines.direction[0] = 2.0
else:
self.lines.direction[0] = 1.0