Source code for backtrader.indicators.hadelta
#!/usr/bin/env python
"""HaDelta Indicator Module - Heikin Ashi Delta indicator.
This module provides the HaDelta indicator defined by Dan Valcu
for measuring Heikin Ashi candle body differences.
Classes:
HaDelta: Heikin Ashi Delta indicator (aliases: haD, haDelta).
Example:
class MyStrategy(bt.Strategy):
def __init__(self):
self.hadelta = bt.indicators.HaDelta(self.data)
def next(self):
if self.hadelta.smoothed[0] > 0:
self.buy()
"""
from . import Indicator
from .sma import SMA
__all__ = ["HaDelta", "haD", "haDelta"]
[docs]
class HaDelta(Indicator):
"""Heikin Ashi Delta. Defined by Dan Valcu in his book "Heikin-Ashi: How to
Trade Without Candlestick Patterns ".
This indicator measures the difference between Heikin Ashi close and open of
Heikin Ashi candles, the body of the candle.
To get signals add haDelta smoothed by 3 period moving average.
For correct use, the data for the indicator must have been previously
passed by the Heikin Ahsi filter.
Formula:
- HaDelta = Heikin Ashi close - Heikin Ashi open
- smoothed = movav(haDelta, period)
"""
alias = ("haD",)
lines = ("haDelta", "smoothed")
params = (
("period", 3),
("movav", SMA),
("autoheikin", True),
)
plotinfo = {"subplot": True}
plotlines = {
"haDelta": {"color": "red"},
"smoothed": {"color": "grey", "_fill_gt": (0, "green"), "_fill_lt": (0, "red")},
}
def __init__(self):
"""Initialize the HaDelta indicator.
Sets up Heikin Ashi calculation state if autoheikin is enabled.
"""
super().__init__()
self._autoheikin = self.p.autoheikin
# Store previous HA values for recursive calculation
self._prev_ha_close = 0.0
self._prev_ha_open = 0.0
self._first_bar = True
# Set minperiod = period + 1 to match expected behavior
self.addminperiod(self.p.period + 1)
def _calc_heikin_ashi(self):
"""Calculate Heikin Ashi values for current bar inline."""
if self._autoheikin:
# ha_close = (open + high + low + close) / 4
ha_close = (
self.data.open[0] + self.data.high[0] + self.data.low[0] + self.data.close[0]
) / 4.0
# ha_open = (prev_ha_open + prev_ha_close) / 2
if self._first_bar:
ha_open = (self.data.open[0] + self.data.close[0]) / 2.0
self._first_bar = False
else:
ha_open = (self._prev_ha_open + self._prev_ha_close) / 2.0
# Store for next bar
self._prev_ha_close = ha_close
self._prev_ha_open = ha_open
return ha_close - ha_open
return self.data.close[0] - self.data.open[0]
[docs]
def prenext(self):
"""Calculate HaDelta during warmup period.
Computes haDelta but not smoothed values during warmup.
"""
# Calculate haDelta during warmup period
hd = self._calc_heikin_ashi()
self.lines.haDelta[0] = hd
[docs]
def nextstart(self):
"""Calculate HaDelta and seed smoothed on first valid bar.
Computes haDelta and seeds smoothed with SMA of haDelta values.
"""
# First valid bar - calculate haDelta and seed smoothed with SMA
hd = self._calc_heikin_ashi()
self.lines.haDelta[0] = hd
# Seed smoothed with SMA of first period haDelta values
period = self.p.period
hd_sum = hd
for i in range(1, period):
hd_sum += self.lines.haDelta[-i]
self.lines.smoothed[0] = hd_sum / period
[docs]
def next(self):
"""Calculate HaDelta and smoothed values for current bar.
haDelta = ha_close - ha_open
smoothed = SMA(haDelta, period)
"""
# Calculate haDelta = ha_close - ha_open
hd = self._calc_heikin_ashi()
self.lines.haDelta[0] = hd
# Calculate SMA of haDelta
period = self.p.period
hd_sum = hd
for i in range(1, period):
hd_sum += self.lines.haDelta[-i]
self.lines.smoothed[0] = hd_sum / period
[docs]
def once(self, start, end):
"""Calculate HaDelta and smoothed in runonce mode.
Computes haDelta values and smoothed SMA across all bars.
"""
# Get data arrays
o_array = self.data.open.array
h_array = self.data.high.array
l_array = self.data.low.array
c_array = self.data.close.array
hd_array = self.lines.haDelta.array
sm_array = self.lines.smoothed.array
period = self.p.period
# Ensure arrays are properly sized
for arr in [hd_array, sm_array]:
while len(arr) < end:
arr.append(float("nan"))
data_end = min(end, len(o_array), len(h_array), len(l_array), len(c_array))
if self._autoheikin:
# Calculate HeikinAshi values inline
prev_ha_close = 0.0
prev_ha_open = 0.0
for i in range(data_end):
# ha_close = (open + high + low + close) / 4
ha_close = (o_array[i] + h_array[i] + l_array[i] + c_array[i]) / 4.0
# ha_open = (prev_ha_open + prev_ha_close) / 2
if i == 0:
ha_open = (o_array[i] + c_array[i]) / 2.0
else:
ha_open = (prev_ha_open + prev_ha_close) / 2.0
hd_array[i] = ha_close - ha_open
prev_ha_close = ha_close
prev_ha_open = ha_open
else:
for i in range(data_end):
hd_array[i] = c_array[i] - o_array[i]
# Calculate smoothed (SMA of haDelta)
for i in range(min(end, len(hd_array))):
if i < period - 1:
sm_array[i] = float("nan")
else:
hd_sum = 0.0
for j in range(period):
hd_sum += hd_array[i - j]
sm_array[i] = hd_sum / period
haD = HaDelta
haDelta = HaDelta # Alias for tests