Source code for backtrader.indicators.contrib.irsi_sign_indicator
#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.
Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""
from .. import (
ATR,
Indicator,
)
__all__ = [
"IRSISignIndicator",
]
def _applied_price(data, price_type, ago=0):
"""Resolve applied price by type and history shift."""
o = float(data.open[-ago])
h = float(data.high[-ago])
low_price = float(data.low[-ago])
c = float(data.close[-ago])
if price_type == 0:
return c
if price_type == 1:
return o
if price_type == 2:
return h
if price_type == 3:
return low_price
if price_type == 4:
return (h + low_price) / 2.0
if price_type == 5:
return (h + low_price + c) / 3.0
if price_type == 6:
return (h + low_price + c + c) / 4.0
return c
[docs]
class IRSISignIndicator(Indicator):
"""Compute RSI sign-flip levels and adaptive ATR-adjusted marker lines."""
lines = ("sell", "buy", "rsi", "atr")
params = (
("atr_period", 14),
("rsi_period", 14),
("rsi_price", 0),
("up_level", 70),
("dn_level", 30),
)
def __init__(self):
"""Initialize ATR series and minimum required history."""
self._atr = ATR(self.data, period=int(self.p.atr_period))
self.addminperiod(max(int(self.p.atr_period), int(self.p.rsi_period)) + 2)
def _calc_rsi(self):
"""Calculate RSI manually from configured applied prices."""
period = int(self.p.rsi_period)
gains = []
losses = []
for i in range(period):
p0 = _applied_price(self.data, int(self.p.rsi_price), i)
p1 = _applied_price(self.data, int(self.p.rsi_price), i + 1)
delta = p0 - p1
gains.append(max(delta, 0.0))
losses.append(max(-delta, 0.0))
avg_gain = sum(gains) / period if period else 0.0
avg_loss = sum(losses) / period if period else 0.0
if avg_loss == 0.0:
return 100.0 if avg_gain > 0.0 else 50.0
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1.0 + rs))
[docs]
def next(self):
"""Update buy/sell marker levels and RSI / ATR output lines."""
self.lines.sell[0] = float("nan")
self.lines.buy[0] = float("nan")
rsi_now = self._calc_rsi()
self.lines.rsi[0] = rsi_now
self.lines.atr[0] = float(self._atr[0])
if len(self) < 2:
return
rsi_prev = float(self.lines.rsi[-1])
atr_now = float(self._atr[0])
low_now = float(self.data.low[0])
high_now = float(self.data.high[0])
if rsi_now > float(self.p.dn_level) and rsi_prev <= float(self.p.dn_level):
self.lines.buy[0] = low_now - atr_now * 3.0 / 8.0
if rsi_now < float(self.p.up_level) and rsi_prev >= float(self.p.up_level):
self.lines.sell[0] = high_now + atr_now * 3.0 / 8.0