# Source code for backtrader.indicators.contrib.moving_average_fn_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
import re
from collections import deque
from pathlib import Path
from .. import Indicator
# Public API of this module: only the indicator class is exported.
__all__ = ["MovingAverageFNIndicator"]
# Path of the MQ5 source file that load_fn_coefficients() reads.  It is built
# from the third ancestor directory of this file (``parents[2]``), descending
# into ``ea/1276_Exp_MovingAverage_FN``.
SOURCE_MQ5 = Path(__file__).resolve().parents[2].joinpath(
    "ea",
    "1276_Exp_MovingAverage_FN",
    "movingaverage_fn.mq5",
)
def resolve_price_line(data, mode):
    """Pick the price series that corresponds to an applied-price mode.

    The mode is stringified and lower-cased before matching, so the lookup is
    case-insensitive. Both the ``price_``-prefixed and the bare spelling of
    each mode are accepted.

    Args:
        data: Object exposing ``open``, ``high``, ``low`` and ``close``.
        mode: Applied-price mode name (e.g. ``price_close``, ``price_median``).

    Returns:
        The selected attribute of ``data`` or an arithmetic combination of
        several of them; ``data.close`` when the mode is not recognized.
    """
    requested = str(mode).lower()

    # Each entry pairs the accepted spellings with a lazy builder, so only the
    # attributes of the chosen mode are ever read from ``data``.
    table = (
        (("price_open", "open"), lambda: data.open),
        (("price_high", "high"), lambda: data.high),
        (("price_low", "low"), lambda: data.low),
        (("price_median", "median"), lambda: (data.high + data.low) / 2.0),
        (
            ("price_typical", "typical"),
            lambda: (data.high + data.low + data.close) / 3.0,
        ),
        (
            ("price_weighted", "weighted"),
            lambda: (data.high + data.low + data.close + data.close) / 4.0,
        ),
    )

    builders = {}
    for spellings, build in table:
        builders.update(dict.fromkeys(spellings, build))

    chosen = builders.get(requested)
    if chosen is None:
        # Unknown modes (including "price_close"/"close") use the close.
        return data.close
    return chosen()
def load_fn_coefficients(filter_name="N44", source_path=None):
    """Parse FIR filter coefficients for a named filter from the MQ5 source.

    The file is searched for a ``case <filter_name>:`` marker. The text from
    that marker up to the next ``case N<digits>:`` line is scanned for terms
    of the form ``<coef>*PriceSeries(Price,index-<offset>``; a term without
    ``-<offset>`` is taken as offset 0.

    Args:
        filter_name: Filter case name (e.g. ``N44``) to extract.
        source_path: Optional path of the MQ5 file to parse. When ``None``
            (the default) the module-level ``SOURCE_MQ5`` is used.

    Returns:
        List of float coefficients indexed by price-series offset. Offsets
        that have no term inside the filter block are filled with ``0.0``.
        ``[1.0]`` is returned if the source file is missing.

    Raises:
        ValueError: If the filter or the following case marker is not found,
            or if the filter block contains no coefficient terms.
    """
    path = SOURCE_MQ5 if source_path is None else Path(source_path)
    if not path.exists():
        return [1.0]

    raw = path.read_bytes()
    marker = f"case {filter_name}:"

    # The file encoding is not known up front, so try several and keep the
    # first decoding in which the marker is actually readable.  NUL bytes are
    # stripped so a UTF-16 file decoded with a one-byte codec still matches.
    text = ""
    for encoding in ("utf-16", "utf-16-le", "utf-8", "latin-1"):
        try:
            decoded = raw.decode(encoding, errors="ignore")
        except UnicodeError:
            continue
        decoded = decoded.replace("\x00", "")
        if marker in decoded:
            text = decoded
            break
    if not text:
        raise ValueError(f"Filter {filter_name} not found in {path}")

    start = text.find(marker)
    # Search from start + 1 so the marker itself can never be the match; the
    # "+ 1" in the slice below compensates for that shift.
    next_case = re.search(r"\n\s*case\s+N\d+:", text[start + 1 :])
    if not next_case:
        raise ValueError(f"Could not locate next filter after {filter_name}")
    block = text[start : start + 1 + next_case.start()]

    matches = re.findall(
        r"([+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)\*PriceSeries\(Price,index(?:-(\d+))?",
        block,
    )
    if not matches:
        raise ValueError(
            f"No coefficients found for filter {filter_name} in {path}"
        )

    coeff_map = {int(offset or "0"): float(coef) for coef, offset in matches}
    # A tap that is absent from the block contributes nothing to the filter,
    # so gaps in the offsets become 0.0 instead of raising KeyError.
    return [coeff_map.get(i, 0.0) for i in range(max(coeff_map) + 1)]
