Source code for backtrader.indicators.contrib.bsi_indicator

#!/usr/bin/env python
"""Functional-test indicators migrated to contrib.

Generated from a single functional strategy module to preserve file-local
helper functions and constants without cross-test name collisions.
"""

from .. import Indicator

__all__ = [
    "BSIIndicator",
]


[docs] class BSIIndicator(Indicator): """Balance of power-style indicator returning BSI value and trend color.""" lines = ("bsi", "color") params = ( ("range_period", 20), ("slowing", 3), ("avg_period", 3), ("volume_mode", "TICK"), ) def __init__(self): """Set minimum period and initialize smoothing configuration.""" self.addminperiod( int(self.p.range_period) + int(self.p.slowing) + int(self.p.avg_period) + 3 ) def _component(self, ago): sumpos = 0.0 sumneg = 0.0 sumhigh = 0.0 for k in range(int(self.p.slowing)): idx = ago + k highs = [float(self.data.high[-(idx + j)]) for j in range(int(self.p.range_period))] lows = [float(self.data.low[-(idx + j)]) for j in range(int(self.p.range_period))] hh = max(highs) ll = min(lows) rng = max(hh - ll, 1e-12) bark_close = float(self.data.close[-idx]) bark_prev_close = float(self.data.close[-(idx + 1)]) bark_high = float(self.data.high[-idx]) bark_low = float(self.data.low[-idx]) sp = bark_high - bark_low if self.p.volume_mode == "NONE": vol = 1.0 elif self.p.volume_mode == "VOLUME": vmax = max( float(self.data.openinterest[-(idx + j)]) for j in range(int(self.p.range_period)) ) vol = float(self.data.openinterest[-idx]) / vmax if vmax else 0.0 else: vmax = max( float(self.data.volume[-(idx + j)]) for j in range(int(self.p.range_period)) ) vol = float(self.data.volume[-idx]) / vmax if vmax else 0.0 ratio = 0.0 if not (bark_prev_close - sp * 0.2 > bark_close): ratio = 1.0 if bark_low == ll else (hh - bark_low) / rng sumpos += (bark_close - bark_low) * ratio * vol if not (bark_prev_close + sp * 0.2 < bark_close): ratio = 1.0 if bark_high == hh else (bark_high - ll) / rng sumneg += (bark_high - bark_close) * ratio * vol * -1.0 sumhigh += rng if not sumhigh: return 0.0 return (sumpos / sumhigh * 100.0) + (sumneg / sumhigh * 100.0)
[docs] def next(self): """Compute BSI and derive color from directional BSI movement.""" vals = [self._component(i) for i in range(int(self.p.avg_period))] bsi = sum(vals) / float(int(self.p.avg_period)) self.lines.bsi[0] = bsi if len(self) < 2: self.lines.color[0] = 1.0 return prev = float(self.lines.bsi[-1]) color = 1.0 if prev > bsi: color = 0.0 if prev < bsi: color = 2.0 self.lines.color[0] = color