Source code for backtrader.indicators.contrib.non_lag_dot_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
import math
from .. import Indicator
__all__ = [
"NonLagDotIndicator",
]
PI = math.pi
[docs]
class NonLagDotIndicator(Indicator):
"""Reconstructs NonLagDot from its MQ5 source.
Applies a weighted cosine kernel over SMA values to produce a non-lag MA,
then assigns color: 0=gray, 1=magenta(down), 2=green(up).
"""
lines = ("nlm", "color")
params = (
("length", 10),
("filter_pts", 0),
("deviation", 0.0),
("point", 0.0001),
)
def __init__(self):
"""Pre-compute kernel parameters and allocate indicator state."""
self._length = int(self.p.length)
coeff = 3 * PI
phase = self._length - 1
cycle = 4
self._len_total = int(self._length * cycle + phase)
self._dT1 = (2 * cycle - 1) / (cycle * self._length - 1)
self._dT2 = 1.0 / (phase - 1) if phase > 1 else 1.0
self._kd = 1.0 + self.p.deviation / 100.0
self._fi = int(self.p.filter_pts) * float(self.p.point)
self._coeff = coeff
self._phase = phase
self._cycle = cycle
self._trend = 0
self.addminperiod(self._length + self._len_total + 2)
def _calc_sma(self, ago):
length = self._length
total = 0.0
for i in range(length):
total += float(self.data.close[-(ago + i)])
return total / length
[docs]
def next(self):
"""Compute smoothed value and trend color for current bar.
The method calculates weighted SMA contributions, applies optional filtering,
and updates `nlm` and `color` lines.
"""
len_total = self._len_total
coeff = self._coeff
phase = self._phase
fi = self._fi
# Build weighted sum using cosine kernel over SMA values
total_sum = 0.0
total_weight = 0.0
t = 0.0
for i in range(int(len_total)):
# SMA at offset i (0 = current bar)
sma_val = self._calc_sma(i)
if i <= phase - 1:
alfa = 1.0
else:
alfa = 1.0 / (1.0 + math.exp((i - phase + 0.5) * coeff / len_total))
beta = math.cos(PI * t)
g = 1.0 / (coeff * t + 1.0)
if t <= 0.5:
g = 1.0
total_sum += sma_val * beta * g * alfa
total_weight += beta * g * alfa
if t < 0.5:
t += self._dT2
elif t < len_total - 1:
t += self._dT1
nlm_val = self._kd * total_sum / total_weight if total_weight > 0 else 0.0
# Filter: if change < fi, hold previous value
prev_nlm = (
float(self.lines.nlm[-1])
if len(self.lines.nlm) > 1 and not math.isnan(float(self.lines.nlm[-1]))
else nlm_val
)
if fi > 0 and abs(nlm_val - prev_nlm) < fi:
nlm_val = prev_nlm
self.lines.nlm[0] = nlm_val
# Trend detection
trend = self._trend
if nlm_val - prev_nlm > fi:
trend = 1 # up
if prev_nlm - nlm_val > fi:
trend = -1 # down
# Color: 0=gray, 1=magenta(down), 2=green(up)
color = 0.0
if trend > 0:
color = 2.0
if trend < 0:
color = 1.0
self.lines.color[0] = color
self._trend = trend