Source code for backtrader.linebuffer

#!/usr/bin/env python
"""LineBuffer Module - Circular buffer storage for time-series data.

This module provides the LineBuffer class which implements a circular
buffer for storing time-series data. The buffer allows efficient
operations like appending, forwarding, rewinding, and resetting.

Key Features:
    - Index 0 always points to the current active value
    - Positive indices fetch past values (left-hand side)
    - Negative indices fetch future values (right-hand side)
    - Automatic memory management with qbuffer
    - Line bindings for automatic value propagation

Classes:
    LineBuffer: Core circular buffer implementation.
    LineActions: Base class for line objects with multiple lines.
    LineActionsMixin: Mixin providing line operations.
    LineActionsCache: Cache system for performance optimization.
    PseudoArray: Wrapper for non-array iterables.
    LinesOperation: Operations on multiple lines.
    LineOwnOperation: Operations on owned lines.

Example:
    Basic buffer usage:
    >>> buf = LineBuffer()
    >>> buf.home()  # Reset to beginning
    >>> buf.forward()  # Move to next position
    >>> buf[0] = 100.0  # Set current value
    >>> print(buf[0])  # Get current value
    100.0
    >>> print(buf[-1])  # Get previous value
"""

import array
import collections
import datetime
import itertools
import math
from itertools import islice, repeat

from . import metabase
from .lineroot import LineRoot, LineRootMixin, LineSingle
from .utils import num2date
from .utils.log_message import get_logger
from .utils.py3 import range, string_types

logger = get_logger(__name__)

NAN = float("NaN")

# PERFORMANCE OPTIMIZATION: Pre-create default datetime for error recovery
# Avoids repeated datetime object creation in hot path
_DEFAULT_DATETIME = datetime.datetime(2000, 1, 1, 0, 0, 0)


# PERFORMANCE OPTIMIZATION: Helper function to check for NaN/None values
# Using value != value is much faster than isinstance + math.isnan
def _is_nan_or_none(value):
    """Fast check for NaN or None values.
    NaN is the only value that's not equal to itself (value != value).
    This is much faster than isinstance(value, float) and math.isnan(value).
    """
    return value is None or value != value