# [docs]  (Sphinx viewcode link marker; not part of the module)
class MovingAverageFNIndicator(Indicator):
    """FIR moving average built from named filter coefficients plus smoothing.

    On every bar the selected applied price is convolved with the coefficient
    list returned by ``load_fn_coefficients``. The convolved value is then
    smoothed over the most recent ``xlength`` convolved values and written to
    the ``mafn`` line.

    Params:
        filter_number: Filter case name passed to ``load_fn_coefficients``.
        xma_method: Smoothing method, matched case-insensitively.
            ``sma``/``mode_sma`` gives the plain mean of the window.
            ``ema``/``mode_ema`` gives an exponential average recomputed over
            the window. Any other value, including the default ``jjma``,
            gives a linearly weighted average with the newest value weighted
            highest.
        xlength: Smoothing window length; coerced to ``int`` and clamped to
            at least 1.
        xphase: Not read by the code in this class.
        ipc: Applied-price mode passed to ``resolve_price_line``.
        price_shift: Constant added to the smoothed value.
    """

    lines = ("mafn",)
    params = (
        ("filter_number", "N44"),
        ("xma_method", "jjma"),
        ("xlength", 12),
        ("xphase", 15),
        ("ipc", "price_close"),
        ("price_shift", 0),
    )

    def __init__(self):
        """Load the filter coefficients and set up the smoothing buffer."""
        self._coeffs = load_fn_coefficients(self.p.filter_number)
        self._price_line = resolve_price_line(self.data, self.p.ipc)

        # Normalise the window length once so the buffer size and the
        # warm-up period are always derived from the same integer.
        window = max(1, int(self.p.xlength))
        self._smooth_values = deque(maxlen=window)

        # Warm-up covers the filter taps plus the smoothing window.
        # NOTE(review): the extra 5 bars are carried over unchanged;
        # presumably a safety margin -- confirm.
        self.addminperiod(len(self._coeffs) + window + 5)

    def _smooth_filtered(self, value):
        """Push ``value`` into the window and return the smoothed result.

        Args:
            value: Newest convolved (filtered) value.

        Returns:
            The average of the buffered values according to ``xma_method``.
        """
        window = self._smooth_values
        window.append(value)  # the bounded deque discards the oldest entry
        count = len(window)  # always >= 1 after the append

        mode = str(self.p.xma_method).lower()
        if mode in {"sma", "mode_sma"}:
            return sum(window) / count
        if mode in {"ema", "mode_ema"}:
            alpha = 2.0 / (count + 1.0)
            ema, *later = window
            for item in later:
                ema = alpha * item + (1.0 - alpha) * ema
            return ema

        # Every other method name falls back to a linearly weighted average:
        # the oldest value has weight 1, the newest has weight ``count``.
        weight_sum = float(sum(range(1, count + 1)))
        weighted = sum(v * w for w, v in enumerate(window, start=1))
        return weighted / weight_sum

    def next(self):
        """Convolve the filter coefficients with price and emit the smoothed value."""
        price = self._price_line
        # Coefficient ``k`` multiplies the price ``k`` bars back (index -k).
        filtered = sum(
            (coef * float(price[-offset])
             for offset, coef in enumerate(self._coeffs)),
            0.0,
        )
        self.lines.mafn[0] = (
            self._smooth_filtered(filtered) + float(self.p.price_shift)
        )