#!/usr/bin/env python
"""Directional Movement Indicator Module - ADX and DI indicators.
This module provides the ADX (Average Directional Index) and
Directional Indicators developed by J. Welles Wilder, Jr. for
measuring trend strength.
Classes:
UpMove: Upward move calculation.
DownMove: Downward move calculation.
_DirectionalIndicator: Base class for DI calculations.
DirectionalIndicator: DI indicator (alias: DI).
PlusDirectionalIndicator: +DI indicator (aliases: PlusDI, +DI).
MinusDirectionalIndicator: -DI indicator (aliases: MinusDI, -DI).
AverageDirectionalMovementIndex: ADX indicator (alias: ADX).
AverageDirectionalMovementIndexRating: ADXR indicator (alias: ADXR).
DirectionalMovementIndex: DMI with ADX and DI (alias: DMI).
DirectionalMovement: Complete DM system (alias: DM).
Example:
class MyStrategy(bt.Strategy):
def __init__(self):
# Calculate ADX to measure trend strength
self.adx = bt.indicators.ADX(self.data, period=14)
# Or use DI for +DI and -DI
self.di = bt.indicators.DI(self.data, period=14)
def next(self):
# Buy when trend is strong (ADX > 25) and +DI crosses above -DI
if self.adx[0] > 25 and self.di.plusDI[0] > self.di.minusDI[0]:
self.buy()
"""
from ..lineroot import LineRoot
from . import ATR, And, If, Indicator, MovAv
[docs]
class UpMove(Indicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"* as part of the Directional Move System to
calculate Directional Indicators.
Positive if the given data has moved higher than the previous day
Formula:
- upmove = data - data(-1)
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
lines = ("upmove",)
def __init__(self):
"""Initialize the UpMove indicator.
Sets minimum period to 2 for difference calculation.
"""
super().__init__()
self.addminperiod(2)
[docs]
def next(self):
"""Calculate up move for the current bar.
Returns data - data(-1), the positive price change.
"""
self.lines.upmove[0] = self.data[0] - self.data[-1]
[docs]
def once(self, start, end):
"""Calculate up moves in runonce mode.
Computes data[i] - data[i-1] for each bar.
"""
darray = self.data.array
larray = self.lines.upmove.array
while len(larray) < end:
larray.append(float("nan"))
for i in range(1, min(end, len(darray))):
larray[i] = darray[i] - darray[i - 1]
[docs]
class DownMove(Indicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"* as part of the Directional Move System to
calculate Directional Indicators.
Positive if the given data has moved lower than the previous day
Formula:
- downmove = data(-1) - data
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
lines = ("downmove",)
def __init__(self):
"""Initialize the DownMove indicator.
Sets minimum period to 2 for difference calculation.
"""
super().__init__()
self.addminperiod(2)
[docs]
def next(self):
"""Calculate down move for the current bar.
Returns data(-1) - data, the negative price change as positive value.
"""
self.lines.downmove[0] = self.data[-1] - self.data[0]
[docs]
def once(self, start, end):
"""Calculate down moves in runonce mode.
Computes data[i-1] - data[i] for each bar.
"""
darray = self.data.array
larray = self.lines.downmove.array
while len(larray) < end:
larray.append(float("nan"))
for i in range(1, min(end, len(darray))):
larray[i] = darray[i - 1] - darray[i]
class _DirectionalIndicator(Indicator):
"""
This class serves as the root base class for all "Directional Movement
System" related indicators, given that the calculations are first common
and then derived from the common calculations.
It can calculate the +DI and -DI values (using kwargs as the hint as to
what to calculate) but doesn't assign them to lines. This is left for
sublcases of this class.
"""
params = (("period", 14), ("movav", MovAv.Smoothed))
plotlines = {"plusDI": {"_name": "+DI"}, "minusDI": {"_name": "-DI"}}
def _plotlabel(self):
plabels = [self.p.period]
plabels += [self.p.movav] * self.p.notdefault("movav")
return plabels
def __init__(self, _plus=True, _minus=True):
"""Initialize the directional indicator base.
Calculates +DI and -DI values based on directional movement.
Args:
_plus: Whether to calculate plus DI.
_minus: Whether to calculate minus DI.
"""
self._di_atr = ATR(self.data, period=self.p.period, movav=self.p.movav)
upmove = self.data.high - self.data.high(-1)
downmove = self.data.low(-1) - self.data.low
if _plus:
plus = And(upmove > downmove, upmove > 0.0)
plusDM = If(plus, upmove, 0.0)
self._plusDMav = self.p.movav(plusDM, period=self.p.period)
self.DIplus = 100.0 * self._plusDMav / self._di_atr
if _minus:
minus = And(downmove > upmove, downmove > 0.0)
minusDM = If(minus, downmove, 0.0)
self._minusDMav = self.p.movav(minusDM, period=self.p.period)
self.DIminus = 100.0 * self._minusDMav / self._di_atr
super().__init__()
[docs]
class DirectionalIndicator(_DirectionalIndicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength
This indicator shows +DI, -DI:
- Use PlusDirectionalIndicator (PlusDI) to get +DI
- Use MinusDirectionalIndicator (MinusDI) to get -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use AverageDirectionalIndexRating (ADXR) to get ADX, ADXR
- Use DirectionalMovementIndex (DMI) to get ADX, +DI, -DI
- Use DirectionalMovement (DM) to get ADX, ADXR, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
- -di = 100 * MovingAverage(-dm, period) / atr(period)
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = ("DI",)
lines = (
"plusDI",
"minusDI",
)
def __init__(self):
"""Initialize the Directional Indicator.
Calculates both +DI and -DI from raw OHLC using Wilder smoothing.
"""
super().__init__()
self.lines.plusDI = self.DIplus
self.lines.minusDI = self.DIminus
# State for Wilder smoothing (used by next/prenext)
self._sm_tr = 0.0
self._sm_pdm = 0.0
self._sm_mdm = 0.0
self._bar_count = 0
def prenext(self):
"""Accumulate during warmup period."""
self._di_accumulate()
def nextstart(self):
"""Transition from warmup to live calculation."""
self._di_accumulate()
def next(self):
"""Calculate +DI and -DI for the current bar."""
self._di_accumulate()
def _di_accumulate(self):
"""Accumulate TR/DM and compute DI using Wilder smoothing."""
period = self.p.period
high = self.data.high[0]
low = self.data.low[0]
if len(self.data) <= 1:
return
try:
prev_high = self.data.high[-1]
prev_low = self.data.low[-1]
prev_close = self.data.close[-1]
except IndexError:
return
true_high = max(high, prev_close)
true_low = min(low, prev_close)
tr = true_high - true_low
upmove = high - prev_high
downmove = prev_low - low
pdm = upmove if (upmove > downmove and upmove > 0) else 0.0
mdm = downmove if (downmove > upmove and downmove > 0) else 0.0
self._bar_count += 1
if self._bar_count <= period:
# Accumulate sums for seeding
self._sm_tr += tr
self._sm_pdm += pdm
self._sm_mdm += mdm
else:
# Wilder smoothing: val = val - val/period + new
self._sm_tr = self._sm_tr - self._sm_tr / period + tr
self._sm_pdm = self._sm_pdm - self._sm_pdm / period + pdm
self._sm_mdm = self._sm_mdm - self._sm_mdm / period + mdm
if self._sm_tr > 0:
self.lines.plusDI[0] = 100.0 * self._sm_pdm / self._sm_tr
self.lines.minusDI[0] = 100.0 * self._sm_mdm / self._sm_tr
else:
self.lines.plusDI[0] = 0.0
self.lines.minusDI[0] = 0.0
def once(self, start, end):
"""Calculate DI values in runonce mode from raw OHLC."""
period = self.p.period
high_arr = self.data.high.array
low_arr = self.data.low.array
close_arr = self.data.close.array
dst_plus = self.lines.plusDI.array
dst_minus = self.lines.minusDI.array
while len(dst_plus) < end:
dst_plus.append(float("nan"))
while len(dst_minus) < end:
dst_minus.append(float("nan"))
sm_tr = 0.0
sm_pdm = 0.0
sm_mdm = 0.0
bar_count = 0
for i in range(1, min(end, len(high_arr), len(low_arr), len(close_arr))):
high = high_arr[i]
low = low_arr[i]
prev_high = high_arr[i - 1]
prev_low = low_arr[i - 1]
prev_close = close_arr[i - 1]
true_high = max(high, prev_close)
true_low = min(low, prev_close)
tr = true_high - true_low
upmove = high - prev_high
downmove = prev_low - low
pdm = upmove if (upmove > downmove and upmove > 0) else 0.0
mdm = downmove if (downmove > upmove and downmove > 0) else 0.0
bar_count += 1
if bar_count <= period:
sm_tr += tr
sm_pdm += pdm
sm_mdm += mdm
else:
sm_tr = sm_tr - sm_tr / period + tr
sm_pdm = sm_pdm - sm_pdm / period + pdm
sm_mdm = sm_mdm - sm_mdm / period + mdm
if sm_tr > 0:
dst_plus[i] = 100.0 * sm_pdm / sm_tr
dst_minus[i] = 100.0 * sm_mdm / sm_tr
else:
dst_plus[i] = 0.0
dst_minus[i] = 0.0
prenext = LineRoot.prenext # noqa: F811
nextstart = LineRoot.nextstart # noqa: F811
next = LineRoot.next # noqa: F811
preonce = LineRoot.preonce # noqa: F811
oncestart = LineRoot.oncestart # noqa: F811
once = LineRoot.once # noqa: F811
[docs]
class PlusDirectionalIndicator(_DirectionalIndicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength
This indicator shows +DI:
- Use MinusDirectionalIndicator (MinusDI) to get -DI
- Use Directional Indicator (DI) to get +DI, -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use AverageDirectionalIndexRating (ADXR) to get ADX, ADXR
- Use DirectionalMovementIndex (DMI) to get ADX, +DI, -DI
- Use DirectionalMovement (DM) to get ADX, ADXR, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = (("PlusDI", "+DI"),)
lines = ("plusDI",)
plotinfo = {"plotname": "+DirectionalIndicator"}
def __init__(self):
"""Initialize the +DI indicator."""
super().__init__(_minus=False)
self.lines.plusDI = self.DIplus
self._sm_tr = 0.0
self._sm_pdm = 0.0
self._bar_count = 0
def _pdi_accumulate(self):
period = self.p.period
high = self.data.high[0]
low = self.data.low[0]
if len(self.data) <= 1:
return
try:
prev_high = self.data.high[-1]
prev_low = self.data.low[-1]
prev_close = self.data.close[-1]
except IndexError:
return
tr = max(high, prev_close) - min(low, prev_close)
upmove = high - prev_high
downmove = prev_low - low
pdm = upmove if (upmove > downmove and upmove > 0) else 0.0
self._bar_count += 1
if self._bar_count <= period:
self._sm_tr += tr
self._sm_pdm += pdm
else:
self._sm_tr = self._sm_tr - self._sm_tr / period + tr
self._sm_pdm = self._sm_pdm - self._sm_pdm / period + pdm
self.lines.plusDI[0] = (100.0 * self._sm_pdm / self._sm_tr) if self._sm_tr > 0 else 0.0
def prenext(self):
self._pdi_accumulate()
def nextstart(self):
self._pdi_accumulate()
def next(self):
self._pdi_accumulate()
def once(self, start, end):
period = self.p.period
high_arr = self.data.high.array
low_arr = self.data.low.array
close_arr = self.data.close.array
dst = self.lines.plusDI.array
while len(dst) < end:
dst.append(float("nan"))
sm_tr = 0.0
sm_pdm = 0.0
bc = 0
for i in range(1, min(end, len(high_arr), len(low_arr), len(close_arr))):
tr = max(high_arr[i], close_arr[i - 1]) - min(low_arr[i], close_arr[i - 1])
upmove = high_arr[i] - high_arr[i - 1]
downmove = low_arr[i - 1] - low_arr[i]
pdm = upmove if (upmove > downmove and upmove > 0) else 0.0
bc += 1
if bc <= period:
sm_tr += tr
sm_pdm += pdm
else:
sm_tr = sm_tr - sm_tr / period + tr
sm_pdm = sm_pdm - sm_pdm / period + pdm
dst[i] = (100.0 * sm_pdm / sm_tr) if sm_tr > 0 else 0.0
prenext = LineRoot.prenext # noqa: F811
nextstart = LineRoot.nextstart # noqa: F811
next = LineRoot.next # noqa: F811
preonce = LineRoot.preonce # noqa: F811
oncestart = LineRoot.oncestart # noqa: F811
once = LineRoot.once # noqa: F811
[docs]
class MinusDirectionalIndicator(_DirectionalIndicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength
This indicator shows -DI:
- Use PlusDirectionalIndicator (PlusDI) to get +DI
- Use Directional Indicator (DI) to get +DI, -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use AverageDirectionalIndexRating (ADXR) to get ADX, ADXR
- Use DirectionalMovementIndex (DMI) to get ADX, +DI, -DI
- Use DirectionalMovement (DM) to get ADX, ADXR, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- -di = 100 * MovingAverage(-dm, period) / atr(period)
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = (("MinusDI", "-DI"),)
lines = ("minusDI",)
plotinfo = {"plotname": "-DirectionalIndicator"}
def __init__(self):
"""Initialize the -DI indicator."""
super().__init__(_plus=False)
self.lines.minusDI = self.DIminus
self._sm_tr = 0.0
self._sm_mdm = 0.0
self._bar_count = 0
def _mdi_accumulate(self):
period = self.p.period
high = self.data.high[0]
low = self.data.low[0]
if len(self.data) <= 1:
return
try:
prev_high = self.data.high[-1]
prev_low = self.data.low[-1]
prev_close = self.data.close[-1]
except IndexError:
return
tr = max(high, prev_close) - min(low, prev_close)
upmove = high - prev_high
downmove = prev_low - low
mdm = downmove if (downmove > upmove and downmove > 0) else 0.0
self._bar_count += 1
if self._bar_count <= period:
self._sm_tr += tr
self._sm_mdm += mdm
else:
self._sm_tr = self._sm_tr - self._sm_tr / period + tr
self._sm_mdm = self._sm_mdm - self._sm_mdm / period + mdm
self.lines.minusDI[0] = (100.0 * self._sm_mdm / self._sm_tr) if self._sm_tr > 0 else 0.0
def prenext(self):
self._mdi_accumulate()
def nextstart(self):
self._mdi_accumulate()
def next(self):
self._mdi_accumulate()
def once(self, start, end):
period = self.p.period
high_arr = self.data.high.array
low_arr = self.data.low.array
close_arr = self.data.close.array
dst = self.lines.minusDI.array
while len(dst) < end:
dst.append(float("nan"))
sm_tr = 0.0
sm_mdm = 0.0
bc = 0
for i in range(1, min(end, len(high_arr), len(low_arr), len(close_arr))):
tr = max(high_arr[i], close_arr[i - 1]) - min(low_arr[i], close_arr[i - 1])
upmove = high_arr[i] - high_arr[i - 1]
downmove = low_arr[i - 1] - low_arr[i]
mdm = downmove if (downmove > upmove and downmove > 0) else 0.0
bc += 1
if bc <= period:
sm_tr += tr
sm_mdm += mdm
else:
sm_tr = sm_tr - sm_tr / period + tr
sm_mdm = sm_mdm - sm_mdm / period + mdm
dst[i] = (100.0 * sm_mdm / sm_tr) if sm_tr > 0 else 0.0
prenext = LineRoot.prenext # noqa: F811
nextstart = LineRoot.nextstart # noqa: F811
next = LineRoot.next # noqa: F811
preonce = LineRoot.preonce # noqa: F811
oncestart = LineRoot.oncestart # noqa: F811
once = LineRoot.once # noqa: F811
[docs]
class AverageDirectionalMovementIndex(Indicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength. Rewritten following MACD pattern
with explicit next()/once() methods for reliable calculation.
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
- -di = 100 * MovingAverage(-dm, period) / atr(period)
- dx = 100 * abs(+di - -di) / (+di + -di)
- adx = MovingAverage(dx, period)
"""
alias = ("ADX",)
lines = ("adx",)
params = (("period", 14), ("movav", MovAv.Smoothed))
plotlines = {"adx": {"_name": "ADX"}}
def __init__(self):
"""Initialize the ADX indicator.
Sets up ATR, DM smoothing, and state for ADX calculation.
"""
super().__init__()
period = self.p.period
atr = ATR(self.data, period=period, movav=self.p.movav)
upmove = self.data.high - self.data.high(-1)
downmove = self.data.low(-1) - self.data.low
plus = And(upmove > downmove, upmove > 0.0)
plusDM = If(plus, upmove, 0.0)
plusDMav = self.p.movav(plusDM, period=period)
self.DIplus = 100.0 * plusDMav / atr
minus = And(downmove > upmove, downmove > 0.0)
minusDM = If(minus, downmove, 0.0)
minusDMav = self.p.movav(minusDM, period=period)
self.DIminus = 100.0 * minusDMav / atr
dx = abs(self.DIplus - self.DIminus) / (self.DIplus + self.DIminus)
self.lines.adx = 100.0 * self.p.movav(dx, period=period)
# Store sub-indicators for direct array access (like MACD)
self.atr = ATR(self.data, period=period, movav=self.p.movav)
self.plusDMav = self.p.movav(period=period)
self.minusDMav = self.p.movav(period=period)
# Calculate minperiod: ATR needs period, then DI smoothing needs period, then ADX smoothing needs period
# Total: approximately 2*period for DI + period for ADX smoothing
adx_minperiod = 2 * period
self._minperiod = max(self._minperiod, adx_minperiod)
# Propagate minperiod to lines
for line in self.lines:
line.updateminperiod(self._minperiod)
# For SMMA calculation
self.alpha = 1.0 / period
self.alpha1 = 1.0 - self.alpha
# State for smoothed values
self._plusDMav_val = 0.0
self._minusDMav_val = 0.0
self._adx_val = 0.0
self._adx_bootstrapped = False
self._last_adx_idx = None
def prenext(self):
"""Track previous high/low during warmup.
Stores high and low values for directional move calculation.
"""
def _adx_atr_value(self, atr_array, index):
import math
atr_val = atr_array[index]
if atr_val == 0 or (isinstance(atr_val, float) and math.isnan(atr_val)):
return 0.0001
return atr_val
def _adx_dx(self, plus_dmav, minus_dmav, atr_val):
diplus = 100.0 * plus_dmav / atr_val
diminus = 100.0 * minus_dmav / atr_val
disum = diplus + diminus
return 100.0 * abs(diplus - diminus) / disum if disum != 0 else 0.0
def _adx_bootstrap(self, target_idx):
high_array = self.data.high.array
low_array = self.data.low.array
atr_array = self.atr.lines[0].array
period = self.p.period
seed_idx = 2 * period - 1
data_len = min(len(high_array), len(low_array), len(atr_array))
if target_idx < seed_idx or data_len <= seed_idx:
return False
dm_plus_sum = 0.0
dm_minus_sum = 0.0
for i in range(1, period + 1):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
dm_plus_sum += upmove if (upmove > downmove and upmove > 0) else 0.0
dm_minus_sum += downmove if (downmove > upmove and downmove > 0) else 0.0
plus_dmav = dm_plus_sum / period
minus_dmav = dm_minus_sum / period
dx_values = [self._adx_dx(plus_dmav, minus_dmav, self._adx_atr_value(atr_array, period))]
for i in range(period + 1, seed_idx + 1):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
plus_dm = upmove if (upmove > downmove and upmove > 0) else 0.0
minus_dm = downmove if (downmove > upmove and downmove > 0) else 0.0
plus_dmav = plus_dmav * self.alpha1 + plus_dm * self.alpha
minus_dmav = minus_dmav * self.alpha1 + minus_dm * self.alpha
dx_values.append(self._adx_dx(plus_dmav, minus_dmav, self._adx_atr_value(atr_array, i)))
self._plusDMav_val = plus_dmav
self._minusDMav_val = minus_dmav
self._adx_val = sum(dx_values[:period]) / period
self._last_adx_idx = seed_idx
self._adx_bootstrapped = True
return True
def _adx_advance_to(self, target_idx):
if not self._adx_bootstrapped and not self._adx_bootstrap(target_idx):
return False
high_array = self.data.high.array
low_array = self.data.low.array
atr_array = self.atr.lines[0].array
data_len = min(len(high_array), len(low_array), len(atr_array))
if target_idx >= data_len:
return False
for i in range(self._last_adx_idx + 1, target_idx + 1):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
plus_dm = upmove if (upmove > downmove and upmove > 0) else 0.0
minus_dm = downmove if (downmove > upmove and downmove > 0) else 0.0
self._plusDMav_val = self._plusDMav_val * self.alpha1 + plus_dm * self.alpha
self._minusDMav_val = self._minusDMav_val * self.alpha1 + minus_dm * self.alpha
dx = self._adx_dx(
self._plusDMav_val,
self._minusDMav_val,
self._adx_atr_value(atr_array, i),
)
self._adx_val = self._adx_val * self.alpha1 + dx * self.alpha
self._last_adx_idx = i
return True
def nextstart(self):
"""Seed ADX calculation on first valid bar.
Calculates initial DM, DI, DX, and ADX values.
"""
idx = self.lines[0].idx
if self._adx_advance_to(idx):
self.lines.adx[0] = self._adx_val
else:
self.lines.adx[0] = float("nan")
def next(self):
"""Calculate ADX for the current bar.
Calculates DM smoothing, DI, DX, and ADX values.
"""
idx = self.lines[0].idx
if self._adx_advance_to(idx):
self.lines.adx[0] = self._adx_val
else:
self.lines.adx[0] = float("nan")
def once(self, start, end):
"""Calculate ADX in runonce mode using proper Wilder seeding.
Implements the standard Wilder ADX algorithm:
Phase 1: Seed smoothed DM with SMA of first N DM values (bars 1..period)
Phase 2: Continue SMMA for DM, accumulate DX (bars period..2*period-1)
Phase 3: Seed ADX with SMA of first N DX values
Phase 4: Continue SMMA for all components (bars 2*period..end)
"""
import math
# Get source arrays
high_array = self.data.high.array
low_array = self.data.low.array
atr_array = self.atr.lines[0].array
adx_array = self.lines.adx.array
period = self.p.period
alpha = self.alpha
alpha1 = self.alpha1
# Ensure arrays are sized
while len(adx_array) < end:
adx_array.append(float("nan"))
data_len = min(end, len(high_array), len(low_array), len(atr_array))
# Pre-fill warmup with NaN
for i in range(min(2 * period, data_len)):
adx_array[i] = float("nan")
# Need at least 2*period + 1 bars for proper seeding
if data_len <= 2 * period:
return
# Phase 1: Calculate raw DM values for bars 1..period
# Seed smoothed DM with SMA (average) of first period values
dm_plus_sum = 0.0
dm_minus_sum = 0.0
for i in range(1, period + 1):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
plusDM = upmove if (upmove > downmove and upmove > 0) else 0.0
minusDM = downmove if (downmove > upmove and downmove > 0) else 0.0
dm_plus_sum += plusDM
dm_minus_sum += minusDM
plusDMav = dm_plus_sum / period
minusDMav = dm_minus_sum / period
# Phase 2: Calculate DI and DX for bars period..2*period-1
# Continue SMMA for DM, accumulate DX values for ADX seeding
dx_list = []
# First DX at bar period (using seeded DM averages)
atr_val = atr_array[period]
if atr_val == 0 or (isinstance(atr_val, float) and math.isnan(atr_val)):
atr_val = 0.0001
diplus = 100.0 * plusDMav / atr_val
diminus = 100.0 * minusDMav / atr_val
disum = diplus + diminus
dx = 100.0 * abs(diplus - diminus) / disum if disum != 0 else 0.0
dx_list.append(dx)
# Continue for bars period+1 to 2*period-1
for i in range(period + 1, 2 * period):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
plusDM = upmove if (upmove > downmove and upmove > 0) else 0.0
minusDM = downmove if (downmove > upmove and downmove > 0) else 0.0
plusDMav = plusDMav * alpha1 + plusDM * alpha
minusDMav = minusDMav * alpha1 + minusDM * alpha
atr_val = atr_array[i]
if atr_val == 0 or (isinstance(atr_val, float) and math.isnan(atr_val)):
atr_val = 0.0001
diplus = 100.0 * plusDMav / atr_val
diminus = 100.0 * minusDMav / atr_val
disum = diplus + diminus
dx = 100.0 * abs(diplus - diminus) / disum if disum != 0 else 0.0
dx_list.append(dx)
# Phase 3: Seed ADX with SMA of first period DX values
adx_val = sum(dx_list[:period]) / period
adx_array[2 * period - 1] = adx_val
# Phase 4: Continue SMMA for everything from 2*period onwards
for i in range(2 * period, data_len):
upmove = high_array[i] - high_array[i - 1]
downmove = low_array[i - 1] - low_array[i]
plusDM = upmove if (upmove > downmove and upmove > 0) else 0.0
minusDM = downmove if (downmove > upmove and downmove > 0) else 0.0
plusDMav = plusDMav * alpha1 + plusDM * alpha
minusDMav = minusDMav * alpha1 + minusDM * alpha
atr_val = atr_array[i]
if atr_val == 0 or (isinstance(atr_val, float) and math.isnan(atr_val)):
atr_val = 0.0001
diplus = 100.0 * plusDMav / atr_val
diminus = 100.0 * minusDMav / atr_val
disum = diplus + diminus
dx = 100.0 * abs(diplus - diminus) / disum if disum != 0 else 0.0
adx_val = adx_val * alpha1 + dx * alpha
adx_array[i] = adx_val
prenext = LineRoot.prenext # noqa: F811
nextstart = LineRoot.nextstart # noqa: F811
next = LineRoot.next # noqa: F811
preonce = LineRoot.preonce # noqa: F811
oncestart = LineRoot.oncestart # noqa: F811
once = LineRoot.once # noqa: F811
[docs]
class AverageDirectionalMovementIndexRating(AverageDirectionalMovementIndex):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength.
ADXR is the average of ADX with a value period bars ago
This indicator shows the ADX and ADXR:
- Use PlusDirectionalIndicator (PlusDI) to get +DI
- Use MinusDirectionalIndicator (MinusDI) to get -DI
- Use Directional Indicator (DI) to get +DI, -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use DirectionalMovementIndex (DMI) to get ADX, +DI, -DI
- Use DirectionalMovement (DM) to get ADX, ADXR, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
- -di = 100 * MovingAverage(-dm, period) / atr(period)
- dx = 100 * abs(+di - -di) / (+di + -di)
- adx = MovingAverage(dx, period)
- adxr = (adx + adx(-period)) / 2
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = ("ADXR",)
lines = ("adxr",)
plotlines = {"adxr": {"_name": "ADXR"}}
def __init__(self):
"""Initialize the ADXR indicator.
Extends ADX with rating line.
"""
super().__init__()
self.lines.adxr = (self.l.adx + self.l.adx(-self.p.period)) / 2.0
def next(self):
"""Calculate ADX and ADXR for the current bar.
ADXR = (ADX + ADX(-period)) / 2
"""
super().next()
self.lines.adxr[0] = (self.lines.adx[0] + self.lines.adx[-self.p.period]) / 2.0
def once(self, start, end):
"""Calculate ADXR in runonce mode.
Computes ADXR as average of current ADX and ADX from period ago.
"""
super().once(start, end)
import math
adx_array = self.lines.adx.array
adxr_array = self.lines.adxr.array
period = self.p.period
while len(adxr_array) < end:
adxr_array.append(float("nan"))
for i in range(start, min(end, len(adx_array))):
if i >= period:
adx_curr = adx_array[i] if i < len(adx_array) else 0.0
adx_prev = (
adx_array[i - period]
if i - period >= 0 and i - period < len(adx_array)
else 0.0
)
if (
isinstance(adx_curr, float)
and math.isnan(adx_curr)
or isinstance(adx_prev, float)
and math.isnan(adx_prev)
):
adxr_array[i] = float("nan")
else:
adxr_array[i] = (adx_curr + adx_prev) / 2.0
else:
adxr_array[i] = float("nan")
prenext = LineRoot.prenext # noqa: F811
nextstart = LineRoot.nextstart # noqa: F811
next = LineRoot.next # noqa: F811
preonce = LineRoot.preonce # noqa: F811
oncestart = LineRoot.oncestart # noqa: F811
once = LineRoot.once # noqa: F811
[docs]
class DirectionalMovementIndex(AverageDirectionalMovementIndex, DirectionalIndicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength
This indicator shows the ADX, +DI, -DI:
- Use PlusDirectionalIndicator (PlusDI) to get +DI
- Use MinusDirectionalIndicator (MinusDI) to get -DI
- Use Directional Indicator (DI) to get +DI, -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use AverageDirectionalIndexRating (ADXRating) to get ADX, ADXR
- Use DirectionalMovement (DM) to get ADX, ADXR, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
- -di = 100 * MovingAverage(-dm, period) / atr(period)
- dx = 100 * abs(+di - -di) / (+di + -di)
- adx = MovingAverage(dx, period)
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = ("DMI",)
[docs]
class DirectionalMovement(AverageDirectionalMovementIndexRating, DirectionalIndicator):
"""
Defined by J. Welles Wilder, Jr. in 1978 in his book *"New Concepts in
Technical Trading Systems"*.
Intended to measure trend strength
This indicator shows ADX, ADXR, +DI, -DI.
- Use PlusDirectionalIndicator (PlusDI) to get +DI
- Use MinusDirectionalIndicator (MinusDI) to get -DI
- Use Directional Indicator (DI) to get +DI, -DI
- Use AverageDirectionalIndex (ADX) to get ADX
- Use AverageDirectionalIndexRating (ADXR) to get ADX, ADXR
- Use DirectionalMovementIndex (DMI) to get ADX, +DI, -DI
Formula:
- upmove = high - high(-1)
- downmove = low(-1) - low
- +dm = upmove if upmove > downmove and upmove > 0 else 0
- -dm = downmove if downmove > upmove and downmove > 0 else 0
- +di = 100 * MovingAverage(+dm, period) / atr(period)
- -di = 100 * MovingAverage(-dm, period) / atr(period)
- dx = 100 * abs(+di - -di) / (+di + -di)
- adx = MovingAverage(dx, period)
The moving average used is the one originally defined by Wilder,
the SmoothedMovingAverage
See:
- https://en.wikipedia.org/wiki/Average_directional_movement_index
"""
alias = ("DM",)