[docs] class LineBuffer(LineSingle, LineRootMixin): """ LineBuffer defines an interface to an "array.array" (or list) in which index 0 points to the item which is active for input and output. Positive indices fetch values from the past (left-hand side) Negative indices fetch values from the future (if the array has been extended on the right-hand side) With this behavior, no index has to be passed around to entities which have to work with the current value produced by other entities: the value is always reachable at "0". Likewise, storing the current value produced by "self" is done at 0. Additional operations to move the pointer (home, forward, extend, rewind, advance getzero) are provided The class can also hold "bindings" to other LineBuffers. When a value is set in this class, it will also be set in the binding. """ # Define LineBuffer mode attributes: UnBounded (0) and QBuffer (1) UnBounded, QBuffer = (0, 1) # Initialization
[docs] def __init__(self): """Initialize the LineBuffer instance. Sets up all internal attributes including the array storage, index pointer, buffer mode, and performance optimization flags. """ # ===== Optimization A: Pre-initialize all attributes to eliminate runtime hasattr checks ===== # Core attributes - must be initialized first self._minperiod = 1 # Minimum period self._array = array.array("d") # Internal array storage self._idx = -1 # Current index self._size = 0 # Current array size # Buffer-related attributes - set to reasonable defaults self.maxlen = 0 # Maximum length (used in QBuffer mode) self.extension = 0 # Extension size self.lencount = 0 # Length counter self.useislice = False # Whether to use islice self.extrasize = 0 # Extra size self.lenmark = 0 # Length mark # Array - initialize as empty array (will be reset based on mode in reset()) self.array = array.array("d") # Lines-related - ensure lines exists if not hasattr(self, "lines"): self.lines = [self] # lines is a list containing itself # Mode and bindings self.mode = self.UnBounded # Default unbounded mode self.bindings = [] # Binding list # Other attributes self._tz = None # Timezone setting self._owner = None # Owner object self._clock = None # Clock object self._ltype = None # Line type # Pre-calculate whether this is an indicator line to avoid repeated checks in hot paths try: self._is_indicator = (self._ltype == 0) or ("Indicator" in str(self.__class__.__name__)) except Exception: self._is_indicator = False # Performance optimization: pre-calculate whether this is a datetime line # to avoid repeated checks in __setitem__. Check once at init and cache the result. self._is_datetime_line = False try: if hasattr(self, "_name"): name_str = str(self._name).lower() self._is_datetime_line = "datetime" in name_str elif hasattr(self, "__class__"): class_str = str(self.__class__.__name__).lower() self._is_datetime_line = "datetime" in class_str except Exception: self._is_datetime_line = False # Pre-calculate default value to avoid repeated checks in __setitem__ if self._is_datetime_line: self._default_value = 1.0 # datetime lines use 1.0 (valid ordinal value) elif self._is_indicator: self._default_value = float("nan") # indicators use NaN else: self._default_value = 0.0 # others use 0.0 # Recursion guard (for __len__) self._in_len = False # Instance attribute guard replacing global set self._dt_cache_idx = None self._dt_cache_value = None self._dt_cache_tz = None self._dt_cache_dt = None # Call reset to complete initialization self.reset() # Reset, call own reset method
# Get the value of _idx
[docs] def get_idx(self): """Get the current index position. Returns: int: The current index in the buffer. """ # Optimization A: Removed hasattr check, __init__ ensures _idx exists return self._idx
def _refresh_cached_line_flags(self, owner=None, ltype=None): """Refresh cached owner/type-derived flags after a line is attached.""" if owner is not None: self._owner = owner if ltype is not None: self._ltype = ltype effective_ltype = getattr(self, "_ltype", None) owner_obj = getattr(self, "_owner", None) owner_ref = getattr(owner_obj, "_owner_ref", None) if effective_ltype is None and owner_ref is not None: effective_ltype = getattr(owner_ref, "_ltype", None) if effective_ltype is None and owner_obj is not None: effective_ltype = getattr(owner_obj, "_ltype", None) try: self._is_indicator = (effective_ltype == LineRoot.IndType) or ( "Indicator" in str(self.__class__.__name__) ) except Exception: self._is_indicator = False try: if hasattr(self, "_name"): name_str = str(self._name).lower() self._is_datetime_line = "datetime" in name_str else: class_str = str(self.__class__.__name__).lower() self._is_datetime_line = "datetime" in class_str except Exception: self._is_datetime_line = False if self._is_datetime_line: self._default_value = 1.0 elif self._is_indicator: self._default_value = float("nan") else: self._default_value = 0.0 # Set the value of _idx
[docs] def set_idx(self, idx, force=False): """Set the index position. Args: idx: The new index value. force: If True, force set even in QBuffer mode at lenmark. Note: In QBuffer mode, when at lenmark, the index stays at 0 unless force is True. This allows resampling operations. """ # If QBuffer and the last position of the buffer were reached, keep # it (unless force) as index 0. This allows resampling # - forward adds a position. However, the 1st one is discarded, the 0 is # invariant # force supports replaying, which needs the extra bar to float # forward/backwards, because the last input is read, and after a # "backwards" is used to update the previous data. Unless position # 0 was moved to the previous index, it would fail # Optimization A: Removed all hasattr checks, __init__ ensures all attributes exist if self.mode == self.QBuffer: if force or self._idx < self.lenmark: self._idx = idx else: # default: UnBounded self._idx = idx
# Property usage: can be used to get and set idx idx = property(get_idx, set_idx) # Reset
[docs] def reset(self): """Resets the internal buffer structure and the indices""" # CRITICAL FIX: In runonce mode, if array is already populated (from _once()), # preserve the array and lencount, only reset idx # Check if we're in runonce mode and array is populated preserve_array = False try: # Check if this is an indicator line that was processed in runonce mode # Line's _owner might be a Lines object, which has _owner pointing to the indicator if hasattr(self, "_owner") and self._owner is not None: owner = self._owner # Check if owner is a Lines object (which wraps lines for indicators) # Lines objects have _owner pointing to the actual indicator if hasattr(owner, "_owner") and owner._owner is not None: indicator = owner._owner # Check if indicator was processed in runonce mode if hasattr(indicator, "_once_called") and indicator._once_called: # Check if array has data if hasattr(self, "array") and self.array is not None: array_len = len(self.array) if array_len > 0: preserve_array = True # Also check if owner itself is an indicator elif hasattr(owner, "_once_called") and owner._once_called: # Check if array has data if hasattr(self, "array") and self.array is not None: array_len = len(self.array) if array_len > 0: preserve_array = True except Exception as e: logger.debug("Failed to check runonce array preservation: %s", e) if preserve_array: # In runonce mode with populated arrays, preserve the precomputed # values but restart logical length so Cerebro's event replay can # advance indicators according to their own clocks. self.idx = -1 if hasattr(self, "lencount"): self.lencount = 0 self.extension = 0 else: # Normal reset: clear array and reset all counters # Optimization A: Removed hasattr checks, all attributes initialized in __init__ # If in cache mode (QBuffer), use deque to store data with fixed size if self.mode == self.QBuffer: # Add extrasize to ensure resample/replay work deque_maxlen = max(1, self.maxlen + self.extrasize) self.array = collections.deque(maxlen=deque_maxlen) self.useislice = True else: # Non-cache mode, use array.array self.array = array.array("d") self.useislice = False # CRITICAL FIX: Do NOT pre-fill array - this causes buflen() to be incorrect # buflen() = len(array) - extension, so pre-filling increases buflen incorrectly # Instead, let forward() handle array growth naturally # Reset counters and indices self.lencount = 0 self.idx = -1 self.extension = 0
# Set cache-related variables
[docs] def qbuffer(self, savemem=0, extrasize=0): """Enable queued buffer mode for memory-efficient storage. Args: savemem: Memory saving mode (0=normal, >0=enable cache mode). extrasize: Extra buffer size for resampling/replay operations. Note: In QBuffer mode, only the last maxlen values are kept, reducing memory usage for long backtests. """ self.mode = self.QBuffer # Set specific mode self.maxlen = max(1, self._minperiod) # Set maximum length, ensure at least 1 self.extrasize = max(0, extrasize) # Set extra size, ensure non-negative self.lenmark = self.maxlen - (not self.extrasize) # Max length minus 1 if extrasize=0 self.reset() # Reset
# Get indicator values
[docs] def getindicators(self): """Get list of indicators using this line buffer. Returns: list: Empty list for base LineBuffer (override in subclasses). """ return []
# Minimum buffer
[docs] def minbuffer(self, size): """The linebuffer must guarantee the minimum requested size to be available. In non-dqbuffer mode, this is always true (of course, until data is filled at the beginning, there are fewer values, but minperiod in the framework should account for this. In dqbuffer mode, the buffer has to be adjusted for this if currently less than requested """ # If not in cache mode or max length is already >= size, return None if self.mode != self.QBuffer or self.maxlen >= size: return # In cache mode, set maxlen equal to size self.maxlen = size # Max length minus 1 if self.extrasize=0 self.lenmark = self.maxlen - (not self.extrasize) # Reset self.reset()
# Return actual length
[docs] def __len__(self): """ Return the linebuffer's length counter. Performance optimization: Restore master branch's simple implementation - Directly return self.lencount (pre-calculated length value) - Remove all recursion checks, hasattr calls and complex logic - Performance improvement: from 0.611s to ~0.05s (92% improvement) """ return self.lencount
# Return the length of data in the line cache
[docs] def buflen(self): """Real data that can be currently held in the internal buffer The internal buffer can be longer than the actual stored data to allow for "lookahead" operations. The real amount of data that is held/can be held in the buffer is returned """ return len(self.array) - self.extension
[docs] def __getitem__(self, ago): """ Get the value at a specified offset - optimized for hot path. Args: ago (int): Relative offset from current index (0=current, -1=previous, 1=next) Returns: Value at the specified position """ # PERFORMANCE OPTIMIZATION: Fast path for common case (ago <= 0) # Avoid __dict__ access for majority of calls try: current_idx = self._idx lencount = self.lencount if lencount > 0 and current_idx >= lencount: current_idx = lencount - 1 value = self.array[current_idx + ago] if isinstance(value, float) and not math.isfinite(value) and value == value: return 0.0 return value except IndexError: # Index out of buffer range; fall through to the slow-path handling. pass # Slow path: handle special cases # CRITICAL FIX: For data feed lines accessing FUTURE data is_data_feed_line = getattr(self, "_is_data_feed_line", False) if is_data_feed_line and ago > 0: target_idx = self._idx + ago if target_idx >= len(self.array) or self.array[target_idx] == 0.0: raise IndexError("array index out of range") # Check the simple flag for data feed line if is_data_feed_line: raise IndexError("array index out of range") # For indicators and other cases, return appropriate default if getattr(self, "_is_indicator", False): return float("nan") return 0.0
# Get data values, widely used in strategies
[docs] def get(self, ago=0, size=1): """Returns a slice of the array relative to *ago* Keyword Args: ago (int): Point of the array to which size will be added to return the slice size(int): size of the slice to return, can be positive or negative If size is positive *ago* will mark the end of the iterable and vice versa if size is negative Returns: A slice of the underlying buffer """ # Whether to use islice, use following syntax if true if self.useislice: start = self._idx + ago - size + 1 end = self._idx + ago + 1 values = list(islice(self.array, start, end)) else: # If not using islice, directly slice the array values = list(self.array[self._idx + ago - size + 1 : self._idx + ago + 1]) return array.array( "d", ( ( 0.0 if isinstance(value, float) and not math.isfinite(value) and value == value else value ) for value in values ), )
# Return the value at the actual index 0 of the array
[docs] def getzeroval(self, idx=0): """Returns a single value of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return Returns: A slice of the underlying buffer """ value = self.array[idx] if isinstance(value, float) and not math.isfinite(value) and value == value: return 0.0 return value
# Return data of size starting from idx in the array
[docs] def getzero(self, idx=0, size=1): """Returns a slice of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return Returns: A slice of the underlying buffer """ if self.useislice: values = list(islice(self.array, idx, idx + size)) else: values = list(self.array[idx : idx + size]) return array.array( "d", ( ( 0.0 if isinstance(value, float) and not math.isfinite(value) and value == value else value ) for value in values ), )
# Set values to the array
[docs] def __setitem__(self, ago, value): """Sets a value at position "ago" and executes any associated bindings Keyword Args: ago (int): Point of the array to which size will be added to return the slice value (variable): value to be set Performance optimization: Use pre-calculated flags to avoid repeated hasattr and string operations """ # Performance optimization: Use try-except instead of hasattr to check array existence # array is already initialized in __init__, this is just a defensive check try: array = self.array except AttributeError: import array as array_module array = array_module.array("d") self.array = array # Performance optimization: Use pre-calculated flags and default values # Handle None/NaN values - use fast path for checking if value is None: value = self._default_value # PERFORMANCE OPTIMIZATION: Use value != value for NaN check. # Preserve explicit NaN writes for non-datetime lines. Data feeds often # use NaN as a sparse signal sentinel; converting it to 0.0 turns # "no signal" into a finite tradable value. elif value != value: # NaN detection without isinstance + isnan value = self._default_value if self._is_datetime_line else float("nan") elif isinstance(value, float) and not math.isfinite(value): value = self._default_value # datetime line value validation elif self._is_datetime_line and value < 1.0: value = 1.0 elif self._is_datetime_line: # For non-numeric datetime line values, convert to 1.0 try: float_value = float(value) value = 1.0 if float_value < 1.0 else float_value except (TypeError, ValueError): value = 1.0 # Calculate the required index required_index = self._idx + ago # Handle index out of bounds - fast path array_len = len(array) if required_index >= array_len: # Performance optimization: Use pre-calculated default value as fill value fill_value = self._default_value extend_size = required_index - array_len + 1 # Batch extend the array for _ in range(extend_size): array.append(fill_value) elif required_index < 0: # Skip setting values for negative indices return # Set the value at the required index array[required_index] = value # Update any bindings - only execute if bindings exist # Performance optimization: bindings are empty in most cases, check before processing if self.bindings: for binding in self.bindings: # Performance optimization: Use try-except to get binding's datetime flag # Most bindings are not datetime lines, fast path try: binding_is_datetime = binding._is_datetime_line except AttributeError: # Binding doesn't have pre-calculated flag, fall back to simple check binding_is_datetime = False binding_value = value if binding_is_datetime and ( not isinstance(binding_value, (int, float)) or binding_value < 1.0 ): binding_value = 1.0 binding[ago] = binding_value
# Set specific value to array
[docs] def set(self, value, ago=0): """Sets a value at position "ago" and executes any associated bindings Keyword Args: value (variable): value to be set ago (int): Point of the array to which size will be added to return the slice PERF: Uses pre-calculated _is_datetime_line and _default_value flags instead of hasattr/isinstance checks on every call. """ # PERF: Use pre-calculated flag instead of hasattr + string ops is_dt = self._is_datetime_line # Handle None/NaN values using fast detection if value is None or value != value or isinstance(value, float) and not math.isfinite(value): value = self._default_value elif is_dt and (not isinstance(value, (int, float)) or value < 1.0): value = 1.0 # Array is always initialized in __init__, use direct access arr = self.array required_index = self._idx + ago arr_len = len(arr) if required_index >= arr_len: fill_value = self._default_value for _ in range(required_index - arr_len + 1): arr.append(fill_value) elif required_index < 0: return arr[required_index] = value if self.bindings: for binding in self.bindings: try: b_is_dt = binding._is_datetime_line except AttributeError: b_is_dt = False b_val = value if b_is_dt and (not isinstance(b_val, (int, float)) or b_val < 1.0): b_val = 1.0 binding[ago] = b_val
# Return to the beginning
[docs] def home(self): """Rewinds the logical index to the beginning The underlying buffer remains untouched and the actual len can be found out with buflen """ self.idx = -1 self.lencount = 0
# Move forward one step
[docs] def forward(self, value=NAN, size=1): """Moves the logical index forward and enlarges the buffer as much as needed Keyword Args: value (variable): value to be set in new positions size (int): How many extra positions to enlarge the buffer """ # PERFORMANCE OPTIMIZATION: Direct attribute access (faster than __dict__.get) # Attributes are guaranteed to exist after __init__ is_indicator = self._is_indicator # PERFORMANCE OPTIMIZATION: Use value != value for NaN check # NaN is the only value that's not equal to itself if value is None or value != value or isinstance(value, float) and not math.isfinite(value): value = self._default_value # For non-indicators, follow clock synchronization if not is_indicator: clock = self._clock if clock is not None: try: clock_len = len(clock) current_len = self.lencount if current_len >= clock_len: return max_advance = clock_len - current_len if size > max_advance: size = max_advance if size <= 0: return except Exception as e: logger.debug("Failed to check clock length in forward: %s", e) # CRITICAL FIX: Ensure we have a valid size if size <= 0: return if self.mode == self.QBuffer: self.idx = self._idx + size else: self._idx += size self.lencount += size append_val = value if size == 1: self.array.append(append_val) else: # Batch extend for multiple positions self.array.extend([append_val] * size)
# Move backward one step
[docs] def backwards(self, size=1, force=False): """Moves the logical index backwards and reduces the buffer as much as needed Keyword Args: size (int): How many extra positions to rewind the buffer force (bool): Whether to force the reduction of the logical buffer regardless of the minperiod """ # CRITICAL FIX: Match master behavior - use set_idx for force support and pop array elements new_idx = self._idx - size if self.mode == self.QBuffer: self.set_idx(new_idx, force=force) else: self._idx = new_idx self.lencount -= size # PERFORMANCE OPTIMIZATION: Use slice deletion instead of loop pop # Called 3.4M+ times, batch deletion is faster than loop arr = self.array arr_len = len(arr) if arr_len > 0: remove_count = min(size, arr_len) try: del arr[arr_len - remove_count :] except TypeError: # qbuffer/exactbars uses deque, which does not support slice # deletion. Remove newest values explicitly in that mode. for _ in range(remove_count): arr.pop()
# Move backward one step (original backwards was overridden)
[docs] def safe_backwards(self, size=1): """Safely move the index backwards without raising errors. Args: size: Number of positions to move backwards. Returns: bool: True if index is still >= 0 after moving, False otherwise. """ # PERF: _idx is always initialized in __init__, skip hasattr idx = self._idx if idx is None: self._idx = -1 return False self._idx = idx - size return self._idx >= 0
# Decrease idx and lencount by size
[docs] def rewind(self, size=1): """Rewind the buffer by decreasing idx and lencount. Args: size: Number of positions to rewind. """ # PERF: idx and lencount are always initialized in __init__ if self.mode == self.QBuffer: self.idx = self._idx - size else: self._idx -= size self.lencount -= size
# Increase idx and lencount by size
[docs] def advance(self, size=1): """Advances the logical index without touching the underlying buffer""" # CRITICAL FIX: Remove hasattr checks - attributes are always initialized in __init__ # The hasattr checks were preventing proper advancement if self.mode == self.QBuffer: self.idx = self._idx + size else: self._idx += size self.lencount += size
# Extend forward
[docs] def extend(self, value=float("nan"), size=0): """Extends the underlying array with positions that the index will not reach Keyword Args: value (variable): value to be set in new positins size (int): How many extra positions to enlarge the buffer The purpose is to allow for lookahead operations or to be able to set values in the buffer "future" """ if value is None or value != value or isinstance(value, float) and not math.isfinite(value): value = self._default_value self.extension += size for i in range(size): self.array.append(value)
# Add another LineBuffer
[docs] def addbinding(self, binding): """Adds another line binding Keyword Args: binding (LineBuffer): another line that must be set when this line becomes a value """ self.bindings.append(binding) # record in the binding when the period is starting (never sooner # than self) binding.updateminperiod(self._minperiod)
# Get all data starting from idx
[docs] def plot(self, idx=0, size=None): """Returns a slice of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return This is a variant of getzero that unless told otherwise returns the entire buffer, which is usually the idea behind plottint (all must be plotted) Returns: A slice of the underlying buffer """ return self.getzero(idx, size or len(self))
# Get partial data from array
[docs] def plotrange(self, start, end): """Get a slice of data from the array. Args: start: Start index of the slice. end: End index of the slice. Returns: list or array: Slice of data from start to end. """ if self.useislice: values = list(islice(self.array, start, end)) else: values = list(self.array[start:end]) return [ ( 0.0 if isinstance(value, float) and not math.isfinite(value) and value == value else value ) for value in values ]
# Set array values for each binding when running in once mode
[docs] def oncebinding(self): """ Executes the bindings when running in "once" mode """ larray = self.array blen = self.buflen() for binding in self.bindings: binding.array[0:blen] = larray[0:blen]
# Convert binding to line
[docs] def bind2lines(self, binding=0): """ Stores a binding to another line. "Binding" can be an index or a name """ if isinstance(binding, string_types): line = getattr(self._owner.lines, binding) else: line = self._owner.lines[binding] self.addbinding(line) return self
bind2line = bind2lines
[docs] def __call__(self, ago=None): """Returns either the current value (ago=None) or a delayed LineBuffer that fetches the value which is "ago" periods before. Useful to have the closing price 5 bars before: close(-5) """ if ago is None: return self[0] return LineDelay(self, ago)
def _makeoperation(self, other, operation, r=False, _ownerskip=None, original_other=None): # Only set parent_a/parent_b to LineActions instances (LinesOperation, _LineDelay, etc.). # Full indicators (ATR, SMA, SuperTrend, etc.) are processed separately by _lineiterators # ordering in _once(), so they must never be called via _parent_a.once() which would # trigger premature once_via_next() calls that corrupt the data feed index state. parent_a = None if hasattr(self, "_owner") and self._owner is not None: owner = self._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: ref = owner._owner_ref if isinstance(ref, LineActions): parent_a = ref elif isinstance(owner, LineActions): parent_a = owner parent_b_candidate = original_other if original_other is not None else other parent_b = parent_b_candidate if isinstance(parent_b_candidate, LineActions) else None return LinesOperation(self, other, operation, r=r, parent_a=parent_a, parent_b=parent_b) def _makeoperationown(self, operation, _ownerskip=None): parent_a = None if hasattr(self, "_owner") and self._owner is not None: owner = self._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: ref = owner._owner_ref if isinstance(ref, LineActions): parent_a = ref elif isinstance(owner, LineActions): parent_a = owner return LineOwnOperation(self, operation, parent_a=parent_a) def _settz(self, tz): self._tz = tz
[docs] def datetime(self, ago=0, tz=None, naive=True): """Get the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive datetime without timezone info. Returns: datetime: Datetime object representing the timestamp. Raises: IndexError: If the requested position is out of bounds for data feeds. """ # PERFORMANCE OPTIMIZATION: Simplified datetime() method # - Use module-level _DEFAULT_DATETIME constant # - Remove redundant import statements # - Reduce nested try-except blocks # Get value, may raise IndexError for data feeds value = self[ago] # Fast path: Check for NaN/None values if _is_nan_or_none(value): return _DEFAULT_DATETIME if naive else _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz) # Fast path: Common case (ago=0, tz=None, naive=True) with caching if ago == 0 and tz is None and naive: current_idx = self._idx if self.lencount > 0 and current_idx >= self.lencount: current_idx = self.lencount - 1 # Check cache if ( getattr(self, "_dt_cache_idx", None) == current_idx and getattr(self, "_dt_cache_tz", None) is self._tz and getattr(self, "_dt_cache_value", None) == value ): return self._dt_cache_dt # Convert and cache try: dt = num2date(value, self._tz, True) self._dt_cache_idx = current_idx self._dt_cache_tz = self._tz self._dt_cache_value = value self._dt_cache_dt = dt return dt except (ValueError, OverflowError): return _DEFAULT_DATETIME # Slow path: non-default parameters try: return num2date(value, tz or self._tz, naive) except (ValueError, OverflowError): return _DEFAULT_DATETIME if naive else _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz)
[docs] def date(self, ago=0, tz=None, naive=True): """Get the date component of the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive date without timezone info. Returns: date: Date object representing the date portion of the timestamp. Raises: IndexError: If the requested position is out of bounds for data feeds. """ # CRITICAL FIX: date() calls datetime(), which should raise IndexError if out of range # This allows strategy to detect end of data for next_month calculation try: dt = self.datetime(ago, tz, naive) except IndexError: # Re-raise IndexError to allow strategy to detect end of data raise if dt is None: return None try: return dt.date() except (AttributeError, ValueError): return None
[docs] def time(self, ago=0, tz=None, naive=True): """Get the time component of the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive time without timezone info. Returns: time: Time object representing the time portion of the timestamp. """ dt = self.datetime(ago, tz, naive) if dt is None: return None try: return dt.time() except (AttributeError, ValueError): return None
[docs] def dt(self, ago=0): """Alias to avoid the extra chars in "datetime" for this field""" return self.datetime(ago)
[docs] def tm_raw(self, ago=0): """ Returns a localtime/gmtime like time.struct_time object which is compatible with strftime formatting. The time zone of the struct_time is naive """ return self.datetime(ago, naive=False).timetuple()
[docs] def tm(self, ago=0): """ Returns a localtime/gmtime like time.struct_time object which is compatible with strftime formatting. The time zone of the struct_time is naive """ return self.datetime(ago, naive=True).timetuple()
[docs] def tm_lt(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is lower than the time carried by the "other" line """ return self[ago] < other[0]
[docs] def tm_le(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is lower than or equal to the time carried by the "other" line """ return self[ago] <= other[0]
[docs] def tm_eq(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is equal to the time carried by the "other" line """ return self[ago] == other[0]
[docs] def tm_gt(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is greater than the time carried by the "other" line """ return self[ago] > other[0]
[docs] def tm_ge(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is greater than or equal to the time carried by the "other" line """ return self[ago] >= other[0]
[docs] def tm2dtime(self, tm, ago=0): """ Returns the passed tm (time.struct_time) in a datetime using the timezone (if any) of the line """ return datetime.datetime(*tm[:6])
[docs] def tm2datetime(self, tm, ago=0): """ Returns the passed tm (time.struct_time) in a datetime using the timezone (if any) of the line """ return datetime.datetime(*tm[:6])
# LineActions cache for performance
[docs] class LineActionsCache: """Cache system for LineActions to avoid repetitive calculations""" _cache: dict = {} _cache_enabled = False
[docs] @classmethod def enable_cache(cls, enable=True): """Enable or disable the cache. Args: enable: True to enable caching, False to disable. """ cls._cache_enabled = enable
[docs] @classmethod def clear_cache(cls): """Clear all cached values.""" cls._cache.clear()
[docs] @classmethod def get_cache_key(cls, *args): """Generate cache key from arguments""" return hash(tuple(id(arg) if hasattr(arg, "__hash__") else str(arg) for arg in args))
[docs] class LineActionsMixin: """Mixin to provide LineActions functionality without metaclass"""
[docs] @classmethod def dopreinit(cls, _obj, *args, **kwargs): """Pre-initialization processing for LineActions""" # CRITICAL FIX: Set lines._owner BEFORE any user __init__ code runs # This is needed for line bindings like: self.lines.crossover = upcross - downcross if hasattr(_obj, "lines") and _obj.lines is not None: if not hasattr(_obj.lines, "_owner") or _obj.lines._owner is None: _obj.lines._owner = _obj # Set up clock from explicit line arguments first, matching the # original LineActions metaclass semantics. This is critical for # operations built on secondary data feeds: the operation must follow # the line it was created from, not the strategy's primary data clock. _obj._clock = None _obj._datas = [arg for arg in args if isinstance(arg, LineRoot)] if _obj._datas: data_clock = getattr(_obj._datas[0], "_clock", None) if data_clock is not None and data_clock.__class__.__name__ != "MinimalClock": _obj._clock = data_clock else: _obj._clock = _obj._datas[0] if _obj._clock is None and hasattr(_obj, "_owner") and _obj._owner is not None: # Try to get clock from owner first if hasattr(_obj._owner, "_clock") and _obj._owner._clock is not None: _obj._clock = _obj._owner._clock # If owner has datas, use the first data as clock elif hasattr(_obj._owner, "datas") and _obj._owner.datas: _obj._clock = _obj._owner.datas[0] # If owner has data attribute, use it as clock elif hasattr(_obj._owner, "data") and _obj._owner.data is not None: _obj._clock = _obj._owner.data # Try the owner itself as clock if it has __len__ elif hasattr(_obj._owner, "__len__"): _obj._clock = _obj._owner # If still no clock found and we have datas, use the first data if _obj._clock is None and hasattr(_obj, "datas") and _obj.datas: _obj._clock = _obj.datas[0] # CRITICAL FIX: Only initialize minperiod to 1 if not already set from data sources # The _minperiod might have been set in __new__ from data sources for nested indicators # (e.g., EMA applied to another indicator's output) if not hasattr(_obj, "_minperiod") or _obj._minperiod is None: _obj._minperiod = 1 # CRITICAL FIX: Calculate minperiod from args (like original metaclass did) # This ensures that indicators applied to other indicators inherit their minperiod from .lineroot import LineMultiple, LineSingle _minperiods = [] # Collect minperiods from LineSingle args for arg in args: if isinstance(arg, LineSingle): _minperiods.append(getattr(arg, "_minperiod", 1)) # Collect minperiods from LineMultiple args (get their first line) for arg in args: if isinstance(arg, LineMultiple) and hasattr(arg, "lines") and arg.lines: try: first_line = arg.lines[0] _minperiods.append(getattr(first_line, "_minperiod", 1)) except (IndexError, TypeError): # Empty/non-indexable lines container; skip this arg. pass # Update minperiod with max from args if _minperiods: _minperiod = max(_minperiods) _obj.updateminperiod(_minperiod) return _obj, args, kwargs
[docs] @classmethod def dopostinit(cls, _obj, *args, **kwargs): """Post-initialization processing for LineActions"""
# NOTE: Indicator registration is now handled in lineiterator.py dopostinit # with proper duplicate checking. No registration needed here.
[docs] class PseudoArray: """Wrapper for non-array iterables to provide array-like access. This class wraps iterables (including itertools.repeat) and provides array-like indexing access. It handles cases where the wrapped object doesn't support direct indexing. Attributes: wrapped: The wrapped iterable object. _minperiod: Minimum period inherited from the wrapped object. Example: >>> from itertools import repeat >>> pseudo = PseudoArray(repeat(1.0)) >>> print(pseudo[0]) 1.0 """
[docs] def __init__(self, wrapped): """Initialize PseudoArray with a wrapped iterable. Args: wrapped: The iterable object to wrap. """ self.wrapped = wrapped # CRITICAL FIX: Ensure PseudoArray has _minperiod attribute self._minperiod = getattr(wrapped, "_minperiod", 1)
def __getitem__(self, key): try: # Try normal indexing first return self.wrapped[key] except (TypeError, IndexError, AttributeError): # Handle itertools.repeat objects and other iterables that don't support indexing if hasattr(self.wrapped, "__iter__"): # For repeat objects, all values are the same, so just get the first one try: # Convert to list if it's a repeat object if str(type(self.wrapped)) == "<class 'itertools.repeat'>": # For repeat, all values are the same return next(iter(self.wrapped)) # Convert iterable to list and index wrapped_list = list(self.wrapped) return wrapped_list[key] except (StopIteration, IndexError): return float("nan") else: # If not iterable, return the wrapped object itself for index 0 if key == 0: return self.wrapped return float("nan") @property def array(self): """Get the array representation of the wrapped object. Returns: list or array: Array representation of the wrapped object. """ # Handle repeat objects specially if str(type(self.wrapped)) == "<class 'itertools.repeat'>": # For repeat objects, return a list with one element repeated return [next(iter(self.wrapped))] if hasattr(self.wrapped, "array"): return self.wrapped.array if not hasattr(self.wrapped, "__iter__"): return [] return self.wrapped
[docs] class LineActions(LineBuffer, LineActionsMixin, metabase.ParamsMixin): """ Base class for *Line Clases* with different lines, derived from a LineBuffer """ _ltype = LineRoot.IndType # Add plotlines attribute for plotting support plotlines = object()
[docs] def __new__(cls, *args, **kwargs): """Handle data processing for indicators and other LineActions objects""" # Create the instance using the normal Python object creation instance = super().__new__(cls) # Initialize basic attributes import collections instance._lineiterators = collections.defaultdict(list) instance._lineaction_init_args = args # CRITICAL FIX: Define mindatas before using it mindatas = getattr(cls, "_mindatas", getattr(cls, "mindatas", 1)) # Set up parameters for this instance (needed for self.p.period etc.) if hasattr(cls, "_params") and cls._params is not None: params_cls = cls._params # Create parameter instance for this object instance.p = params_cls() # Update with kwargs for key, value in kwargs.items(): if hasattr(instance.p, key): setattr(instance.p, key, value) else: # Fallback to empty parameter object from .utils import DotDict instance.p = DotDict(**kwargs) # Create and set up Lines instance lines_cls = getattr(cls, "lines", None) if lines_cls is not None: instance.lines = lines_cls() # CRITICAL FIX: Set lines._owner immediately after creating lines instance # Use object.__setattr__ to directly set _owner_ref (bypasses Lines.__setattr__) object.__setattr__(instance.lines, "_owner_ref", instance) # Ensure lines are properly initialized with their own buffers if hasattr(instance.lines, "_obj"): instance.lines._obj = instance # CRITICAL FIX: Ensure lines instance has the essential methods # If the lines instance doesn't have advance method, add it if not hasattr(instance.lines, "advance"): def advance_method(size=1): """Forward all lines in the collection""" for line in getattr(instance.lines, "lines", []): if hasattr(line, "advance"): line.advance(size=size) instance.lines.advance = advance_method # CRITICAL FIX: Set up line references for indicators # Each line should be a separate LineBuffer with its own array if hasattr(instance.lines, "lines") and instance.lines.lines: # Ensure each line is a LineBuffer with its own array for i, line_obj in enumerate(instance.lines.lines): if not isinstance(line_obj, LineBuffer): # Create a new LineBuffer for this line - no import needed, we're in linebuffer.py new_line = LineBuffer() # Copy any existing attributes if hasattr(line_obj, "__dict__"): new_line.__dict__.update(line_obj.__dict__) instance.lines.lines[i] = new_line line_obj = new_line # Ensure the line has its own array if not hasattr(line_obj, "array") or not line_obj.array: import array line_obj.array = array.array("d") line_obj._idx = -1 line_obj.lencount = 0 line_obj._refresh_cached_line_flags( owner=instance.lines, ltype=getattr(instance, "_ltype", cls._ltype), ) # Set up convenience references - first line as .line instance.line = instance.lines.lines[0] if instance.lines.lines else instance instance.l = instance.lines # Common shorthand else: # No individual lines, use the instance itself instance.line = instance instance.l = instance.lines else: # Create default lines using the proper Lines class from .lineseries import Lines instance.lines = Lines() # Add the advance method if it doesn't exist if not hasattr(instance.lines, "advance"): def advance_method(size=1): """Forward all lines in the collection""" for line in getattr(instance.lines, "lines", []): if hasattr(line, "advance"): line.advance(size=size) instance.lines.advance = advance_method instance.line = instance instance.l = instance.lines # CRITICAL FIX: Auto-assign data from owner if no data provided and mindatas > 0 if mindatas > 0: # Try to get owner and auto-assign data using multiple strategies from . import metabase owner = None # Strategy 1: Use nearest LineIterator owner. Falling back directly # to Strategy can skip an enclosing indicator and bind expression # clocks to the primary strategy data. try: from .lineiterator import LineIterator except ImportError: LineIterator = None if LineIterator is not None: owner = metabase.findowner(instance, LineIterator) # Strategy 2: Use findowner for Strategy try: from .strategy import Strategy except ImportError: Strategy = None if owner is None and Strategy is not None: owner = metabase.findowner(instance, Strategy) # If we found an owner with data, auto-assign it if owner is not None and hasattr(owner, "data") and owner.data is not None: # Check if we already have data in args data_count = 0 for arg in args: if ( isinstance(arg, LineRoot) or hasattr(arg, "lines") or hasattr(arg, "_name") or str(type(arg).__name__).endswith("Data") ): data_count += 1 # If we need more data sources than we have, auto-assign from owner if data_count < mindatas: # Add owner's data as needed missing_data_count = mindatas - data_count for _ in range(missing_data_count): args = (owner.data,) + args # Process arguments to identify data sources data_count = 0 processed_datas = [] for i, arg in enumerate(args): if ( isinstance(arg, LineRoot) or hasattr(arg, "lines") or hasattr(arg, "_name") or str(type(arg).__name__).endswith("Data") ): processed_datas.append(arg) data_count += 1 if data_count >= mindatas: break instance.datas = processed_datas if processed_datas: instance.data = processed_datas[0] else: instance.data = None # Set up dnames if available try: from .utils import DotDict instance.dnames = DotDict( [(d._name, d) for d in instance.datas if getattr(d, "_name", "")] ) except Exception: instance.dnames = {} return instance
def __init__(self, *args, **kwargs): """Initialize LineActions instance. Sets up lines, owner references, data sources, and clock. This is a complex initialization that handles multiple scenarios including indicators, strategies, and data feeds. Args: *args: Positional arguments including data feeds. **kwargs: Keyword arguments for parameters. """ # CRITICAL FIX: Set lines._owner FIRST, before any other initialization # This ensures line bindings in user's __init__ can find the owner if hasattr(self, "lines") and self.lines is not None: # If lines is still a class, create an instance first if isinstance(self.lines, type): self.lines = self.lines() # Now set owner using object.__setattr__ to directly set _owner_ref if self.lines is not None: object.__setattr__(self.lines, "_owner_ref", self) # Set up _owner from call stack BEFORE calling dopreinit from . import metabase # Try to find any LineIterator-like owner # Try findowner first with different classes self._owner = None # First try to find the nearest LineIterator. For LineActions created # inside an indicator this keeps ownership/clock fallback local to that # indicator instead of jumping to the enclosing strategy. try: from .lineiterator import LineIterator self._owner = metabase.findowner(self, LineIterator) except Exception as e: logger.debug("Failed to find LineIterator owner: %s", e) # If no LineIterator found, try Strategy specifically try: from .strategy import Strategy if self._owner is None: self._owner = metabase.findowner(self, Strategy) except Exception as e: logger.debug("Failed to find Strategy owner: %s", e) # If still no owner, try a broader search # findowner() uses OwnerContext for owner lookup if self._owner is None: self._owner = metabase.findowner(self, None) init_args = args or getattr(self, "_lineaction_init_args", ()) # Call pre-init self.__class__.dopreinit(self, *init_args, **kwargs) # Call parent init super().__init__() # LineBuffer.__init__ initializes low-level buffer fields and resets # _clock/_owner metadata. Re-apply the LineActions pre-init metadata # afterwards so explicit line operands keep their own data clock. self.__class__.dopreinit(self, *init_args, **kwargs) self._refresh_cached_line_flags( owner=getattr(self, "_owner", None), ltype=getattr(self.__class__, "_ltype", LineRoot.IndType), ) if hasattr(self, "lines") and hasattr(self.lines, "lines"): for line_obj in self.lines.lines: if hasattr(line_obj, "_refresh_cached_line_flags"): line_obj._refresh_cached_line_flags( owner=self.lines, ltype=getattr(self, "_ltype", LineRoot.IndType), ) # Call post-init self.__class__.dopostinit(self, *args, **kwargs)
[docs] def getindicators(self): """Get list of indicators using this line actions object. Returns: list: Empty list for base LineActions (override in subclasses). """ return []
[docs] def qbuffer(self, savemem=0): """Enable queued buffer mode for memory-efficient storage. Args: savemem: Memory saving mode (0=normal, >0=enable cache mode). """ super().qbuffer(savemem=1)
[docs] def plotlabel(self): """Return the plot label for this line object""" # Try to get plot label from _plotlabel method if hasattr(self, "_plotlabel"): label_dict = self._plotlabel() # Convert dict to string format if isinstance(label_dict, dict): # Format as 'ClassName(param1=value1, param2=value2)' params_str = ", ".join(f"{k}={v}" for k, v in label_dict.items()) if params_str: return f"{self.__class__.__name__}({params_str})" return self.__class__.__name__ return str(label_dict) # Fallback: return class name return self.__class__.__name__
def _plotlabel(self): """Default implementation of plot label""" # Try to get params if available if hasattr(self, "params") and hasattr(self.params, "_getkwargs"): return self.params._getkwargs() # Otherwise return empty dict return {}
[docs] @staticmethod def arrayize(obj): """Convert an object to an array-compatible object. Args: obj: Object to convert. Can be a value, iterable, or array-like. Returns: The original object if it has an array attribute, otherwise a LineNum or PseudoArray wrapper. """ if not hasattr(obj, "array"): if not hasattr(obj, "__getitem__"): # CRITICAL FIX: Create a LineNum that properly handles _minperiod line_num = LineNum(obj) # Ensure the LineNum has the _minperiod attribute if not hasattr(line_num, "_minperiod"): line_num._minperiod = 1 return line_num # make it a LineNum if not hasattr(obj, "__len__"): pseudo_array = PseudoArray(obj) # CRITICAL FIX: Ensure PseudoArray objects have _minperiod for compatibility if not hasattr(pseudo_array, "_minperiod"): pseudo_array._minperiod = 1 return pseudo_array # Can iterate (for once) return obj
def _next_old(self): """DEPRECATED: This method is no longer used. LineIterator._next() is used instead.""" # CRITICAL FIX: Prevent double processing if _once was already called if hasattr(self, "_once_called") and self._once_called: return # Already processed in once mode, don't process again # CRITICAL FIX: Ensure data synchronization without over-advancing if hasattr(self, "_clock") and self._clock is not None: try: clock_len = len(self._clock) self_len = len(self) # Only advance if we're behind the clock and not already at or ahead if self_len < clock_len and (clock_len - self_len) <= 1: # Forward one step to match the clock self.forward() except Exception: # If clock access fails, just forward once self.forward() else: # No clock, just forward once self.forward() # Call prenext or nextstart/next depending on minperiod if len(self) < self._minperiod: self.prenext() elif len(self) == self._minperiod: self.nextstart() # called once for the 1st value over minperiod else: self.next() # called for each value over minperiod def _once(self, start, end): # Mark that once was called to prevent double processing in _next self._once_called = True # CRITICAL FIX: Ensure array exists but don't pre-fill it # Pre-filling causes incorrect buflen() calculations if not hasattr(self, "array") or self.array is None: import array as array_module self.array = array_module.array("d") # CRITICAL FIX: Ensure proper range for once processing if start < 0: start = 0 if end < start: end = start # CRITICAL FIX: Get the actual buffer length if available # Skip this check if _clock is MinimalClock (always returns 0) if hasattr(self, "_clock") and self._clock and hasattr(self._clock, "buflen"): clock_class_name = getattr(self._clock, "__class__", type(None)).__name__ if "MinimalClock" not in clock_class_name: max_len = self._clock.buflen() if max_len > 0 and end > max_len: end = max_len # CRITICAL FIX: Call _once() on all child line iterators first # This ensures dependencies are calculated before this indicator if hasattr(self, "_lineiterators"): from .lineiterator import LineIterator for indicator in self._lineiterators.get(LineIterator.IndType, []): try: if hasattr(indicator, "_once"): indicator._once(start, end) except Exception as e: logger.debug("Indicator _once failed: %s", e) # CRITICAL FIX: Call preonce before main processing try: if hasattr(self, "preonce"): self.preonce(start, end) except Exception as e: logger.debug("preonce failed: %s", e) # CRITICAL FIX: Ensure operand arrays are computed before once() # For Logic subclasses (bt.If, bt.And, etc.), operands (args, cond) need # their arrays populated before once() reads from them. if hasattr(self, "args"): for arg in self.args: if hasattr(arg, "once") and hasattr(arg, "array") and len(arg.array) < end: try: arg.once(0, end) except Exception: # nosec B110 # Operand may already be computed or not once()-able; continue. pass if hasattr(self, "cond"): cond = self.cond if hasattr(cond, "once") and hasattr(cond, "array") and len(cond.array) < end: try: cond.once(0, end) except Exception: # nosec B110 # Condition may already be computed or not once()-able; continue. pass # CRITICAL FIX: Process the main once calculation # Try to call once method if it exists try: if hasattr(self, "once") and callable(self.once): self.once(start, end) except Exception as e: # If once fails or doesn't exist, skip it # The indicator will be calculated via next() calls during strategy execution logger.debug("once() failed: %s", e) # CRITICAL FIX: Update lencount after once processing to match the data length # In runonce mode, lencount should equal the number of data points processed # Get the actual data length from the clock or data source actual_data_len = end try: # Try to get the actual data length from clock or data sources if hasattr(self, "_clock") and self._clock: try: actual_data_len = self._clock.buflen() except Exception as e: logger.debug("clock.buflen() failed: %s", e) try: actual_data_len = len(self._clock) except Exception as e2: logger.debug("len(clock) failed: %s", e2) elif hasattr(self, "datas") and self.datas and len(self.datas) > 0: try: actual_data_len = self.datas[0].buflen() except Exception as e: logger.debug("datas[0].buflen() failed: %s", e) try: actual_data_len = len(self.datas[0]) except Exception as e2: logger.debug("len(datas[0]) failed: %s", e2) # Use the maximum of end and actual_data_len to ensure we don't truncate final_len = max(end, actual_data_len) if actual_data_len > 0 else end except Exception as e: logger.debug("Failed to determine actual data length: %s", e) final_len = end if hasattr(self, "lines") and hasattr(self.lines, "lines") and self.lines.lines: # Update lencount for all lines to match the data length for line in self.lines.lines: if hasattr(line, "lencount"): # CRITICAL FIX: Set lencount to final_len (actual data length) # This ensures len(indicator) == len(strategy) in runonce mode line.lencount = final_len if hasattr(line, "_idx"): # Set _idx to the last processed position line._idx = final_len - 1 if final_len > 0 else -1 # CRITICAL FIX: Call oncebinding to propagate computed values to bound lines # This is needed for bt.If, Logic subclasses etc. that compute values into # their own array and need to copy them to the bound output line. self.oncebinding()
[docs] @classmethod def cleancache(cls): """Clean the cache - called by cerebro""" LineActionsCache.clear_cache()
[docs] @classmethod def usecache(cls, enable=True): """Enable or disable the cache""" LineActionsCache.enable_cache(enable)
[docs] def LineDelay(a, ago=0, **kwargs): """Create a delayed line object. Args: a: Source line object. ago: Number of periods to delay. Negative for lookback. **kwargs: Additional keyword arguments. Returns: _LineDelay or _LineForward: A delayed line object. """ if ago <= 0: return _LineDelay(a, ago, **kwargs) return _LineForward(a, ago)
[docs] def LineNum(num): """Create a constant line from a number. Args: num: The constant value. Returns: _LineDelay: A line object that always returns the constant value. """ return _LineDelay(PseudoArray(repeat(num)), 0)
class _LineDelay(LineActions): """Delayed line object for negative ago values (lookback). This class represents a line that accesses historical values from another line. For example, data(-1) returns the previous bar's value. Attributes: a: The source line object. ago: Number of periods to look back (negative value). """ def __init__(self, a, ago): """Initialize the delayed line. Args: a: Source line object. ago: Number of periods to look back (negative value). """ super().__init__() self.a = self.arrayize(a) self.ago = ago # CRITICAL FIX: Inherit minperiod from source's owner (indicator) if available # When called as nzd(-1), 'a' is nzd.lines[0] which has minperiod=1, # but the indicator nzd has minperiod=20. We need to use the indicator's minperiod. source_minperiod = getattr(a, "_minperiod", 1) # Check if source has an owner with a higher minperiod if hasattr(a, "_owner") and a._owner is not None: owner = a._owner # Check for _owner_ref (Lines object pointing to indicator) if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: owner_minperiod = getattr(owner._owner_ref, "_minperiod", 1) source_minperiod = max(source_minperiod, owner_minperiod) else: owner_minperiod = getattr(owner, "_minperiod", 1) source_minperiod = max(source_minperiod, owner_minperiod) # Update our minperiod with the source's minperiod if source_minperiod > 1: self.updateminperiod(source_minperiod) # Need to add the delay to the period. "ago" is 0 based and therefore # we need to pass an extra 1 which is the minimum defined period for # any data (which will be subtracted inside addminperiod) # CRITICAL FIX: Must add abs(ago) + 1, NOT just abs(ago) self.addminperiod(abs(ago) + 1) def __getitem__(self, idx): """CRITICAL FIX: Override __getitem__ to compute delayed value dynamically. This handles constants wrapped in PseudoArray correctly. For ago=-10 (lookback), accessing [0] should return self.a[-10] (10 bars back). Formula: self.a[idx + ago] where ago is negative for lookback. """ try: # For delay operations, get value from source with delay applied # ago is negative for lookback, so idx + ago gives historical index value = self.a[idx + self.ago] if value is None: return 0.0 if isinstance(value, float) and not math.isfinite(value): return 0.0 return value except (IndexError, TypeError): return 0.0 def next(self): """Calculate and set the delayed value for the current bar. Gets the value from the source line at the delayed position and stores it at position 0. """ # CRITICAL FIX: Proper delay operation # ago is negative for lookback (e.g., ago=-10 means 10 bars back) # We need self.a[ago] to get the historical value try: # Get the delayed value - ago is already negative for lookback delayed_val = self.a[self.ago] # Ensure value is never None or NaN if ( delayed_val is None or isinstance(delayed_val, float) and not math.isfinite(delayed_val) ): delayed_val = 0.0 self[0] = delayed_val except (IndexError, AttributeError): # If we can't get the delayed value, use 0.0 self[0] = 0.0 def once(self, start, end): """Calculate delayed values in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # cache python dictionary lookups dst = self.array ago = self.ago # Ensure destination array is properly sized. Missing delayed values must # remain NaN; using 0.0 would turn unavailable bars into real signals. while len(dst) < end: dst.append(float("nan")) # CRITICAL FIX: Ensure source has computed its values before we access them # This is necessary for LinesOperation sources that haven't run once() yet if hasattr(self.a, "once") and hasattr(self.a, "array") and len(self.a.array) < end: self.a.once(start, end) # CRITICAL FIX: Check if source is a constant value (PseudoArray with repeat) # We need to check the wrapped object, not just the array, because # PseudoArray.array returns a new list each time is_constant = False constant_value = None # Check if self.a is a PseudoArray wrapping a repeat object # OR if self.a is a _LineDelay that wraps a PseudoArray with repeat source_obj = self.a if hasattr(self.a, "a"): # self.a is a _LineDelay, check its source source_obj = self.a.a if hasattr(source_obj, "wrapped"): wrapped = source_obj.wrapped # Check if it's a repeat object if ( isinstance(wrapped, itertools.repeat) or str(type(wrapped)) == "<class 'itertools.repeat'>" ): is_constant = True try: # Get the constant value from the repeat object # Create a new iterator to avoid consuming it constant_value = next(iter(wrapped)) if constant_value is None: constant_value = float("nan") except (StopIteration, TypeError): constant_value = float("nan") # If not a constant, get the source array if not is_constant: src = self.a.array for i in range(start, end): if is_constant: # For constant values, just use the constant dst[i] = constant_value else: # CRITICAL FIX: Proper bounds checking for delayed access # For ago=-26 (forward shift), we need to access src[i + ago] = src[i - 26] # to get the value calculated 26 bars ago src_index = i + ago if src_index >= 0 and src_index < len(src): val = src[src_index] if val is None: val = float("nan") dst[i] = val else: # Out-of-range historical/future access is unavailable, not 0. dst[i] = float("nan") class _LineForward(LineActions): """Forward line object for positive ago values (lookahead). This class represents a line that accesses future values from another line. For example, data(1) returns the next bar's value. Attributes: a: The source line object. ago: Number of periods to look ahead (positive value). """ def __init__(self, a, ago): """Initialize the forward line. Args: a: Source line object. ago: Number of periods to look ahead (positive value). """ super().__init__() self.a = self.arrayize(a) self.ago = ago # Need to add the delay to the period if hasattr(a, "_minperiod"): self.addminperiod(ago) def next(self): """Calculate and set the forwarded value for the current bar. Gets the value from the source line at the forward position and stores it at position 0. """ # operation(float, other) ... expecting other to be a float # CRITICAL FIX: Ensure we get valid numeric values for indicator calculations try: # Get operand values with proper type checking if hasattr(self.a, "__getitem__"): # LineBuffer-like object - get current value try: a_val = self.a[0] except (IndexError, TypeError): a_val = float("nan") else: # Direct value a_val = self.a if hasattr(self.b, "__getitem__"): # LineBuffer-like object - get current value try: b_val = self.b[0] except (IndexError, TypeError): b_val = float("nan") else: # Direct value b_val = self.b # Preserve indicator warmup semantics. NaN/None operands must # remain NaN instead of becoming valid trading signals. if a_val is None or (isinstance(a_val, float) and a_val != a_val): self[0] = float("nan") return if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 elif not isinstance(a_val, (int, float)): try: a_val = float(a_val) except (ValueError, TypeError): self[0] = float("nan") return if b_val is None or (isinstance(b_val, float) and b_val != b_val): self[0] = float("nan") return if isinstance(b_val, float) and not math.isfinite(b_val): b_val = 0.0 elif not isinstance(b_val, (int, float)): try: b_val = float(b_val) except (ValueError, TypeError): self[0] = float("nan") return # CRITICAL FIX: Actually perform the operation and store the result # Handle both normal and reverse operations if hasattr(self, "operation") and self.operation: # CRITICAL FIX: Handle reverse operations properly if getattr(self, "r", False): result = self.operation(b_val, a_val) # Reverse: b op a else: result = self.operation(a_val, b_val) # Normal: a op b # Ensure result is a valid number if result is None: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): if result != result: result = float("nan") else: result = 0.0 elif not isinstance(result, (int, float)): try: result = float(result) except (ValueError, TypeError): result = float("nan") # Store the result in the current position self[0] = result else: # Fallback: store a_val if no operation is defined self[0] = a_val except Exception: logger.debug("LineOwnOperation.next fallback triggered", exc_info=True) # If anything fails, store 0.0 to prevent crashes self[0] = 0.0 def once(self, start, end): """Calculate forwarded values in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # cache python dictionary lookups dst = self.array srca = self.a.array op = self.operation # CRITICAL FIX: Ensure destination array is properly sized while len(dst) < end: dst.append(0.0) # CRITICAL FIX: Ensure source array has required data if len(srca) < end: # If source array is shorter than required range, only process available data end = min(end, len(srca)) # Fast path: process the whole range under a single try. The per-element # try/except below is only entered if something raises, preserving the # exact per-element 0.0 fallback semantics while avoiding per-element # exception-handler setup in the common (no-error) case (R2-S4: PERF203). try: for i in range(start, end): a_val = srca[i] if i < len(srca) else 0.0 if a_val is None or (isinstance(a_val, float) and not math.isfinite(a_val)): a_val = 0.0 result = op(a_val) if result is None or (isinstance(result, float) and not math.isfinite(result)): result = 0.0 dst[i] = result return except Exception: logger.debug( "LineOwnOperation.once fast path failed; per-element fallback", exc_info=True ) for i in range(start, end): try: # CRITICAL FIX: Bounds checking for source array a_val = srca[i] if i < len(srca) else 0.0 # Ensure value is numeric if a_val is None or (isinstance(a_val, float) and not math.isfinite(a_val)): a_val = 0.0 result = op(a_val) # Ensure result is valid if result is None or (isinstance(result, float) and not math.isfinite(result)): result = 0.0 dst[i] = result except Exception: logger.debug( "LineOwnOperation.once fallback triggered at index %d", i, exc_info=True ) # If operation fails, store 0.0 dst[i] = 0.0
[docs] class LinesOperation(LineActions): """Operation between two line objects (binary operations). This class represents binary operations (addition, subtraction, etc.) between two line objects. The result is a new line that contains the element-wise operation result. Attributes: operation: The binary function to apply (e.g., operator.add). a: First operand (left-hand side). b: Second operand (right-hand side). r: If True, reverse operation order. _parent_a: Parent indicator for operand a. _parent_b: Parent indicator for operand b. Example: >>> result = LinesOperation(indicator1, indicator2, operator.sub) >>> # result[0] = indicator1[0] - indicator2[0] """ def __init__(self, a, b, operation, r=False, parent_a=None, parent_b=None): """Initialize a binary operation between two line objects. Args: a: First operand (left-hand side). b: Second operand (right-hand side). operation: The binary function to apply (e.g., operator.add). r: If True, reverse operation order (b op a instead of a op b). parent_a: Parent indicator for operand a. parent_b: Parent indicator for operand b. """ super().__init__() self.operation = operation self.a = a # always a linebuffer-like object self.b = self.arrayize(b) self.r = r self._datas = [operand for operand in (self.a, self.b) if isinstance(operand, LineRoot)] if self._datas: data_clock = getattr(self._datas[0], "_clock", None) if data_clock is not None and data_clock.__class__.__name__ != "MinimalClock": self._clock = data_clock else: self._clock = self._datas[0] # CRITICAL FIX: Store references to parent indicators for _once processing # Use passed parent references if available, otherwise try to find them self._parent_a = parent_a if parent_a is not None else self._find_parent_indicator(a) self._parent_b = parent_b if parent_b is not None else self._find_parent_indicator(b) # ensure a is added if it's a lineiterator-like object # self.addminperiod(1) already done by the base class # CRITICAL FIX: Handle _minperiod attribute access more safely a_minperiod = getattr(a, "_minperiod", 1) if hasattr(a, "_minperiod") else 1 b_minperiod = getattr(b, "_minperiod", 1) if hasattr(b, "_minperiod") else 1 # Use updateminperiod to take max of operand minperiods # For me1 - me2, minperiod = max(me1._minperiod, me2._minperiod) max_minperiod = max(a_minperiod, b_minperiod) self.updateminperiod(max_minperiod) self._a_minperiod = a_minperiod self._b_minperiod = b_minperiod self._a_guard_minperiod = self._needs_minperiod_guard(self.a) self._b_guard_minperiod = self._needs_minperiod_guard(self.b) self._next_operands = tuple( operand for operand in (self.a, self.b) if operand is not self and isinstance(operand, LineActions) and hasattr(operand, "_next") ) @staticmethod def _is_constant_operand(operand): """Return True for constants wrapped as LineDelay(PseudoArray).""" return ( operand.__class__.__name__ == "_LineDelay" and getattr(operand, "a", None).__class__.__name__ == "PseudoArray" ) @staticmethod def _is_line_delay_operand(operand): return operand.__class__.__name__ == "_LineDelay" @classmethod def _needs_minperiod_guard(cls, operand): return ( not cls._is_constant_operand(operand) and not cls._is_line_delay_operand(operand) and not isinstance(operand, LineActions) and hasattr(operand, "__len__") and hasattr(operand, "_minperiod") ) @staticmethod def _is_missing(value): return value is None or (isinstance(value, float) and value != value) def _operand_value(self, operand, ago=0, guard_minperiod=False, minperiod=1): """Read an operand while preserving indicator warmup NaN semantics.""" if guard_minperiod: try: target_len = len(operand) + ago if target_len < minperiod: return float("nan") except Exception: # nosec B110 # Operand without a usable length; skip the warmup guard. pass if hasattr(operand, "__getitem__"): return operand[ago] return operand def _normalize_operand(self, value): if self._is_missing(value): return float("nan") if isinstance(value, float) and not math.isfinite(value): return 0.0 if isinstance(value, (int, float)): return value try: return float(value) except (ValueError, TypeError): return float("nan") def _next_operand_if_due(self, operand): clock = getattr(operand, "_clock", None) if clock is not None: try: if len(clock) <= len(operand): return except Exception: # nosec B110 # Clock without a comparable length; advance the operand anyway. pass operand._next() def _find_parent_indicator(self, operand): """Find the parent indicator that owns this operand. Only returns LineActions objects. Full Indicator/LineIterator objects are never returned because they are processed separately via the _lineiterators ordering in _once(). Returning a full Indicator here would cause premature once_via_next() calls with incorrect data state. """ # If operand is already a LineActions (arithmetic expression chain), return it if isinstance(operand, LineActions): return operand # For plain LineBuffer: check if owner is a LineActions (not a full Indicator) if hasattr(operand, "_owner") and operand._owner is not None: owner = operand._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: ref = owner._owner_ref if isinstance(ref, LineActions): return ref if isinstance(owner, LineActions): return owner return None
[docs] def __getitem__(self, ago): """Get value at the specified offset. In runonce mode, the array is pre-computed by once(), so use it directly. Falls back to dynamic computation only if the array is not populated. """ try: # Use pre-computed array if available (runonce mode) current_idx = self._idx if current_idx >= 0 and len(self.array) > 0: target_idx = current_idx + ago if 0 <= target_idx < len(self.array): value = self.array[target_idx] if value is not None: if isinstance(value, float): if value == value: if not math.isfinite(value): return 0.0 return value else: return value # Fallback: compute value dynamically from source operands. a_val = self._normalize_operand( self._operand_value(self.a, ago, self._a_guard_minperiod, self._a_minperiod) ) b_val = self._normalize_operand( self._operand_value(self.b, ago, self._b_guard_minperiod, self._b_minperiod) ) if self._is_missing(a_val): return float("nan") if self._is_missing(b_val): return float("nan") # Compute and return the operation result if self.r: result = self.operation(b_val, a_val) else: result = self.operation(a_val, b_val) if self._is_missing(result): return float("nan") if isinstance(result, float) and not math.isfinite(result): return 0.0 return result except (IndexError, TypeError): return float("nan")
def _next(self): """CRITICAL FIX: _next() method for compatibility with LineIterator processing loop. This method is called by LineIterator._next() for items in _lineiterators[IndType]. """ # Clock guard: skip if already advanced to the current clock position. # This prevents double-advancing when a LinesOperation is both registered # directly in _lineiterators AND driven via a parent's _next_operands chain. clock = getattr(self, "_clock", None) if clock is not None and clock.__class__.__name__ != "MinimalClock": try: if len(clock) <= len(self): return except Exception: # nosec B110 # Clock without a comparable length; proceed to advance operands. pass for operand in self._next_operands: self._next_operand_if_due(operand) # Advance the line buffer self.advance() # Call next() to compute the value self.next() # Update bindings so bound lines get the computed value for binding in self.bindings: binding[0] = self[0]
[docs] def next(self): """Calculate and set the operation result for the current bar. Performs the binary operation on the current values of both operands and stores the result at position 0. """ # operation(float, other) ... expecting other to be a float # CRITICAL FIX: Ensure we get valid numeric values for indicator calculations try: a_val = self._normalize_operand( self._operand_value(self.a, 0, self._a_guard_minperiod, self._a_minperiod) ) b_val = self._normalize_operand( self._operand_value(self.b, 0, self._b_guard_minperiod, self._b_minperiod) ) if self._is_missing(a_val) or self._is_missing(b_val): self[0] = float("nan") return # CRITICAL FIX: Actually perform the operation and store the result # Handle both normal and reverse operations if hasattr(self, "operation") and self.operation: # CRITICAL FIX: Handle reverse operations properly if getattr(self, "r", False): result = self.operation(b_val, a_val) # Reverse: b op a else: result = self.operation(a_val, b_val) # Normal: a op b # Ensure result is a valid number if result is None: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): if result != result: result = float("nan") else: result = 0.0 elif not isinstance(result, (int, float)): try: result = float(result) except (ValueError, TypeError): result = float("nan") # Store the result in the current position self[0] = result else: # Fallback: store a_val if no operation is defined self[0] = a_val except Exception: self[0] = float("nan")
[docs] def once(self, start, end): """Calculate operation results in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # CRITICAL FIX: Always use start=0 for nested operations # This ensures historical values are available for indicators like SMA nested_start = 0 # CRITICAL FIX: Call parent indicators' once() methods to populate their arrays # This is needed for cases like dif = ema_1 - ema_2 where ema_1/ema_2 must be computed first if self._parent_a is not None and hasattr(self._parent_a, "once"): try: self._parent_a.once(nested_start, end) except Exception as e: logger.debug("parent_a.once() failed: %s", e) if self._parent_b is not None and hasattr(self._parent_b, "once"): try: self._parent_b.once(nested_start, end) except Exception as e: logger.debug("parent_b.once() failed: %s", e) # CRITICAL FIX: Call once() on operands that have it, but ONLY for LineActions # instances (like _LineDelay, LinesOperation). Never call once() on full Indicators # (ATR, SuperTrend, etc.) because those are managed by _lineiterators in _once(). # Calling once() on a full Indicator here would trigger premature once_via_next calls # before the indicator's data state is properly set up. if isinstance(self.a, LineActions) and hasattr(self.a, "once"): try: self.a.once(nested_start, end) except Exception as e: logger.debug("operand a.once() failed: %s", e) if isinstance(self.b, LineActions) and hasattr(self.b, "once"): try: self.b.once(nested_start, end) except Exception as e: logger.debug("operand b.once() failed: %s", e) # CRITICAL FIX: Always process from 0 to populate historical values if hasattr(self.b, "array") and type(self.b).__name__ != "PseudoArray": self._once_op(nested_start, end) else: if isinstance(self.b, float): ( self._once_val_op_r(nested_start, end) if self.r else self._once_val_op(nested_start, end) ) else: self._once_time_op(nested_start, end) # CRITICAL FIX: Call oncebinding to copy computed values to bound lines # This is needed in runonce mode where once() computes all values at once self.oncebinding()
def _once_op(self, start, end): # Only call once() on LineActions instances (e.g., _LineDelay), not full Indicators if isinstance(self.b, LineActions) and hasattr(self.b, "once") and len(self.b.array) < end: try: self.b.once(start, end) except Exception as e: logger.debug("b.once() in _once_op failed: %s", e) # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b.array op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data # CRITICAL FIX: Check if b is a _LineDelay wrapping a constant (PseudoArray) # In this case, srcb will be empty but b[i] will return the constant is_constant_b = ( len(srcb) == 0 and hasattr(self.b, "a") and type(self.b.a).__name__ == "PseudoArray" ) if is_constant_b: # b is a _LineDelay wrapping a constant - use srca length only end = min(end, len(srca)) else: end = min(end, len(srca), len(srcb)) # Use dynamic access for constant values wrapped in _LineDelay use_dynamic_b = is_constant_b # CRITICAL FIX: Always process from 0 to ensure historical values are available # This is needed for indicators like SMA that need historical values for their calculations actual_start = 0 # Fast path under a single try; the per-element try/except below is only # entered on error, preserving NaN-on-failure semantics while removing # per-element exception-handler setup in the common case (R2-S4: PERF203). try: for i in range(actual_start, end): a_val = srca[i] b_val = self.b[i] if use_dynamic_b else srcb[i] if a_val is None or a_val != a_val or b_val is None or b_val != b_val: dst[i] = float("nan") continue if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 if isinstance(b_val, float) and not math.isfinite(b_val): b_val = 0.0 result = op(b_val, a_val) if self.r else op(a_val, b_val) if result is None or result != result: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): result = 0.0 dst[i] = result return except Exception: logger.debug( "LinesOperation._once_op fast path failed; per-element fallback", exc_info=True ) for i in range(actual_start, end): try: a_val = srca[i] if use_dynamic_b: b_val = self.b[i] # Use __getitem__ for constants else: b_val = srcb[i] # Preserve NaN semantics for indicators: if any operand is None/NaN -> NaN if a_val is None or a_val != a_val or b_val is None or b_val != b_val: dst[i] = float("nan") continue if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 if isinstance(b_val, float) and not math.isfinite(b_val): b_val = 0.0 if self.r: result = op(b_val, a_val) else: result = op(a_val, b_val) # Preserve NaN semantics if result is None or result != result: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): result = 0.0 dst[i] = result except Exception: logger.debug( "LinesOperation._once_op fallback triggered at index %d", i, exc_info=True ) # If operation fails, store NaN for indicator semantics dst[i] = float("nan") def _once_time_op(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b[0] op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] # Preserve NaN semantics if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 if isinstance(srcb, float) and not math.isfinite(srcb): srcb = 0.0 if self.r: result = op(srcb, a_val) else: result = op(a_val, srcb) if result is None or result != result: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): result = 0.0 dst[i] = result except Exception: logger.debug( "LinesOperation._once_time_op fallback triggered at index %d", i, exc_info=True ) dst[i] = float("nan") def _once_val_op(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b[0] if hasattr(self.b, "__getitem__") else self.b op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 if isinstance(srcb, float) and not math.isfinite(srcb): srcb = 0.0 result = op(a_val, srcb) if result is None or result != result: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): result = 0.0 dst[i] = result except Exception: logger.debug( "LinesOperation._once_val_op fallback triggered at index %d", i, exc_info=True ) dst[i] = float("nan") def _once_val_op_r(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b[0] if hasattr(self.b, "__getitem__") else self.b op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 if isinstance(srcb, float) and not math.isfinite(srcb): srcb = 0.0 result = op(srcb, a_val) if result is None or result != result: result = float("nan") elif isinstance(result, float) and not math.isfinite(result): result = 0.0 dst[i] = result except Exception: logger.debug( "LinesOperation._once_val_op_r fallback triggered at index %d", i, exc_info=True ) dst[i] = float("nan")
[docs] class LineOwnOperation(LineActions): """Operation on a single line object (unary operations). This class represents unary operations (negation, absolute value, etc.) on a single line object. The result is a new line that contains the element-wise operation result. Attributes: operation: The unary function to apply (e.g., operator.neg). a: The operand (line object). _parent_a: Parent indicator for the operand. Example: >>> result = LineOwnOperation(indicator, operator.neg) >>> # result[0] = -indicator[0] """ def __init__(self, a, operation, parent_a=None): """Initialize a unary operation on a line object. Args: a: The operand (line object). operation: The unary function to apply (e.g., operator.neg). parent_a: Parent indicator for the operand. """ super().__init__() self.operation = operation self.a = a # CRITICAL FIX: Store reference to parent indicator for _once processing self._parent_a = parent_a if parent_a is not None else self._find_parent_indicator(a) a_minperiod = getattr(a, "_minperiod", 1) if hasattr(a, "_minperiod") else 1 self.updateminperiod(a_minperiod) def _find_parent_indicator(self, operand): """Find the parent indicator that owns this operand. Only returns LineActions objects. Full Indicators are never returned to prevent premature once_via_next() calls (see LinesOperation._find_parent_indicator). """ if isinstance(operand, LineActions): return operand if hasattr(operand, "_owner") and operand._owner is not None: owner = operand._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: ref = owner._owner_ref if isinstance(ref, LineActions): return ref if isinstance(owner, LineActions): return owner return None
[docs] def __getitem__(self, ago): """CRITICAL FIX: Override __getitem__ to compute value dynamically from source operand.""" try: a_val = self.a[ago] if hasattr(self.a, "__getitem__") else self.a if a_val is None or (isinstance(a_val, float) and a_val != a_val): return float("nan") if isinstance(a_val, float) and not math.isfinite(a_val): a_val = 0.0 result = self.operation(a_val) if isinstance(result, float) and not math.isfinite(result): return 0.0 return result except (IndexError, TypeError): return float("nan")
[docs] def next(self): """Calculate and set the unary operation result for the current bar. Performs the unary operation on the current value of the operand and stores the result at position 0. """ a_val = self.a[0] if a_val is None or (isinstance(a_val, float) and not math.isfinite(a_val)): a_val = 0.0 result = self.operation(a_val) if result is None or (isinstance(result, float) and not math.isfinite(result)): result = 0.0 self[0] = result
[docs] def once(self, start, end): """Calculate unary operation results in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # CRITICAL FIX: Ensure source operand is processed first if self._parent_a is not None and hasattr(self._parent_a, "_once"): try: self._parent_a._once(start, end) except Exception as e: logger.debug("parent_a._once() in LineOwnOperation failed: %s", e) # cache python dictionary lookups dst = self.array srca = self.a.array op = self.operation # CRITICAL FIX: Ensure destination array is properly sized while len(dst) < end: dst.append(float("nan")) # CRITICAL FIX: Ensure source array has required data if len(srca) < end: # If source array is shorter than required range, only process available data end = min(end, len(srca)) # Fast path under a single try; per-element fallback only on error # (preserves 0.0-on-failure semantics, removes per-element handler setup; R2-S4). try: for i in range(start, end): a_val = srca[i] if i < len(srca) else 0.0 if a_val is None or (isinstance(a_val, float) and not math.isfinite(a_val)): a_val = 0.0 result = op(a_val) if result is None or (isinstance(result, float) and not math.isfinite(result)): result = 0.0 dst[i] = result return except Exception: logger.debug("unary once fast path failed; per-element fallback", exc_info=True) for i in range(start, end): try: # CRITICAL FIX: Bounds checking for source array a_val = srca[i] if i < len(srca) else 0.0 # Ensure value is numeric if a_val is None or (isinstance(a_val, float) and not math.isfinite(a_val)): a_val = 0.0 result = op(a_val) # Ensure result is valid if result is None or (isinstance(result, float) and not math.isfinite(result)): result = 0.0 dst[i] = result except Exception: # If operation fails, store 0.0 dst[i] = 0.0
[docs] def size(self): """Return the number of lines in this LineActions object""" if hasattr(self, "lines") and hasattr(self.lines, "size"): return self.lines.size() if hasattr(self, "lines") and hasattr(self.lines, "__len__"): return len(self.lines) return 1 # Default to 1 line if no lines object available