Source code for backtrader.indicators.contrib.lsma_angle_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
import math
from .. import Indicator
__all__ = [
"LsmaAngleIndicator",
]
[docs]
class LsmaAngleIndicator(Indicator):
"""Compute LSMA slope angle and map it to a color-state indicator.
The indicator evaluates two least-squares MAs at different shifts and uses
the angle and trend persistence to classify bullish/bearish momentum states.
Args:
lsma_period: Look-back window used for each LSMA estimate.
angle_threshold: Threshold used to classify the angle magnitude.
start_shift: Shift index for the start LSMA sample.
end_shift: Shift index for the end LSMA sample.
"""
lines = ("angle", "color_index")
params = (
("lsma_period", 25),
("angle_threshold", 15),
("start_shift", 4),
("end_shift", 0),
)
def __init__(self):
"""Set minimum-history requirement and initialize derived scale factor."""
needed = (
int(max(self.p.lsma_period + self.p.start_shift, self.p.lsma_period + self.p.end_shift))
+ 2
)
self.addminperiod(needed)
self._m_factor = None
def _lsma(self, shift):
period = int(self.p.lsma_period)
values = [float(self.data.close[-(shift + i)]) for i in range(period)]
x = list(range(period))
x_mean = sum(x) / period
y_mean = sum(values) / period
numerator = sum((xi - x_mean) * (yi - y_mean) for xi, yi in zip(x, values))
denominator = sum((xi - x_mean) ** 2 for xi in x)
slope = numerator / denominator if denominator else 0.0
intercept = y_mean - slope * x_mean
return intercept + slope * (period - 1)
def _ensure_factor(self):
if self._m_factor is not None:
return
point = (
float(getattr(self.data, "_dataname", None).attrs["point"])
if hasattr(getattr(self.data, "_dataname", None), "attrs")
and "point" in self.data._dataname.attrs
else None
)
if point is None or point <= 0:
close0 = float(self.data.close[0]) if len(self.data) else 0.0
point = 0.01 if abs(close0) >= 10 and abs(close0) < 1000 else 0.0001
shift_diff = float(int(self.p.start_shift) - int(self.p.end_shift))
self._m_factor = (
1000.0
if abs(float(self.data.close[0])) >= 10 and abs(float(self.data.close[0])) < 1000
else 100000.0
) / shift_diff
[docs]
def next(self):
"""Calculate the current angle and update color index for the next bar."""
if int(self.p.end_shift) >= int(self.p.start_shift):
self.lines.angle[0] = 0.0
self.lines.color_index[0] = 2
return
self._ensure_factor()
end_ma = self._lsma(int(self.p.end_shift))
start_ma = self._lsma(int(self.p.start_shift))
angle = self._m_factor * (end_ma - start_ma) / 2.0
self.lines.angle[0] = angle
clr = 2
threshold = float(self.p.angle_threshold)
prev_angle = (
float(self.lines.angle[-1])
if len(self) > 0 and math.isfinite(float(self.lines.angle[-1]))
else angle
)
if angle > threshold:
if angle > prev_angle:
clr = 4
elif angle < prev_angle:
clr = 3
if angle < -threshold:
if angle < prev_angle:
clr = 0
elif angle > prev_angle:
clr = 1
self.lines.color_index[0] = clr