#!/usr/bin/env python
"""LineBuffer Module - Circular buffer storage for time-series data.
This module provides the LineBuffer class which implements a circular
buffer for storing time-series data. The buffer allows efficient
operations like appending, forwarding, rewinding, and resetting.
Key Features:
- Index 0 always points to the current active value
- Negative indices fetch past values (left-hand side)
- Positive indices fetch future values (right-hand side)
- Automatic memory management with qbuffer
- Line bindings for automatic value propagation
Classes:
LineBuffer: Core circular buffer implementation.
LineActions: Base class for line objects with multiple lines.
LineActionsMixin: Mixin providing line operations.
LineActionsCache: Cache system for performance optimization.
PseudoArray: Wrapper for non-array iterables.
LinesOperation: Operations on multiple lines.
LineOwnOperation: Operations on owned lines.
Example:
Basic buffer usage:
>>> buf = LineBuffer()
>>> buf.home() # Reset to beginning
>>> buf.forward() # Move to next position
>>> buf[0] = 100.0 # Set current value
>>> print(buf[0]) # Get current value
100.0
>>> print(buf[-1]) # Get previous value
"""
import array
import collections
import datetime
import itertools
import math
from itertools import islice, repeat
from . import metabase
from .lineroot import LineRoot, LineRootMixin, LineSingle
from .utils import num2date
from .utils.log_message import get_logger
from .utils.py3 import range, string_types
logger = get_logger(__name__)
NAN = float("NaN")
# PERFORMANCE OPTIMIZATION: Pre-create default datetime for error recovery
# Avoids repeated datetime object creation in hot path
_DEFAULT_DATETIME = datetime.datetime(2000, 1, 1, 0, 0, 0)
# PERFORMANCE OPTIMIZATION: Helper function to check for NaN/None values
# Using value != value is much faster than isinstance + math.isnan
def _is_nan_or_none(value):
    """Report whether *value* is ``None`` or fails to equal itself.

    A float NaN compares unequal to itself, so ``value != value`` doubles as
    a cheap NaN test that needs neither ``isinstance`` nor ``math.isnan``.
    """
    if value is None:
        return True
    return value != value
[docs]
class LineBuffer(LineSingle, LineRootMixin):
"""
LineBuffer defines an interface to an "array.array" (or list) in which
index 0 points to the item which is active for input and output.
Negative indices fetch values from the past (left-hand side)
Positive indices fetch values from the future (if the array has been
extended on the right-hand side)
With this behavior, no index has to be passed around to entities which have
to work with the current value produced by other entities: the value is
always reachable at "0".
Likewise, storing the current value produced by "self" is done at 0.
Additional operations to move the pointer (home, forward, extend, rewind,
advance, getzero) are provided
The class can also hold "bindings" to other LineBuffers. When a value
is set in this class,
it will also be set in the binding.
"""
# Define LineBuffer mode attributes: UnBounded (0) and QBuffer (1)
UnBounded, QBuffer = (0, 1)
# Initialization
[docs]
def __init__(self):
    """Create every instance attribute up front, then call ``reset()``.

    All attributes read by the other methods are assigned here so those
    methods can use plain attribute access. The cached flags
    ``_is_indicator``, ``_is_datetime_line`` and ``_default_value`` are
    derived once from the line type, the optional ``_name`` attribute and
    the class name.
    """
    # --- core storage state ---
    self._minperiod = 1
    self._array = array.array("d")
    self._idx = -1
    self._size = 0

    # --- buffer bookkeeping (maxlen/lenmark/extrasize are used in QBuffer mode) ---
    self.maxlen = 0
    self.extension = 0
    self.lencount = 0
    self.useislice = False
    self.extrasize = 0
    self.lenmark = 0

    # Placeholder storage; reset() replaces it according to the mode.
    self.array = array.array("d")

    # A bare buffer exposes itself as its only line unless "lines" already exists.
    if not hasattr(self, "lines"):
        self.lines = [self]

    self.mode = self.UnBounded
    self.bindings = []

    self._tz = None
    self._owner = None
    self._clock = None
    self._ltype = None

    # Indicator flag: line type 0 or a class name containing "Indicator".
    try:
        self._is_indicator = (self._ltype == 0) or ("Indicator" in str(self.__class__.__name__))
    except Exception:
        self._is_indicator = False

    # Datetime flag: taken from _name when present, otherwise from the class name.
    self._is_datetime_line = False
    try:
        if hasattr(self, "_name"):
            label = str(self._name)
        else:
            label = str(self.__class__.__name__)
        self._is_datetime_line = "datetime" in label.lower()
    except Exception:
        self._is_datetime_line = False

    # Fill/replacement value used by the setters and by forward()/extend().
    if self._is_datetime_line:
        fallback = 1.0
    elif self._is_indicator:
        fallback = float("nan")
    else:
        fallback = 0.0
    self._default_value = fallback

    # Guard flag kept as an instance attribute.
    self._in_len = False

    # Single-entry cache consulted by datetime().
    self._dt_cache_idx = None
    self._dt_cache_value = None
    self._dt_cache_tz = None
    self._dt_cache_dt = None

    self.reset()
# Get the value of _idx
[docs]
def get_idx(self):
    """Return the buffer's current logical index.

    Returns:
        The value stored in ``self._idx``.
    """
    current = self._idx
    return current
def _refresh_cached_line_flags(self, owner=None, ltype=None):
    """Recompute the cached indicator/datetime flags and the default value.

    Args:
        owner: If not None, stored as ``self._owner`` before recomputing.
        ltype: If not None, stored as ``self._ltype`` before recomputing.

    The effective line type is ``self._ltype``; when that is None it falls
    back to the ``_ltype`` of the owner's ``_owner_ref`` and then to the
    ``_ltype`` of the owner itself.
    """
    if owner is not None:
        self._owner = owner
    if ltype is not None:
        self._ltype = ltype

    owner_obj = getattr(self, "_owner", None)
    owner_ref = getattr(owner_obj, "_owner_ref", None)
    effective_ltype = getattr(self, "_ltype", None)
    # First candidate that supplies a line type wins; owner_ref is tried first.
    for candidate in (owner_ref, owner_obj):
        if effective_ltype is None and candidate is not None:
            effective_ltype = getattr(candidate, "_ltype", None)

    try:
        by_type = effective_ltype == LineRoot.IndType
        self._is_indicator = by_type or ("Indicator" in str(self.__class__.__name__))
    except Exception:
        self._is_indicator = False

    try:
        if hasattr(self, "_name"):
            label = str(self._name).lower()
        else:
            label = str(self.__class__.__name__).lower()
        self._is_datetime_line = "datetime" in label
    except Exception:
        self._is_datetime_line = False

    # Datetime lines take precedence over the indicator flag.
    if self._is_datetime_line:
        fallback = 1.0
    elif self._is_indicator:
        fallback = float("nan")
    else:
        fallback = 0.0
    self._default_value = fallback
# Set the value of _idx
[docs]
def set_idx(self, idx, force=False):
    """Store a new logical index, honouring the QBuffer pin.

    Args:
        idx: The new index value.
        force: When True, the index is written even if the QBuffer pin
            would otherwise keep the current one.

    In QBuffer mode, once ``self._idx`` is no longer below ``self.lenmark``
    the index is left untouched unless ``force`` is given. In any other
    mode the index is always written.
    """
    pinned = self.mode == self.QBuffer and not (force or self._idx < self.lenmark)
    if pinned:
        return
    self._idx = idx
# Property usage: can be used to get and set idx
idx = property(get_idx, set_idx)
# Reset
[docs]
def reset(self):
"""Reset the logical index and counters, rebuilding storage unless it is kept.

If an owning object reachable through ``self._owner`` has a truthy
``_once_called`` attribute and ``self.array`` is non-empty, the existing
array is kept and only ``idx``, ``lencount`` and ``extension`` are reset.
Otherwise the storage is replaced: a bounded ``collections.deque`` in
QBuffer mode, an empty ``array.array("d")`` in any other mode.

Any exception raised while probing the owner chain is logged at debug
level and treated as "do not preserve".
"""
# Decide whether the already-populated array has to survive this reset
# (values presumably precomputed by a runonce pass - confirm against the
# code that sets ``_once_called``).
preserve_array = False
try:
# Check if this is an indicator line that was processed in runonce mode
# Line's _owner might be a Lines object, which has _owner pointing to the indicator
if hasattr(self, "_owner") and self._owner is not None:
owner = self._owner
# Check if owner is a Lines object (which wraps lines for indicators)
# Lines objects have _owner pointing to the actual indicator
if hasattr(owner, "_owner") and owner._owner is not None:
indicator = owner._owner
# Check if indicator was processed in runonce mode
if hasattr(indicator, "_once_called") and indicator._once_called:
# Only a non-empty array is worth preserving
if hasattr(self, "array") and self.array is not None:
array_len = len(self.array)
if array_len > 0:
preserve_array = True
# Otherwise, check whether the owner itself carries the runonce flag
elif hasattr(owner, "_once_called") and owner._once_called:
# Only a non-empty array is worth preserving
if hasattr(self, "array") and self.array is not None:
array_len = len(self.array)
if array_len > 0:
preserve_array = True
except Exception as e:
# Best effort: a failed probe falls back to the normal (clearing) reset
logger.debug("Failed to check runonce array preservation: %s", e)
if preserve_array:
# In runonce mode with populated arrays, preserve the precomputed
# values but restart logical length so Cerebro's event replay can
# advance indicators according to their own clocks.
# Note: ``idx`` is assigned through the property, i.e. via set_idx().
self.idx = -1
if hasattr(self, "lencount"):
self.lencount = 0
self.extension = 0
else:
# Normal reset: replace the storage and reset all counters
# If in cache mode (QBuffer), use deque to store data with fixed size
if self.mode == self.QBuffer:
# Add extrasize to ensure resample/replay work; the deque always
# holds at least one slot
deque_maxlen = max(1, self.maxlen + self.extrasize)
self.array = collections.deque(maxlen=deque_maxlen)
self.useislice = True
else:
# Non-cache mode, use array.array
self.array = array.array("d")
self.useislice = False
# The array is deliberately NOT pre-filled: buflen() is
# len(array) - extension, so pre-filling would inflate buflen().
# forward() grows the array instead.
# Reset counters and indices
self.lencount = 0
self.idx = -1
self.extension = 0
# Set cache-related variables
[docs]
def qbuffer(self, savemem=0, extrasize=0):
    """Switch the buffer to QBuffer (bounded) mode and reset it.

    Args:
        savemem: Accepted for interface compatibility; this method does
            not read it.
        extrasize: Extra slots requested on top of ``maxlen``; negative
            values are clamped to 0.

    ``maxlen`` becomes ``max(1, self._minperiod)``. ``lenmark`` equals
    ``maxlen`` when an extra size is in effect and ``maxlen - 1`` otherwise.
    """
    self.mode = self.QBuffer
    self.maxlen = max(1, self._minperiod)
    self.extrasize = max(0, extrasize)
    # Without extra slots the mark sits one position before maxlen.
    shortfall = 0 if self.extrasize else 1
    self.lenmark = self.maxlen - shortfall
    self.reset()
# Get indicator values
[docs]
def getindicators(self):
    """Return the indicators attached to this buffer.

    Returns:
        list: A new empty list on every call.
    """
    indicators = []
    return indicators
# Minimum buffer
[docs]
def minbuffer(self, size):
    """Guarantee that at least *size* values can be held.

    Outside QBuffer mode nothing is done. In QBuffer mode, when the
    current ``maxlen`` is smaller than *size*, ``maxlen`` is raised to
    *size*, ``lenmark`` is recomputed and the buffer is reset.

    Args:
        size: Minimum number of values the buffer must be able to hold.
    """
    must_grow = self.mode == self.QBuffer and self.maxlen < size
    if not must_grow:
        return

    self.maxlen = size
    # Same rule as qbuffer(): one less than maxlen when there is no extra size.
    self.lenmark = self.maxlen - (0 if self.extrasize else 1)
    self.reset()
# Return actual length
[docs]
def __len__(self):
    """Return the logical length of the line.

    This is the ``lencount`` counter maintained by the index-moving
    methods, not the size of the underlying storage (see ``buflen``).
    """
    logical_length = self.lencount
    return logical_length
# Return the length of data in the line cache
[docs]
def buflen(self):
    """Return the amount of real data the internal buffer holds.

    The storage may be longer than the stored data because ``extend()``
    adds look-ahead slots; those slots (counted in ``self.extension``)
    are subtracted from the storage length.
    """
    stored = len(self.array)
    return stored - self.extension
[docs]
def __getitem__(self, ago):
    """Return the value stored *ago* positions away from the current index.

    Args:
        ago (int): Offset relative to the current index (0 = current,
            -1 = previous, 1 = next).

    Returns:
        The stored value, with positive/negative infinity reported as 0.0.
        When the position is outside the storage: NaN if the cached
        ``_is_indicator`` flag is set, otherwise 0.0.

    Raises:
        IndexError: If the position is outside the storage and the
            ``_is_data_feed_line`` attribute is truthy.
    """
    try:
        position = self._idx
        lencount = self.lencount
        # Clamp an index that ran past the logical length back onto the
        # last logical position.
        if lencount > 0 and position >= lencount:
            position = lencount - 1
        value = self.array[position + ago]
    except IndexError:
        # Outside the storage: handled below.
        pass
    else:
        # NaN is passed through untouched; only +/-inf is neutralised.
        if isinstance(value, float) and math.isinf(value):
            return 0.0
        return value

    # Data feed lines signal "no data" to the caller. A single check
    # suffices: every data-feed path ends in this IndexError.
    if getattr(self, "_is_data_feed_line", False):
        raise IndexError("array index out of range")

    if getattr(self, "_is_indicator", False):
        return float("nan")
    return 0.0
# Get data values, widely used in strategies
[docs]
def get(self, ago=0, size=1):
    """Return a window of values positioned relative to the current index.

    Args:
        ago (int): Offset, relative to the current index, of the last
            element of the window.
        size (int): Number of elements in the window.

    Returns:
        array.array: A new ``"d"`` array holding the elements from
        ``_idx + ago - size + 1`` through ``_idx + ago`` inclusive, with
        positive/negative infinity replaced by 0.0.
    """
    stop = self._idx + ago + 1
    start = stop - size
    if self.useislice:
        # Storage that cannot be sliced directly is windowed with islice.
        window = list(islice(self.array, start, stop))
    else:
        window = list(self.array[start:stop])

    cleaned = [
        0.0 if isinstance(item, float) and not math.isfinite(item) and item == item else item
        for item in window
    ]
    return array.array("d", cleaned)
# Return the value at the actual index 0 of the array
[docs]
def getzeroval(self, idx=0):
    """Return a single value addressed from the real start of the buffer.

    Args:
        idx (int): Position relative to the real start of the storage.

    Returns:
        The value at ``self.array[idx]``, with positive/negative infinity
        reported as 0.0.
    """
    raw = self.array[idx]
    is_infinite = isinstance(raw, float) and not math.isfinite(raw) and raw == raw
    return 0.0 if is_infinite else raw
# Return data of size starting from idx in the array
[docs]
def getzero(self, idx=0, size=1):
    """Return a window of values addressed from the real start of the buffer.

    Args:
        idx (int): First position, relative to the real start of the storage.
        size (int): Number of elements to return.

    Returns:
        array.array: A new ``"d"`` array with the elements ``idx`` through
        ``idx + size - 1``, with positive/negative infinity replaced by 0.0.
    """
    stop = idx + size
    if self.useislice:
        window = list(islice(self.array, idx, stop))
    else:
        window = list(self.array[idx:stop])

    cleaned = [
        0.0 if isinstance(item, float) and not math.isfinite(item) and item == item else item
        for item in window
    ]
    return array.array("d", cleaned)
# Set values to the array
[docs]
def __setitem__(self, ago, value):
    """Store *value* at offset *ago* and forward it to every binding.

    Args:
        ago (int): Offset relative to the current index.
        value: Value to store. It is normalised first:

            * ``None`` becomes the cached default value.
            * NaN is kept as NaN, except on datetime lines where it
              becomes the default value.
            * Positive/negative infinity becomes the default value.
            * On datetime lines anything else is converted with
              ``float()``; results below 1.0 and values that cannot be
              converted become 1.0.

    The storage is grown with the default value when the target position
    lies beyond its end. A target position below zero is ignored (nothing
    is stored and no binding is updated).
    """
    # Defensive: recreate the storage if the attribute is missing.
    try:
        storage = self.array
    except AttributeError:
        storage = self.array = array.array("d")

    is_datetime = self._is_datetime_line
    if value is None:
        value = self._default_value
    elif value != value:
        # NaN: kept for ordinary lines so an explicit "no value" written
        # by the caller is not turned into a finite number.
        value = self._default_value if is_datetime else float("nan")
    elif isinstance(value, float) and not math.isfinite(value):
        value = self._default_value
    elif is_datetime:
        # One conversion covers numeric and non-numeric input alike;
        # comparing a non-numeric value with 1.0 directly would raise
        # TypeError before any fallback could apply.
        try:
            as_float = float(value)
        except (TypeError, ValueError):
            value = 1.0
        else:
            value = 1.0 if as_float < 1.0 else as_float

    target = self._idx + ago
    missing = target - len(storage) + 1
    if missing > 0:
        storage.extend([self._default_value] * missing)
    elif target < 0:
        return

    storage[target] = value

    for binding in self.bindings:
        bound_value = value
        # A datetime binding never receives a non-numeric value or one below 1.0.
        if getattr(binding, "_is_datetime_line", False) and (
            not isinstance(bound_value, (int, float)) or bound_value < 1.0
        ):
            bound_value = 1.0
        binding[ago] = bound_value
# Set specific value to array
[docs]
def set(self, value, ago=0):
    """Store *value* at offset *ago* and forward it to every binding.

    Args:
        value: Value to store. ``None``, NaN and positive/negative
            infinity are replaced by the cached default value. On datetime
            lines, non-numeric values and values below 1.0 become 1.0.
        ago (int): Offset relative to the current index.

    The storage is grown with the default value when the target position
    lies beyond its end. A target position below zero is ignored.
    """
    datetime_line = self._is_datetime_line

    unusable = (
        value is None
        or value != value
        or (isinstance(value, float) and not math.isfinite(value))
    )
    if unusable:
        value = self._default_value
    elif datetime_line and (not isinstance(value, (int, float)) or value < 1.0):
        value = 1.0

    storage = self.array
    target = self._idx + ago
    held = len(storage)
    if target >= held:
        filler = self._default_value
        # Pad one slot at a time up to and including the target position.
        for _ in range(target - held + 1):
            storage.append(filler)
    elif target < 0:
        return

    storage[target] = value

    if not self.bindings:
        return
    for binding in self.bindings:
        try:
            binding_is_datetime = binding._is_datetime_line
        except AttributeError:
            binding_is_datetime = False
        forwarded = value
        if binding_is_datetime and (not isinstance(forwarded, (int, float)) or forwarded < 1.0):
            forwarded = 1.0
        binding[ago] = forwarded
# Return to the beginning
[docs]
def home(self):
    """Move the logical index back to the start.

    Only ``idx`` and ``lencount`` are reset; the storage is left untouched,
    so ``buflen()`` still reports the amount of data held.
    """
    self.idx, self.lencount = -1, 0
# Move forward one step
[docs]
def forward(self, value=NAN, size=1):
    """Advance the logical index and append *size* new slots to the storage.

    Args:
        value: Value placed in the new slots. ``None``, NaN and
            positive/negative infinity are replaced by the cached default
            value.
        size (int): Number of positions to advance and slots to append.

    For lines not flagged as indicators that have a clock, the advance is
    capped so that the logical length never exceeds ``len(clock)``; when
    the line has already caught up with its clock nothing happens. A
    failure while consulting the clock is logged at debug level and the
    requested size is used unchanged.
    """
    unusable = (
        value is None
        or value != value
        or (isinstance(value, float) and not math.isfinite(value))
    )
    if unusable:
        value = self._default_value

    if not self._is_indicator:
        clock = self._clock
        if clock is not None:
            try:
                room = len(clock) - self.lencount
                if room <= 0:
                    # Already level with (or ahead of) the clock.
                    return
                if size > room:
                    size = room
            except Exception as e:
                logger.debug("Failed to check clock length in forward: %s", e)

    # Nothing to do for a non-positive advance.
    if size <= 0:
        return

    if self.mode == self.QBuffer:
        # Goes through the idx property so the QBuffer pin is honoured.
        self.idx = self._idx + size
    else:
        self._idx += size
    self.lencount += size

    if size != 1:
        self.array.extend([value] * size)
    else:
        self.array.append(value)
# Move backward one step
[docs]
def backwards(self, size=1, force=False):
    """Move the logical index back and drop the newest values from storage.

    Args:
        size (int): Number of positions to step back and values to drop.
        force (bool): Passed to ``set_idx`` in QBuffer mode so the index
            is moved even when it is pinned.
    """
    target = self._idx - size
    if self.mode == self.QBuffer:
        self.set_idx(target, force=force)
    else:
        self._idx = target
    self.lencount -= size

    storage = self.array
    held = len(storage)
    if held <= 0:
        return

    doomed = min(size, held)
    try:
        # One slice deletion removes the whole tail.
        del storage[held - doomed :]
    except TypeError:
        # Storage without slice deletion (the deque used in QBuffer mode):
        # drop the newest values one by one.
        for _ in range(doomed):
            storage.pop()
# Move backward one step (original backwards was overridden)
[docs]
def safe_backwards(self, size=1):
    """Step the raw index back without touching length or storage.

    Args:
        size: Number of positions to move backwards.

    Returns:
        bool: True if the index is still >= 0 after moving. If the index
        was None it is set to -1 and False is returned.
    """
    current = self._idx
    if current is None:
        self._idx = -1
        return False

    moved = current - size
    self._idx = moved
    return moved >= 0
# Decrease idx and lencount by size
[docs]
def rewind(self, size=1):
    """Decrease the logical index and the length counter by *size*.

    The storage is left untouched.

    Args:
        size: Number of positions to rewind.
    """
    target = self._idx - size
    if self.mode == self.QBuffer:
        # Assigned through the idx property so the QBuffer pin applies.
        self.idx = target
    else:
        self._idx = target
    self.lencount = self.lencount - size
# Increase idx and lencount by size
[docs]
def advance(self, size=1):
    """Increase the logical index and the length counter by *size*.

    The storage is left untouched.

    Args:
        size: Number of positions to advance.
    """
    target = self._idx + size
    if self.mode == self.QBuffer:
        # Assigned through the idx property so the QBuffer pin applies.
        self.idx = target
    else:
        self._idx = target
    self.lencount = self.lencount + size
# Extend forward
[docs]
def extend(self, value=float("nan"), size=0):
    """Append slots the logical index will not reach on its own.

    Args:
        value: Value placed in the new slots. ``None``, NaN and
            positive/negative infinity are replaced by the cached default
            value.
        size (int): Number of slots to append.

    The appended slots are counted in ``self.extension`` so that
    ``buflen()`` can exclude them. They allow look-ahead operations or
    writing into the "future" part of the buffer.
    """
    unusable = (
        value is None
        or value != value
        or (isinstance(value, float) and not math.isfinite(value))
    )
    if unusable:
        value = self._default_value

    self.extension += size
    remaining = size
    while remaining > 0:
        self.array.append(value)
        remaining -= 1
# Add another LineBuffer
[docs]
def addbinding(self, binding):
    """Register another line that is written whenever this line is written.

    Args:
        binding (LineBuffer): Line that receives every value set on this
            line.
    """
    bindings = self.bindings
    bindings.append(binding)
    # The bound line cannot start producing sooner than this line does.
    own_minperiod = self._minperiod
    binding.updateminperiod(own_minperiod)
# Get all data starting from idx
[docs]
def plot(self, idx=0, size=None):
    """Return values addressed from the real start of the buffer for plotting.

    A variant of ``getzero`` that returns ``len(self)`` values unless a
    size is given, since plotting usually wants everything.

    Args:
        idx (int): First position, relative to the real start of the storage.
        size (int): Number of values to return; a falsy value means
            ``len(self)``.

    Returns:
        Whatever ``getzero(idx, size)`` returns.
    """
    count = size if size else len(self)
    return self.getzero(idx, count)
# Get partial data from array
[docs]
def plotrange(self, start, end):
    """Return the values between two absolute storage positions.

    Args:
        start: First position (inclusive), from the real start of the storage.
        end: End position (exclusive).

    Returns:
        list: The selected values, with positive/negative infinity
        replaced by 0.0.
    """
    if self.useislice:
        selected = islice(self.array, start, end)
    else:
        selected = self.array[start:end]

    cleaned = []
    for item in list(selected):
        if isinstance(item, float) and not math.isfinite(item) and item == item:
            item = 0.0
        cleaned.append(item)
    return cleaned
# Set array values for each binding when running in once mode
[docs]
def oncebinding(self):
    """Copy this line's real data into every binding in one step.

    The first ``buflen()`` elements of this line's storage overwrite the
    same range in each binding's storage.
    """
    source = self.array
    span = slice(0, self.buflen())
    for bound in self.bindings:
        bound.array[span] = source[span]
# Convert binding to line
[docs]
def bind2lines(self, binding=0):
    """Bind this line to one of its owner's lines.

    Args:
        binding: Either the name of a line (looked up as an attribute of
            ``self._owner.lines``) or an index into ``self._owner.lines``.

    Returns:
        This line, so the call can be chained.
    """
    by_name = isinstance(binding, string_types)
    owner_lines = self._owner.lines
    target = getattr(owner_lines, binding) if by_name else owner_lines[binding]
    self.addbinding(target)
    return self
bind2line = bind2lines
[docs]
def __call__(self, ago=None):
    """Return the current value, or a delayed view of this line.

    Args:
        ago: When None, the current value (``self[0]``) is returned.
            Otherwise a ``LineDelay`` built from this line and *ago* is
            returned, e.g. ``close(-5)`` for the close 5 bars before.
    """
    if ago is not None:
        return LineDelay(self, ago)
    return self[0]
def _makeoperation(self, other, operation, r=False, _ownerskip=None, original_other=None):
"""Build a ``LinesOperation`` combining this line with *other*.

Args:
other: Second operand, passed to ``LinesOperation`` as given.
operation: Callable/operator passed to ``LinesOperation``.
r: Passed through as ``r`` (presumably "reversed operands" - confirm
against ``LinesOperation``).
_ownerskip: Not used in this method body.
original_other: If not None, used instead of *other* when choosing
``parent_b``.

Returns:
The ``LinesOperation`` instance, created with ``parent_a`` and
``parent_b`` set only to ``LineActions`` instances (or None).
"""
# Only set parent_a/parent_b to LineActions instances (LinesOperation, _LineDelay, etc.).
# Full indicators (ATR, SMA, SuperTrend, etc.) are processed separately by _lineiterators
# ordering in _once(), so they must never be called via _parent_a.once() which would
# trigger premature once_via_next() calls that corrupt the data feed index state.
parent_a = None
# parent_a candidates: the owner's _owner_ref, or the owner itself
if hasattr(self, "_owner") and self._owner is not None:
owner = self._owner
if hasattr(owner, "_owner_ref") and owner._owner_ref is not None:
ref = owner._owner_ref
if isinstance(ref, LineActions):
parent_a = ref
elif isinstance(owner, LineActions):
parent_a = owner
# parent_b is taken from original_other when provided, otherwise from other
parent_b_candidate = original_other if original_other is not None else other
parent_b = parent_b_candidate if isinstance(parent_b_candidate, LineActions) else None
return LinesOperation(self, other, operation, r=r, parent_a=parent_a, parent_b=parent_b)
def _makeoperationown(self, operation, _ownerskip=None):
"""Build a ``LineOwnOperation`` applying *operation* to this line.

Args:
operation: Callable/operator passed to ``LineOwnOperation``.
_ownerskip: Not used in this method body.

Returns:
The ``LineOwnOperation`` instance, created with ``parent_a`` set to a
``LineActions`` instance found via the owner, or None.
"""
parent_a = None
# parent_a candidates: the owner's _owner_ref, or the owner itself;
# only LineActions instances are accepted
if hasattr(self, "_owner") and self._owner is not None:
owner = self._owner
if hasattr(owner, "_owner_ref") and owner._owner_ref is not None:
ref = owner._owner_ref
if isinstance(ref, LineActions):
parent_a = ref
elif isinstance(owner, LineActions):
parent_a = owner
return LineOwnOperation(self, operation, parent_a=parent_a)
def _settz(self, tz):
    """Store *tz* as the timezone used by ``datetime()`` when none is passed."""
    setattr(self, "_tz", tz)
[docs]
def datetime(self, ago=0, tz=None, naive=True):
    """Convert the value at offset *ago* into a datetime object.

    Args:
        ago: Offset relative to the current index (0=current, -1=previous).
        tz: Timezone to apply. If falsy, ``self._tz`` is used.
        naive: Passed to ``num2date``; when True the result carries no
            timezone information.

    Returns:
        datetime: The converted timestamp. The module-level default
        datetime is returned when the stored value is None/NaN or when
        the conversion raises ValueError/OverflowError; for
        ``naive=False`` that default gets ``tzinfo`` set to the effective
        timezone.

    Raises:
        IndexError: Propagated from ``self[ago]``.
    """
    value = self[ago]

    if _is_nan_or_none(value):
        if naive:
            return _DEFAULT_DATETIME
        return _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz)

    # Anything but the plain datetime() call bypasses the cache.
    if not (ago == 0 and tz is None and naive):
        try:
            return num2date(value, tz or self._tz, naive)
        except (ValueError, OverflowError):
            if naive:
                return _DEFAULT_DATETIME
            return _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz)

    # Same clamping as __getitem__: an index past the logical length maps
    # to the last logical position.
    slot = self._idx
    if self.lencount > 0 and slot >= self.lencount:
        slot = self.lencount - 1

    # The cached result is reused only when position, timezone object and
    # raw value all match.
    cache_hit = (
        getattr(self, "_dt_cache_idx", None) == slot
        and getattr(self, "_dt_cache_tz", None) is self._tz
        and getattr(self, "_dt_cache_value", None) == value
    )
    if cache_hit:
        return self._dt_cache_dt

    try:
        converted = num2date(value, self._tz, True)
        self._dt_cache_idx = slot
        self._dt_cache_tz = self._tz
        self._dt_cache_value = value
        self._dt_cache_dt = converted
    except (ValueError, OverflowError):
        return _DEFAULT_DATETIME
    return converted
[docs]
def date(self, ago=0, tz=None, naive=True):
    """Return the date part of the timestamp at offset *ago*.

    Args:
        ago: Offset relative to the current index (0=current, -1=previous).
        tz: Timezone to apply. If None, uses self._tz.
        naive: Passed through to ``datetime()``.

    Returns:
        date: The result of ``.date()`` on the datetime, or None when
        ``datetime()`` returned None or ``.date()`` raised
        AttributeError/ValueError.

    Raises:
        IndexError: Propagated unchanged from ``datetime()`` so callers
            can detect the end of the data.
    """
    moment = self.datetime(ago, tz, naive)
    if moment is None:
        return None
    try:
        day = moment.date()
    except (AttributeError, ValueError):
        return None
    return day
[docs]
def time(self, ago=0, tz=None, naive=True):
    """Return the time-of-day part of the timestamp at offset *ago*.

    Args:
        ago: Offset relative to the current index (0=current, -1=previous).
        tz: Timezone to apply. If None, uses self._tz.
        naive: Passed through to ``datetime()``.

    Returns:
        time: The result of ``.time()`` on the datetime, or None when
        ``datetime()`` returned None or ``.time()`` raised
        AttributeError/ValueError.
    """
    moment = self.datetime(ago, tz, naive)
    if moment is None:
        return None
    try:
        clock_time = moment.time()
    except (AttributeError, ValueError):
        return None
    return clock_time
[docs]
def dt(self, ago=0):
    """Short alias for ``datetime(ago)`` with its default tz/naive settings."""
    moment = self.datetime(ago)
    return moment
[docs]
def tm_raw(self, ago=0):
    """Return a ``time.struct_time``-like tuple for the timestamp at *ago*.

    The tuple is compatible with strftime formatting. It is built from
    ``datetime(ago, naive=False)``, i.e. from the non-naive datetime.
    """
    moment = self.datetime(ago, naive=False)
    return moment.timetuple()
[docs]
def tm(self, ago=0):
    """Return a ``time.struct_time``-like tuple for the timestamp at *ago*.

    The tuple is compatible with strftime formatting. It is built from
    ``datetime(ago, naive=True)``, i.e. from the naive datetime.
    """
    moment = self.datetime(ago, naive=True)
    return moment.timetuple()
[docs]
def tm_lt(self, other, ago=0):
    """Report whether this line's value at *ago* is lower than ``other[0]``.

    Args:
        other: Line whose current value is the right-hand operand.
        ago: Offset into this line for the left-hand operand.
    """
    mine = self[ago]
    theirs = other[0]
    return mine < theirs
[docs]
def tm_le(self, other, ago=0):
    """Report whether this line's value at *ago* is lower than or equal to ``other[0]``.

    Args:
        other: Line whose current value is the right-hand operand.
        ago: Offset into this line for the left-hand operand.
    """
    mine = self[ago]
    theirs = other[0]
    return mine <= theirs
[docs]
def tm_eq(self, other, ago=0):
    """Tell whether this line's value at ``ago`` equals ``other[0]``.

    Args:
        other: Indexable object whose element 0 is the right-hand operand.
        ago: Index into this line for the left-hand operand.

    Returns:
        The result of ``self[ago] == other[0]``.
    """
    mine = self[ago]
    theirs = other[0]
    return mine == theirs
[docs]
def tm_gt(self, other, ago=0):
    """Tell whether this line's value at ``ago`` is above ``other[0]``.

    Args:
        other: Indexable object whose element 0 is the right-hand operand.
        ago: Index into this line for the left-hand operand.

    Returns:
        The result of ``self[ago] > other[0]``.
    """
    mine = self[ago]
    theirs = other[0]
    return mine > theirs
[docs]
def tm_ge(self, other, ago=0):
    """Tell whether this line's value at ``ago`` is above or equal to ``other[0]``.

    Args:
        other: Indexable object whose element 0 is the right-hand operand.
        ago: Index into this line for the left-hand operand.

    Returns:
        The result of ``self[ago] >= other[0]``.
    """
    mine = self[ago]
    theirs = other[0]
    return mine >= theirs
[docs]
def tm2dtime(self, tm, ago=0):
    """Build a ``datetime.datetime`` from the leading fields of ``tm``.

    Args:
        tm: Sequence such as ``time.struct_time``; at most its first six
            items (year, month, day, hour, minute, second) are used.
        ago: Accepted for interface compatibility; not used here.

    Returns:
        datetime.datetime: Constructed from ``tm[:6]``. No timezone is
        attached by this method.
    """
    fields = tm[:6]
    return datetime.datetime(*fields)
[docs]
def tm2datetime(self, tm, ago=0):
    """Build a ``datetime.datetime`` from the leading fields of ``tm``.

    Args:
        tm: Sequence such as ``time.struct_time``; at most its first six
            items (year, month, day, hour, minute, second) are used.
        ago: Accepted for interface compatibility; not used here.

    Returns:
        datetime.datetime: Constructed from ``tm[:6]``. No timezone is
        attached by this method.
    """
    fields = tm[:6]
    return datetime.datetime(*fields)
# LineActions cache for performance
[docs]
class LineActionsCache:
    """Class-level switch and storage used to cache LineActions results."""

    # Shared storage; emptied by clear_cache()
    _cache: dict = {}
    # Flag toggled by enable_cache()
    _cache_enabled = False

    @classmethod
    def enable_cache(cls, enable=True):
        """Turn the cache flag on or off.

        Args:
            enable: Value stored in ``_cache_enabled``.
        """
        setattr(cls, "_cache_enabled", enable)

    @classmethod
    def clear_cache(cls):
        """Remove every entry from the shared cache dict."""
        cls._cache.clear()

    @classmethod
    def get_cache_key(cls, *args):
        """Derive an integer key from the given arguments.

        Each argument contributes ``id(arg)`` when it exposes a ``__hash__``
        attribute and ``str(arg)`` otherwise; the parts are hashed as a tuple.
        """
        parts = []
        for arg in args:
            if hasattr(arg, "__hash__"):
                parts.append(id(arg))
            else:
                parts.append(str(arg))
        return hash(tuple(parts))
[docs]
class LineActionsMixin:
    """Metaclass-free hooks run around ``__init__`` of LineActions objects."""

    @classmethod
    def dopreinit(cls, _obj, *args, **kwargs):
        """Prepare owner link, clock and minimum period before user init code.

        Args:
            _obj: Object being initialised.
            *args: Constructor arguments; LineRoot instances among them are
                stored in ``_obj._datas`` and drive the clock choice.
            **kwargs: Passed through untouched.

        Returns:
            tuple: ``(_obj, args, kwargs)``.
        """
        # The lines container must know its owner before user __init__ code
        # creates bindings such as: self.lines.crossover = upcross - downcross
        lines = getattr(_obj, "lines", None)
        if lines is not None and getattr(lines, "_owner", None) is None:
            lines._owner = _obj

        # Clock selection starts from the explicit line arguments so that an
        # operation built on a secondary feed follows that feed rather than
        # the strategy's primary data clock.
        _obj._clock = None
        _obj._datas = [candidate for candidate in args if isinstance(candidate, LineRoot)]
        sources = _obj._datas
        if sources:
            head = sources[0]
            head_clock = getattr(head, "_clock", None)
            if head_clock is not None and head_clock.__class__.__name__ != "MinimalClock":
                _obj._clock = head_clock
            else:
                _obj._clock = head

        # Owner-based fallbacks, tried in order: owner clock, owner's first
        # data, owner's single data, the owner itself when it has __len__
        owner = getattr(_obj, "_owner", None)
        if _obj._clock is None and owner is not None:
            if getattr(owner, "_clock", None) is not None:
                _obj._clock = owner._clock
            elif hasattr(owner, "datas") and owner.datas:
                _obj._clock = owner.datas[0]
            elif getattr(owner, "data", None) is not None:
                _obj._clock = owner.data
            elif hasattr(owner, "__len__"):
                _obj._clock = owner

        # Last resort: the object's own first data
        if _obj._clock is None and hasattr(_obj, "datas") and _obj.datas:
            _obj._clock = _obj.datas[0]

        # Keep a minperiod that was already set (e.g. in __new__ for nested
        # indicators); otherwise start from 1
        if getattr(_obj, "_minperiod", None) is None:
            _obj._minperiod = 1

        from .lineroot import LineMultiple, LineSingle

        # Gather the minperiod of every line-like argument so an object
        # applied to other line objects inherits the largest one
        collected = []
        for arg in args:
            if isinstance(arg, LineSingle):
                collected.append(getattr(arg, "_minperiod", 1))
            if isinstance(arg, LineMultiple) and hasattr(arg, "lines") and arg.lines:
                try:
                    # Multi-line arguments contribute their first line
                    collected.append(getattr(arg.lines[0], "_minperiod", 1))
                except (IndexError, TypeError):
                    # Empty/non-indexable lines container; skip this arg.
                    pass

        if collected:
            _obj.updateminperiod(max(collected))
        return _obj, args, kwargs

    @classmethod
    def dopostinit(cls, _obj, *args, **kwargs):
        """Post-initialisation hook; intentionally performs no work here."""
        # NOTE: Indicator registration is handled in lineiterator.py dopostinit
        # with proper duplicate checking. No registration needed here.
[docs]
class PseudoArray:
    """Wrapper giving array-like access to objects that are not arrays.

    Wraps iterables (including ``itertools.repeat``) and plain values and
    offers indexing plus an ``array`` property. When the wrapped object does
    not support indexing, fallbacks are used.

    Attributes:
        wrapped: The wrapped object.
        _minperiod: Minimum period copied from the wrapped object (default 1).

    Example:
        >>> from itertools import repeat
        >>> pseudo = PseudoArray(repeat(1.0))
        >>> print(pseudo[0])
        1.0
    """

    # Marker meaning "constant of a wrapped repeat() has not been read yet"
    _UNSET = object()
    # Class-level default so instances built without __init__ still work
    _constant = _UNSET

    def __init__(self, wrapped):
        """Store the wrapped object and inherit its ``_minperiod``.

        Args:
            wrapped: The object to wrap.
        """
        self.wrapped = wrapped
        self._minperiod = getattr(wrapped, "_minperiod", 1)

    def _repeat_constant(self):
        """Return the value of a wrapped ``repeat``, reading it at most once.

        ``iter(repeat_obj)`` is the object itself, so every ``next`` call uses
        up one item of a finite ``repeat(x, n)``. Remembering the first value
        keeps later reads from exhausting it.

        Raises:
            StopIteration: If the repeat object is already exhausted on the
                first read.
        """
        if self._constant is self._UNSET:
            self._constant = next(iter(self.wrapped))
        return self._constant

    def __getitem__(self, key):
        """Return the element for ``key``, with fallbacks for non-indexables.

        Order: direct indexing; the constant of a ``repeat``; indexing a list
        built from the iterable; for non-iterables the wrapped object itself
        at key 0. NaN is returned when none of these yields a value.
        """
        try:
            # Try normal indexing first
            return self.wrapped[key]
        except (TypeError, IndexError, AttributeError):
            pass

        if not hasattr(self.wrapped, "__iter__"):
            # Not iterable: the wrapped object stands in for index 0 only
            return self.wrapped if key == 0 else float("nan")

        try:
            if isinstance(self.wrapped, itertools.repeat):
                # All values of a repeat are the same
                return self._repeat_constant()
            # Other iterables are materialised and indexed
            return list(self.wrapped)[key]
        except (StopIteration, IndexError):
            return float("nan")

    @property
    def array(self):
        """Return an array-like view of the wrapped object.

        Returns:
            A one-element list for ``repeat`` objects, the wrapped object's
            own ``array`` when it has one, an empty list for non-iterables,
            otherwise the wrapped object itself.
        """
        if isinstance(self.wrapped, itertools.repeat):
            return [self._repeat_constant()]
        if hasattr(self.wrapped, "array"):
            return self.wrapped.array
        if not hasattr(self.wrapped, "__iter__"):
            return []
        return self.wrapped
[docs]
class LineActions(LineBuffer, LineActionsMixin, metabase.ParamsMixin):
    """
    Base class for *Line Classes* with different lines, derived from a
    LineBuffer
    """

    # Line type tag, taken from LineRoot.IndType
    _ltype = LineRoot.IndType
    # Add plotlines attribute for plotting support
    plotlines = object()
[docs]
def __new__(cls, *args, **kwargs):
    """Create the instance and pre-wire params, lines and data references.

    Work done here, in order: create the bare instance, attach a params
    object (``instance.p``), build the lines container and make sure every
    line is a LineBuffer with its own array, optionally prepend the owner's
    data to ``args`` when fewer data-like arguments than ``mindatas`` were
    given, and finally fill ``datas`` / ``data`` / ``dnames``.

    Args:
        *args: Constructor arguments; data-like ones become ``instance.datas``.
        **kwargs: Parameter overrides applied to ``instance.p``.

    Returns:
        The new, partially initialised instance.
    """
    # Create the instance using the normal Python object creation
    instance = super().__new__(cls)
    # Initialize basic attributes
    import collections

    instance._lineiterators = collections.defaultdict(list)
    # Remember the original arguments; __init__ falls back to them when it
    # is called without positional arguments
    instance._lineaction_init_args = args
    # Number of data sources expected (class attribute, default 1)
    mindatas = getattr(cls, "_mindatas", getattr(cls, "mindatas", 1))
    # Set up parameters for this instance (needed for self.p.period etc.)
    if hasattr(cls, "_params") and cls._params is not None:
        params_cls = cls._params
        # Create parameter instance for this object
        instance.p = params_cls()
        # Apply only the kwargs that name an existing parameter
        for key, value in kwargs.items():
            if hasattr(instance.p, key):
                setattr(instance.p, key, value)
    else:
        # Fallback to a plain DotDict holding the kwargs
        from .utils import DotDict

        instance.p = DotDict(**kwargs)
    # Create and set up Lines instance
    lines_cls = getattr(cls, "lines", None)
    if lines_cls is not None:
        instance.lines = lines_cls()
        # Set the owner reference right after creating the lines instance.
        # object.__setattr__ writes _owner_ref directly (bypasses Lines.__setattr__)
        object.__setattr__(instance.lines, "_owner_ref", instance)
        # Ensure lines are properly initialized with their own buffers
        if hasattr(instance.lines, "_obj"):
            instance.lines._obj = instance
        # If the lines instance doesn't have an advance method, add one
        if not hasattr(instance.lines, "advance"):

            def advance_method(size=1):
                """Forward all lines in the collection"""
                for line in getattr(instance.lines, "lines", []):
                    if hasattr(line, "advance"):
                        line.advance(size=size)

            instance.lines.advance = advance_method
        # Each line should be a separate LineBuffer with its own array
        if hasattr(instance.lines, "lines") and instance.lines.lines:
            for i, line_obj in enumerate(instance.lines.lines):
                if not isinstance(line_obj, LineBuffer):
                    # Replace a non-LineBuffer entry with a fresh LineBuffer
                    new_line = LineBuffer()
                    # Copy any existing attributes
                    if hasattr(line_obj, "__dict__"):
                        new_line.__dict__.update(line_obj.__dict__)
                    instance.lines.lines[i] = new_line
                    line_obj = new_line
                # Ensure the line has its own array; a missing or empty array
                # is replaced and the position counters are reset
                if not hasattr(line_obj, "array") or not line_obj.array:
                    import array

                    line_obj.array = array.array("d")
                    line_obj._idx = -1
                    line_obj.lencount = 0
                line_obj._refresh_cached_line_flags(
                    owner=instance.lines,
                    ltype=getattr(instance, "_ltype", cls._ltype),
                )
            # Set up convenience references - first line as .line
            instance.line = instance.lines.lines[0] if instance.lines.lines else instance
            instance.l = instance.lines  # Common shorthand
        else:
            # No individual lines, use the instance itself
            instance.line = instance
            instance.l = instance.lines
    else:
        # Create default lines using the proper Lines class
        from .lineseries import Lines

        instance.lines = Lines()
        # Add the advance method if it doesn't exist
        if not hasattr(instance.lines, "advance"):

            def advance_method(size=1):
                """Forward all lines in the collection"""
                for line in getattr(instance.lines, "lines", []):
                    if hasattr(line, "advance"):
                        line.advance(size=size)

            instance.lines.advance = advance_method
        instance.line = instance
        instance.l = instance.lines
    # Auto-assign data from the owner if too few data arguments were
    # provided and mindatas > 0
    if mindatas > 0:
        from . import metabase

        owner = None
        # Strategy 1: Use nearest LineIterator owner. Falling back directly
        # to Strategy can skip an enclosing indicator and bind expression
        # clocks to the primary strategy data.
        try:
            from .lineiterator import LineIterator
        except ImportError:
            LineIterator = None
        if LineIterator is not None:
            owner = metabase.findowner(instance, LineIterator)
        # Strategy 2: Use findowner for Strategy
        try:
            from .strategy import Strategy
        except ImportError:
            Strategy = None
        if owner is None and Strategy is not None:
            owner = metabase.findowner(instance, Strategy)
        # If we found an owner with data, auto-assign it
        if owner is not None and hasattr(owner, "data") and owner.data is not None:
            # Count the data-like arguments already present
            data_count = 0
            for arg in args:
                if (
                    isinstance(arg, LineRoot)
                    or hasattr(arg, "lines")
                    or hasattr(arg, "_name")
                    or str(type(arg).__name__).endswith("Data")
                ):
                    data_count += 1
            # If we need more data sources than we have, auto-assign from owner
            if data_count < mindatas:
                # Prepend the owner's data once per missing source.
                # Only the local ``args`` changes; _lineaction_init_args
                # stored above keeps the original tuple.
                missing_data_count = mindatas - data_count
                for _ in range(missing_data_count):
                    args = (owner.data,) + args
    # Process arguments to identify data sources; collection stops once
    # mindatas sources have been gathered
    data_count = 0
    processed_datas = []
    for i, arg in enumerate(args):
        if (
            isinstance(arg, LineRoot)
            or hasattr(arg, "lines")
            or hasattr(arg, "_name")
            or str(type(arg).__name__).endswith("Data")
        ):
            processed_datas.append(arg)
            data_count += 1
            if data_count >= mindatas:
                break
    instance.datas = processed_datas
    if processed_datas:
        instance.data = processed_datas[0]
    else:
        instance.data = None
    # Set up dnames (name -> data) for the datas that carry a non-empty _name
    try:
        from .utils import DotDict

        instance.dnames = DotDict(
            [(d._name, d) for d in instance.datas if getattr(d, "_name", "")]
        )
    except Exception:
        # Any failure leaves an empty plain dict
        instance.dnames = {}
    return instance
def __init__(self, *args, **kwargs):
    """Initialize LineActions instance.

    Steps, in order: bind the lines container to this object, look up an
    owner through ``metabase.findowner``, run ``dopreinit``, run the parent
    initialiser, run ``dopreinit`` again (the parent initialiser resets
    clock/owner metadata), refresh cached line flags, and run ``dopostinit``.

    Args:
        *args: Positional arguments including data feeds. When empty, the
            arguments stored as ``_lineaction_init_args`` are used for
            ``dopreinit``.
        **kwargs: Keyword arguments for parameters.
    """
    # Set the lines owner FIRST, before any other initialization, so line
    # bindings in the user's __init__ can find the owner
    if hasattr(self, "lines") and self.lines is not None:
        # If lines is still a class, create an instance first
        if isinstance(self.lines, type):
            self.lines = self.lines()
        # Now set owner using object.__setattr__ to directly set _owner_ref
        if self.lines is not None:
            object.__setattr__(self.lines, "_owner_ref", self)
    # Set up _owner BEFORE calling dopreinit
    from . import metabase

    self._owner = None
    # First try to find the nearest LineIterator. For LineActions created
    # inside an indicator this keeps ownership/clock fallback local to that
    # indicator instead of jumping to the enclosing strategy.
    try:
        from .lineiterator import LineIterator

        self._owner = metabase.findowner(self, LineIterator)
    except Exception as e:
        logger.debug("Failed to find LineIterator owner: %s", e)
    # If no LineIterator found, try Strategy specifically
    try:
        from .strategy import Strategy

        if self._owner is None:
            self._owner = metabase.findowner(self, Strategy)
    except Exception as e:
        logger.debug("Failed to find Strategy owner: %s", e)
    # If still no owner, try a broader search (no class filter)
    if self._owner is None:
        self._owner = metabase.findowner(self, None)
    # Subclasses may call super().__init__() without arguments; fall back to
    # the arguments remembered on the instance
    init_args = args or getattr(self, "_lineaction_init_args", ())
    # Call pre-init
    self.__class__.dopreinit(self, *init_args, **kwargs)
    # Call parent init
    super().__init__()
    # LineBuffer.__init__ initializes low-level buffer fields and resets
    # _clock/_owner metadata. Re-apply the LineActions pre-init metadata
    # afterwards so explicit line operands keep their own data clock.
    self.__class__.dopreinit(self, *init_args, **kwargs)
    self._refresh_cached_line_flags(
        owner=getattr(self, "_owner", None),
        ltype=getattr(self.__class__, "_ltype", LineRoot.IndType),
    )
    # Refresh the cached flags of every contained line that supports it
    if hasattr(self, "lines") and hasattr(self.lines, "lines"):
        for line_obj in self.lines.lines:
            if hasattr(line_obj, "_refresh_cached_line_flags"):
                line_obj._refresh_cached_line_flags(
                    owner=self.lines,
                    ltype=getattr(self, "_ltype", LineRoot.IndType),
                )
    # Call post-init (receives the raw args, not the init_args fallback)
    self.__class__.dopostinit(self, *args, **kwargs)
[docs]
def getindicators(self):
    """Return the indicators attached to this object.

    Returns:
        list: A new empty list; the base LineActions holds no indicators.
    """
    indicators = []
    return indicators
[docs]
def qbuffer(self, savemem=0):
    """Enable queued buffer mode on the parent class.

    Args:
        savemem: Accepted for interface compatibility. The value is not
            forwarded: the parent implementation is always called with
            ``savemem=1``.
    """
    super().qbuffer(savemem=1)
[docs]
def plotlabel(self):
    """Build the label used when plotting this object.

    Returns:
        str: ``ClassName(k1=v1, k2=v2)`` when ``_plotlabel()`` returns a
        non-empty dict, the bare class name for an empty dict or when no
        ``_plotlabel`` exists, and ``str(value)`` for any other return value.
    """
    cls_name = self.__class__.__name__
    if not hasattr(self, "_plotlabel"):
        # Fallback: class name only
        return cls_name

    label = self._plotlabel()
    if not isinstance(label, dict):
        return str(label)

    # Render the dict as a keyword-argument list
    rendered = ", ".join(f"{k}={v}" for k, v in label.items())
    return f"{cls_name}({rendered})" if rendered else cls_name
def _plotlabel(self):
"""Default implementation of plot label"""
# Try to get params if available
if hasattr(self, "params") and hasattr(self.params, "_getkwargs"):
return self.params._getkwargs()
# Otherwise return empty dict
return {}
[docs]
@staticmethod
def arrayize(obj):
"""Convert an object to an array-compatible object.
Args:
obj: Object to convert. Can be a value, iterable, or array-like.
Returns:
The original object if it has an array attribute,
otherwise a LineNum or PseudoArray wrapper.
"""
if not hasattr(obj, "array"):
if not hasattr(obj, "__getitem__"):
# CRITICAL FIX: Create a LineNum that properly handles _minperiod
line_num = LineNum(obj)
# Ensure the LineNum has the _minperiod attribute
if not hasattr(line_num, "_minperiod"):
line_num._minperiod = 1
return line_num # make it a LineNum
if not hasattr(obj, "__len__"):
pseudo_array = PseudoArray(obj)
# CRITICAL FIX: Ensure PseudoArray objects have _minperiod for compatibility
if not hasattr(pseudo_array, "_minperiod"):
pseudo_array._minperiod = 1
return pseudo_array # Can iterate (for once)
return obj
def _next_old(self):
"""DEPRECATED: This method is no longer used. LineIterator._next() is used instead."""
# CRITICAL FIX: Prevent double processing if _once was already called
if hasattr(self, "_once_called") and self._once_called:
return # Already processed in once mode, don't process again
# CRITICAL FIX: Ensure data synchronization without over-advancing
if hasattr(self, "_clock") and self._clock is not None:
try:
clock_len = len(self._clock)
self_len = len(self)
# Only advance if we're behind the clock and not already at or ahead
if self_len < clock_len and (clock_len - self_len) <= 1:
# Forward one step to match the clock
self.forward()
except Exception:
# If clock access fails, just forward once
self.forward()
else:
# No clock, just forward once
self.forward()
# Call prenext or nextstart/next depending on minperiod
if len(self) < self._minperiod:
self.prenext()
elif len(self) == self._minperiod:
self.nextstart() # called once for the 1st value over minperiod
else:
self.next() # called for each value over minperiod
def _once(self, start, end):
    """Run the batch (runonce) calculation for the range ``[start, end)``.

    Stages, each best-effort with failures logged at debug level: clamp the
    range, run ``_once`` on registered child indicators, call ``preonce``,
    make sure ``args`` / ``cond`` operands have computed their arrays, call
    ``once``, set ``lencount`` / ``_idx`` of all lines to the final length,
    and finally call ``oncebinding``.

    Args:
        start: First index to process (negative values are treated as 0).
        end: One past the last index to process.
    """
    # Mark that once was called to prevent double processing in _next
    self._once_called = True
    # Ensure array exists but don't pre-fill it
    # Pre-filling causes incorrect buflen() calculations
    if not hasattr(self, "array") or self.array is None:
        import array as array_module

        self.array = array_module.array("d")
    # Ensure a proper range for once processing
    if start < 0:
        start = 0
    if end < start:
        end = start
    # Clamp ``end`` to the clock's buffer length when one is available.
    # Skipped when the clock class name contains "MinimalClock".
    if hasattr(self, "_clock") and self._clock and hasattr(self._clock, "buflen"):
        clock_class_name = getattr(self._clock, "__class__", type(None)).__name__
        if "MinimalClock" not in clock_class_name:
            max_len = self._clock.buflen()
            if max_len > 0 and end > max_len:
                end = max_len
    # Call _once() on all child line iterators first
    # This ensures dependencies are calculated before this indicator
    if hasattr(self, "_lineiterators"):
        from .lineiterator import LineIterator

        for indicator in self._lineiterators.get(LineIterator.IndType, []):
            try:
                if hasattr(indicator, "_once"):
                    indicator._once(start, end)
            except Exception as e:
                logger.debug("Indicator _once failed: %s", e)
    # Call preonce before main processing
    try:
        if hasattr(self, "preonce"):
            self.preonce(start, end)
    except Exception as e:
        logger.debug("preonce failed: %s", e)
    # Ensure operand arrays are computed before once()
    # For Logic subclasses (bt.If, bt.And, etc.), operands (args, cond) need
    # their arrays populated before once() reads from them.
    if hasattr(self, "args"):
        for arg in self.args:
            if hasattr(arg, "once") and hasattr(arg, "array") and len(arg.array) < end:
                try:
                    arg.once(0, end)
                except Exception:  # nosec B110
                    # Operand may already be computed or not once()-able; continue.
                    pass
    if hasattr(self, "cond"):
        cond = self.cond
        if hasattr(cond, "once") and hasattr(cond, "array") and len(cond.array) < end:
            try:
                cond.once(0, end)
            except Exception:  # nosec B110
                # Condition may already be computed or not once()-able; continue.
                pass
    # Process the main once calculation if a callable once() exists
    try:
        if hasattr(self, "once") and callable(self.once):
            self.once(start, end)
    except Exception as e:
        # A failing once() is only logged; processing continues below
        logger.debug("once() failed: %s", e)
    # Update lencount after once processing to match the data length.
    # The length comes from the clock (buflen, then len) or, without a
    # truthy clock, from the first data; ``end`` is the default.
    actual_data_len = end
    try:
        if hasattr(self, "_clock") and self._clock:
            try:
                actual_data_len = self._clock.buflen()
            except Exception as e:
                logger.debug("clock.buflen() failed: %s", e)
                try:
                    actual_data_len = len(self._clock)
                except Exception as e2:
                    logger.debug("len(clock) failed: %s", e2)
        elif hasattr(self, "datas") and self.datas and len(self.datas) > 0:
            try:
                actual_data_len = self.datas[0].buflen()
            except Exception as e:
                logger.debug("datas[0].buflen() failed: %s", e)
                try:
                    actual_data_len = len(self.datas[0])
                except Exception as e2:
                    logger.debug("len(datas[0]) failed: %s", e2)
        # Use the maximum of end and actual_data_len to ensure we don't truncate
        final_len = max(end, actual_data_len) if actual_data_len > 0 else end
    except Exception as e:
        logger.debug("Failed to determine actual data length: %s", e)
        final_len = end
    if hasattr(self, "lines") and hasattr(self.lines, "lines") and self.lines.lines:
        # Update lencount for all lines to match the data length
        for line in self.lines.lines:
            if hasattr(line, "lencount"):
                # Set lencount to final_len (actual data length)
                # This ensures len(indicator) == len(strategy) in runonce mode
                line.lencount = final_len
            if hasattr(line, "_idx"):
                # Set _idx to the last processed position
                line._idx = final_len - 1 if final_len > 0 else -1
    # Call oncebinding to propagate computed values to bound lines
    # This is needed for bt.If, Logic subclasses etc. that compute values into
    # their own array and need to copy them to the bound output line.
    self.oncebinding()
[docs]
@classmethod
def cleancache(cls):
    """Empty the shared LineActionsCache - called by cerebro."""
    cache_type = LineActionsCache
    cache_type.clear_cache()
[docs]
@classmethod
def usecache(cls, enable=True):
    """Switch the shared LineActionsCache on or off.

    Args:
        enable: Value forwarded to ``LineActionsCache.enable_cache``.
    """
    LineActionsCache.enable_cache(enable=enable)
[docs]
def LineDelay(a, ago=0, **kwargs):
    """Build the line object that shifts ``a`` by ``ago`` periods.

    Args:
        a: Source line object.
        ago: Shift in periods. Values ``<= 0`` produce a ``_LineDelay``
            (lookback); any other value produces a ``_LineForward``.
        **kwargs: Forwarded to ``_LineDelay`` only.

    Returns:
        _LineDelay or _LineForward: The shifted line object.
    """
    is_lookback = ago <= 0
    if not is_lookback:
        return _LineForward(a, ago)
    return _LineDelay(a, ago, **kwargs)
[docs]
def LineNum(num):
    """Wrap a number as a constant line.

    Args:
        num: The constant value.

    Returns:
        _LineDelay: A zero-delay line over a ``PseudoArray`` that wraps
        ``repeat(num)``.
    """
    constant_source = PseudoArray(repeat(num))
    return _LineDelay(constant_source, 0)
class _LineDelay(LineActions):
    """Line that reads another line at a fixed lookback offset.

    For example, data(-1) yields the previous bar's value of ``data``.

    Attributes:
        a: The source, after ``arrayize``.
        ago: Offset added to the requested index (zero or negative).
    """

    def __init__(self, a, ago):
        """Store the source and offset and derive the minimum period.

        Args:
            a: Source line object.
            ago: Number of periods to look back (negative value).
        """
        super().__init__()
        self.a = self.arrayize(a)
        self.ago = ago

        # A single line (e.g. nzd.lines[0]) may carry minperiod 1 while the
        # indicator that owns it has a larger one; take the larger of the
        # source's own value and that of its owner (or the owner's _owner_ref).
        required = getattr(a, "_minperiod", 1)
        owner = getattr(a, "_owner", None)
        if owner is not None:
            holder = getattr(owner, "_owner_ref", None)
            target = holder if holder is not None else owner
            required = max(required, getattr(target, "_minperiod", 1))
        if required > 1:
            self.updateminperiod(required)

        # "ago" is 0 based, so one extra period is passed (it is subtracted
        # again inside addminperiod): abs(ago) + 1, NOT just abs(ago)
        self.addminperiod(abs(ago) + 1)

    def __getitem__(self, idx):
        """Return the source value at ``idx + ago``.

        None, non-finite floats and lookups failing with IndexError or
        TypeError all yield 0.0.
        """
        try:
            value = self.a[idx + self.ago]
        except (IndexError, TypeError):
            return 0.0
        if value is None:
            return 0.0
        if isinstance(value, float) and not math.isfinite(value):
            return 0.0
        return value

    def next(self):
        """Store the delayed source value at position 0.

        Reads ``self.a[self.ago]``; None, non-finite floats and failing
        lookups (IndexError / AttributeError) are stored as 0.0.
        """
        try:
            delayed = self.a[self.ago]
            unusable = delayed is None or (
                isinstance(delayed, float) and not math.isfinite(delayed)
            )
            self[0] = 0.0 if unusable else delayed
        except (IndexError, AttributeError):
            # If we can't get the delayed value, use 0.0
            self[0] = 0.0

    def once(self, start, end):
        """Fill ``self.array[start:end]`` with delayed values (runonce mode).

        Args:
            start: Starting index.
            end: Ending index.
        """
        dst = self.array
        ago = self.ago

        # Grow the destination with NaN: unavailable bars must stay NaN,
        # 0.0 would turn them into real signals
        while len(dst) < end:
            dst.append(float("nan"))

        # Let a source that has not been computed yet run its own once()
        source = self.a
        if hasattr(source, "once") and hasattr(source, "array") and len(source.array) < end:
            source.once(start, end)

        # Detect a constant: a PseudoArray around repeat(), either directly
        # or one level down when the source itself has an ``a`` attribute.
        # The wrapped object is inspected because PseudoArray.array builds a
        # new list on every access.
        probe = self.a.a if hasattr(self.a, "a") else self.a
        is_constant = False
        constant_value = None
        if hasattr(probe, "wrapped"):
            wrapped = probe.wrapped
            if (
                isinstance(wrapped, itertools.repeat)
                or str(type(wrapped)) == "<class 'itertools.repeat'>"
            ):
                is_constant = True
                try:
                    constant_value = next(iter(wrapped))
                except (StopIteration, TypeError):
                    constant_value = float("nan")
                else:
                    if constant_value is None:
                        constant_value = float("nan")

        if is_constant:
            for i in range(start, end):
                dst[i] = constant_value
            return

        src = self.a.array
        for i in range(start, end):
            # dst[i] takes the value found ``-ago`` bars earlier
            src_index = i + ago
            if 0 <= src_index < len(src):
                val = src[src_index]
                dst[i] = float("nan") if val is None else val
            else:
                # Out-of-range historical/future access is unavailable, not 0.
                dst[i] = float("nan")
class _LineForward(LineActions):
"""Forward line object for positive ago values (lookahead).
This class represents a line that accesses future values
from another line. For example, data(1) returns the
next bar's value.
Attributes:
a: The source line object.
ago: Number of periods to look ahead (positive value).
"""
def __init__(self, a, ago):
"""Initialize the forward line.
Args:
a: Source line object.
ago: Number of periods to look ahead (positive value).
"""
super().__init__()
self.a = self.arrayize(a)
self.ago = ago
# Need to add the delay to the period
if hasattr(a, "_minperiod"):
self.addminperiod(ago)
def next(self):
"""Calculate and set the forwarded value for the current bar.
Gets the value from the source line at the forward position
and stores it at position 0.
"""
# operation(float, other) ... expecting other to be a float
# CRITICAL FIX: Ensure we get valid numeric values for indicator calculations
try:
# Get operand values with proper type checking
if hasattr(self.a, "__getitem__"):
# LineBuffer-like object - get current value
try:
a_val = self.a[0]
except (IndexError, TypeError):
a_val = float("nan")
else:
# Direct value
a_val = self.a
if hasattr(self.b, "__getitem__"):
# LineBuffer-like object - get current value
try:
b_val = self.b[0]
except (IndexError, TypeError):
b_val = float("nan")
else:
# Direct value
b_val = self.b
# Preserve indicator warmup semantics. NaN/None operands must
# remain NaN instead of becoming valid trading signals.
if a_val is None or (isinstance(a_val, float) and a_val != a_val):
self[0] = float("nan")
return
if isinstance(a_val, float) and not math.isfinite(a_val):
a_val = 0.0
elif not isinstance(a_val, (int, float)):
try:
a_val = float(a_val)
except (ValueError, TypeError):
self[0] = float("nan")
return
if b_val is None or (isinstance(b_val, float) and b_val != b_val):
self[0] = float("nan")
return
if isinstance(b_val, float) and not math.isfinite(b_val):
b_val = 0.0
elif not isinstance(b_val, (int, float)):
try:
b_val = float(b_val)
except (ValueError, TypeError):
self[0] = float("nan")
return
# CRITICAL FIX: Actually perform the operation and store the result
# Handle both normal and reverse operations
if hasattr(self, "operation") and self.operation:
# CRITICAL FIX: Handle reverse operations properly
if getattr(self, "r", False):
result = self.operation(b_val, a_val) # Reverse: b op a
else:
result = self.operation(a_val, b_val) # Normal: a op b
# Ensure result is a valid number
if result is None:
result = float("nan")
elif isinstance(result, float) and not math.isfinite(result):
if result != result:
result = float("nan")
else:
result = 0.0
elif not isinstance(result, (int, float)):
try:
result = float(result)
except (ValueError, TypeError):
result = float("nan")
# Store the result in the current position
self[0] = result
else:
# Fallback: store a_val if no operation is defined
self[0] = a_val
except Exception:
logger.debug("LineOwnOperation.next fallback triggered", exc_info=True)
# If anything fails, store 0.0 to prevent crashes
self[0] = 0.0
def once(self, start, end):
    """Apply the unary operation over a range of the source array (runonce).

    The destination array is padded with ``0.0`` until it holds at least
    ``end`` entries and ``end`` is then clipped to the length of the source
    array. ``None`` and non-finite float inputs are replaced by ``0.0``
    before the operation runs; ``None`` and non-finite float results are
    stored as ``0.0``.

    Args:
        start: Starting index.
        end: Ending index (exclusive).
    """
    out = self.array
    src = self.a.array
    func = self.operation

    # Grow the destination so every index below ``end`` can be assigned.
    while len(out) < end:
        out.append(0.0)

    # Only the part of the range backed by source data is processed.
    end = min(end, len(src))

    def _zero_if_invalid(value):
        """Map None and non-finite floats to 0.0; pass anything else through."""
        if value is None:
            return 0.0
        if isinstance(value, float) and not math.isfinite(value):
            return 0.0
        return value

    def _fill(pos):
        """Compute and store the result for one index."""
        operand = src[pos] if pos < len(src) else 0.0
        out[pos] = _zero_if_invalid(func(_zero_if_invalid(operand)))

    # Fast path: the whole range runs under one handler. Only when something
    # raises is the range redone element by element below.
    try:
        for pos in range(start, end):
            _fill(pos)
    except Exception:
        logger.debug(
            "LineOwnOperation.once fast path failed; per-element fallback", exc_info=True
        )
    else:
        return

    # Slow path: an element whose computation raises is stored as 0.0.
    for pos in range(start, end):
        try:
            _fill(pos)
        except Exception:
            logger.debug(
                "LineOwnOperation.once fallback triggered at index %d", pos, exc_info=True
            )
            out[pos] = 0.0
[docs]
class LinesOperation(LineActions):
    """Element-wise binary operation between two line operands.

    The resulting line holds ``operation(a, b)`` for each bar, or
    ``operation(b, a)`` when ``r`` is set.

    Attributes:
        operation: The binary function to apply (e.g., operator.add).
        a: First operand (left-hand side).
        b: Second operand (right-hand side), as returned by ``arrayize``.
        r: If True, the operands are swapped when the operation is applied.
        _parent_a: Parent indicator for operand a.
        _parent_b: Parent indicator for operand b.

    Example:
        >>> result = LinesOperation(indicator1, indicator2, operator.sub)
        >>> # result[0] = indicator1[0] - indicator2[0]
    """

    def __init__(self, a, b, operation, r=False, parent_a=None, parent_b=None):
        """Set up a binary operation between two operands.

        Args:
            a: First operand (left-hand side).
            b: Second operand (right-hand side); passed through ``arrayize``.
            operation: The binary function to apply (e.g., operator.add).
            r: If True, apply ``operation(b, a)`` instead of ``operation(a, b)``.
            parent_a: Parent indicator for operand a; looked up when None.
            parent_b: Parent indicator for operand b; looked up when None.
        """
        super().__init__()
        self.operation = operation
        self.a = a  # always a linebuffer-like object
        self.b = self.arrayize(b)
        self.r = r

        # Operands that are LineRoot instances; the first one supplies the clock.
        self._datas = [item for item in (self.a, self.b) if isinstance(item, LineRoot)]
        if self._datas:
            lead = self._datas[0]
            lead_clock = getattr(lead, "_clock", None)
            has_real_clock = (
                lead_clock is not None and lead_clock.__class__.__name__ != "MinimalClock"
            )
            self._clock = lead_clock if has_real_clock else lead

        # Explicit parent references win; otherwise they are looked up from
        # the raw arguments (note: ``b`` as passed in, not the arrayized one).
        if parent_a is None:
            parent_a = self._find_parent_indicator(a)
        self._parent_a = parent_a
        if parent_b is None:
            parent_b = self._find_parent_indicator(b)
        self._parent_b = parent_b

        # The result needs as many bars as the slower of the two operands;
        # operands without a ``_minperiod`` count as 1.
        period_a = getattr(a, "_minperiod", 1)
        period_b = getattr(b, "_minperiod", 1)
        self.updateminperiod(max(period_a, period_b))
        self._a_minperiod = period_a
        self._b_minperiod = period_b

        self._a_guard_minperiod = self._needs_minperiod_guard(self.a)
        self._b_guard_minperiod = self._needs_minperiod_guard(self.b)

        # LineActions operands (other than this object) that expose ``_next``.
        drivable = []
        for candidate in (self.a, self.b):
            if candidate is self:
                continue
            if isinstance(candidate, LineActions) and hasattr(candidate, "_next"):
                drivable.append(candidate)
        self._next_operands = tuple(drivable)
@staticmethod
def _is_constant_operand(operand):
"""Return True for constants wrapped as LineDelay(PseudoArray)."""
return (
operand.__class__.__name__ == "_LineDelay"
and getattr(operand, "a", None).__class__.__name__ == "PseudoArray"
)
@staticmethod
def _is_line_delay_operand(operand):
return operand.__class__.__name__ == "_LineDelay"
@classmethod
def _needs_minperiod_guard(cls, operand):
    """Tell whether reads of ``operand`` need the minimum-period length check.

    The guard applies only to operands that are neither wrapped constants,
    ``_LineDelay`` objects nor ``LineActions`` instances, and that expose
    both ``__len__`` and ``_minperiod``.
    """
    if cls._is_constant_operand(operand) or cls._is_line_delay_operand(operand):
        return False
    if isinstance(operand, LineActions):
        return False
    return hasattr(operand, "__len__") and hasattr(operand, "_minperiod")
@staticmethod
def _is_missing(value):
return value is None or (isinstance(value, float) and value != value)
def _operand_value(self, operand, ago=0, guard_minperiod=False, minperiod=1):
"""Read an operand while preserving indicator warmup NaN semantics."""
if guard_minperiod:
try:
target_len = len(operand) + ago
if target_len < minperiod:
return float("nan")
except Exception: # nosec B110
# Operand without a usable length; skip the warmup guard.
pass
if hasattr(operand, "__getitem__"):
return operand[ago]
return operand
def _normalize_operand(self, value):
if self._is_missing(value):
return float("nan")
if isinstance(value, float) and not math.isfinite(value):
return 0.0
if isinstance(value, (int, float)):
return value
try:
return float(value)
except (ValueError, TypeError):
return float("nan")
def _next_operand_if_due(self, operand):
clock = getattr(operand, "_clock", None)
if clock is not None:
try:
if len(clock) <= len(operand):
return
except Exception: # nosec B110
# Clock without a comparable length; advance the operand anyway.
pass
operand._next()
def _find_parent_indicator(self, operand):
    """Find the LineActions object that owns this operand, if any.

    Only LineActions objects are ever returned. Full Indicator/LineIterator
    objects are never returned because they are processed separately via the
    _lineiterators ordering in _once(); returning one here would cause
    premature once_via_next() calls with incorrect data state.

    Returns:
        The operand itself when it is a LineActions; otherwise its owner's
        ``_owner_ref`` or its owner when either is a LineActions; else None.
    """
    # An arithmetic expression chain is its own parent.
    if isinstance(operand, LineActions):
        return operand

    owner = getattr(operand, "_owner", None)
    if owner is None:
        return None

    # The owner's back-reference takes precedence over the owner itself.
    ref = getattr(owner, "_owner_ref", None)
    if ref is not None and isinstance(ref, LineActions):
        return ref

    return owner if isinstance(owner, LineActions) else None
[docs]
def __getitem__(self, ago):
"""Get value at the specified offset.
In runonce mode, the array is pre-computed by once(), so use it directly.
Falls back to dynamic computation only if the array is not populated.
"""
try:
# Use pre-computed array if available (runonce mode)
current_idx = self._idx
if current_idx >= 0 and len(self.array) > 0:
target_idx = current_idx + ago
if 0 <= target_idx < len(self.array):
value = self.array[target_idx]
if value is not None:
if isinstance(value, float):
if value == value:
if not math.isfinite(value):
return 0.0
return value
else:
return value
# Fallback: compute value dynamically from source operands.
a_val = self._normalize_operand(
self._operand_value(self.a, ago, self._a_guard_minperiod, self._a_minperiod)
)
b_val = self._normalize_operand(
self._operand_value(self.b, ago, self._b_guard_minperiod, self._b_minperiod)
)
if self._is_missing(a_val):
return float("nan")
if self._is_missing(b_val):
return float("nan")
# Compute and return the operation result
if self.r:
result = self.operation(b_val, a_val)
else:
result = self.operation(a_val, b_val)
if self._is_missing(result):
return float("nan")
if isinstance(result, float) and not math.isfinite(result):
return 0.0
return result
except (IndexError, TypeError):
return float("nan")
def _next(self):
"""CRITICAL FIX: _next() method for compatibility with LineIterator processing loop.
This method is called by LineIterator._next() for items in _lineiterators[IndType].
"""
# Clock guard: skip if already advanced to the current clock position.
# This prevents double-advancing when a LinesOperation is both registered
# directly in _lineiterators AND driven via a parent's _next_operands chain.
clock = getattr(self, "_clock", None)
if clock is not None and clock.__class__.__name__ != "MinimalClock":
try:
if len(clock) <= len(self):
return
except Exception: # nosec B110
# Clock without a comparable length; proceed to advance operands.
pass
for operand in self._next_operands:
self._next_operand_if_due(operand)
# Advance the line buffer
self.advance()
# Call next() to compute the value
self.next()
# Update bindings so bound lines get the computed value
for binding in self.bindings:
binding[0] = self[0]
[docs]
def next(self):
    """Calculate and set the operation result for the current bar.

    Both operands are read at offset 0 and normalized. If either one is
    missing the stored value is NaN. Otherwise the operation is applied
    (operands swapped when ``r`` is set) and its result is stored after
    clean-up: None/NaN -> NaN, infinite -> 0.0, non-numeric -> ``float()``
    or NaN when that conversion fails. Any exception stores NaN.
    """
    try:
        lhs = self._normalize_operand(
            self._operand_value(self.a, 0, self._a_guard_minperiod, self._a_minperiod)
        )
        rhs = self._normalize_operand(
            self._operand_value(self.b, 0, self._b_guard_minperiod, self._b_minperiod)
        )

        if self._is_missing(lhs) or self._is_missing(rhs):
            self[0] = float("nan")
            return

        if not (hasattr(self, "operation") and self.operation):
            # No operation defined: the left operand is stored as-is.
            self[0] = lhs
            return

        if getattr(self, "r", False):
            outcome = self.operation(rhs, lhs)  # reversed: b op a
        else:
            outcome = self.operation(lhs, rhs)  # normal: a op b

        # Make sure the stored result is a usable number.
        if outcome is None:
            outcome = float("nan")
        elif isinstance(outcome, float):
            if not math.isfinite(outcome):
                outcome = float("nan") if outcome != outcome else 0.0
        elif not isinstance(outcome, int):
            try:
                outcome = float(outcome)
            except (ValueError, TypeError):
                outcome = float("nan")

        self[0] = outcome
    except Exception:
        self[0] = float("nan")
[docs]
def once(self, start, end):
    """Calculate operation results in batch mode (runonce).

    The range always begins at index 0, whatever ``start`` is, so that
    historical values are available to consumers of this line.

    Args:
        start: Starting index (not used; processing starts at 0).
        end: Ending index.
    """
    first = 0

    def _prime(dependency, message):
        """Run ``dependency.once`` over the range; failures are only logged."""
        try:
            dependency.once(first, end)
        except Exception as e:
            logger.debug(message, e)

    # Parents go first so that their arrays are populated, e.g. for
    # dif = ema_1 - ema_2 where ema_1/ema_2 must be computed beforehand.
    if self._parent_a is not None and hasattr(self._parent_a, "once"):
        _prime(self._parent_a, "parent_a.once() failed: %s")
    if self._parent_b is not None and hasattr(self._parent_b, "once"):
        _prime(self._parent_b, "parent_b.once() failed: %s")

    # Operands are primed ONLY when they are LineActions instances (like
    # _LineDelay, LinesOperation). once() is never called on full Indicators
    # here because those are managed by _lineiterators in _once(); doing so
    # would trigger premature once_via_next calls before the indicator's
    # data state is properly set up.
    if isinstance(self.a, LineActions) and hasattr(self.a, "once"):
        _prime(self.a, "operand a.once() failed: %s")
    if isinstance(self.b, LineActions) and hasattr(self.b, "once"):
        _prime(self.b, "operand b.once() failed: %s")

    # Pick the batch routine that matches the kind of right-hand operand.
    rhs = self.b
    if hasattr(rhs, "array") and type(rhs).__name__ != "PseudoArray":
        self._once_op(first, end)
    elif isinstance(rhs, float):
        if self.r:
            self._once_val_op_r(first, end)
        else:
            self._once_val_op(first, end)
    else:
        self._once_time_op(first, end)

    # Copy the computed values to bound lines.
    self.oncebinding()
def _once_op(self, start, end):
    """Batch-compute the operation when both operands expose an ``array``.

    Processing always starts at index 0; ``start`` is only forwarded to the
    right operand's ``once``. The destination is padded with NaN up to
    ``end`` and the range is clipped to the available source data. An
    element with a None/NaN operand or result is stored as NaN; infinite
    operands and results are replaced by ``0.0``.

    Args:
        start: Starting index handed to ``self.b.once``.
        end: Ending index (exclusive).
    """
    # Only LineActions operands (e.g. _LineDelay) are primed, never full Indicators.
    if isinstance(self.b, LineActions) and hasattr(self.b, "once") and len(self.b.array) < end:
        try:
            self.b.once(start, end)
        except Exception as e:
            logger.debug("b.once() in _once_op failed: %s", e)

    out = self.array
    lhs_values = self.a.array
    rhs_values = self.b.array
    func = self.operation

    # The destination must be large enough for direct index assignment.
    while len(out) < end:
        out.append(float("nan"))

    # A _LineDelay wrapping a constant (PseudoArray) has an empty array but
    # still answers b[i]; such an operand is read through __getitem__ and
    # does not limit the range.
    constant_rhs = (
        len(rhs_values) == 0
        and hasattr(self.b, "a")
        and type(self.b.a).__name__ == "PseudoArray"
    )
    limits = [end, len(lhs_values)]
    if not constant_rhs:
        limits.append(len(rhs_values))
    end = min(limits)

    def _combine(pos):
        """Compute and store the result for one index."""
        lhs = lhs_values[pos]
        rhs = self.b[pos] if constant_rhs else rhs_values[pos]
        # None/NaN on either side keeps the element NaN.
        if lhs is None or lhs != lhs or rhs is None or rhs != rhs:
            out[pos] = float("nan")
            return
        if isinstance(lhs, float) and not math.isfinite(lhs):
            lhs = 0.0
        if isinstance(rhs, float) and not math.isfinite(rhs):
            rhs = 0.0
        outcome = func(rhs, lhs) if self.r else func(lhs, rhs)
        if outcome is None or outcome != outcome:
            outcome = float("nan")
        elif isinstance(outcome, float) and not math.isfinite(outcome):
            outcome = 0.0
        out[pos] = outcome

    # Fast path: one handler around the whole range. Only when something
    # raises is the range redone element by element below.
    try:
        for pos in range(0, end):
            _combine(pos)
    except Exception:
        logger.debug(
            "LinesOperation._once_op fast path failed; per-element fallback", exc_info=True
        )
    else:
        return

    # Slow path: an element whose computation raises is stored as NaN.
    for pos in range(0, end):
        try:
            _combine(pos)
        except Exception:
            logger.debug(
                "LinesOperation._once_op fallback triggered at index %d", pos, exc_info=True
            )
            out[pos] = float("nan")
def _once_time_op(self, start, end):
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b[0]
op = self.operation
# Ensure destination array is sized for direct index assignment
while len(dst) < end:
dst.append(float("nan"))
# Clip processing range to available source data
end = min(end, len(srca))
for i in range(start, end):
try:
a_val = srca[i]
# Preserve NaN semantics
if a_val is None or a_val != a_val or srcb is None or srcb != srcb:
dst[i] = float("nan")
continue
if isinstance(a_val, float) and not math.isfinite(a_val):
a_val = 0.0
if isinstance(srcb, float) and not math.isfinite(srcb):
srcb = 0.0
if self.r:
result = op(srcb, a_val)
else:
result = op(a_val, srcb)
if result is None or result != result:
result = float("nan")
elif isinstance(result, float) and not math.isfinite(result):
result = 0.0
dst[i] = result
except Exception:
logger.debug(
"LinesOperation._once_time_op fallback triggered at index %d", i, exc_info=True
)
dst[i] = float("nan")
def _once_val_op(self, start, end):
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b[0] if hasattr(self.b, "__getitem__") else self.b
op = self.operation
# Ensure destination array is sized for direct index assignment
while len(dst) < end:
dst.append(float("nan"))
# Clip processing range to available source data
end = min(end, len(srca))
for i in range(start, end):
try:
a_val = srca[i]
if a_val is None or a_val != a_val or srcb is None or srcb != srcb:
dst[i] = float("nan")
continue
if isinstance(a_val, float) and not math.isfinite(a_val):
a_val = 0.0
if isinstance(srcb, float) and not math.isfinite(srcb):
srcb = 0.0
result = op(a_val, srcb)
if result is None or result != result:
result = float("nan")
elif isinstance(result, float) and not math.isfinite(result):
result = 0.0
dst[i] = result
except Exception:
logger.debug(
"LinesOperation._once_val_op fallback triggered at index %d", i, exc_info=True
)
dst[i] = float("nan")
def _once_val_op_r(self, start, end):
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b[0] if hasattr(self.b, "__getitem__") else self.b
op = self.operation
# Ensure destination array is sized for direct index assignment
while len(dst) < end:
dst.append(float("nan"))
# Clip processing range to available source data
end = min(end, len(srca))
for i in range(start, end):
try:
a_val = srca[i]
if a_val is None or a_val != a_val or srcb is None or srcb != srcb:
dst[i] = float("nan")
continue
if isinstance(a_val, float) and not math.isfinite(a_val):
a_val = 0.0
if isinstance(srcb, float) and not math.isfinite(srcb):
srcb = 0.0
result = op(srcb, a_val)
if result is None or result != result:
result = float("nan")
elif isinstance(result, float) and not math.isfinite(result):
result = 0.0
dst[i] = result
except Exception:
logger.debug(
"LinesOperation._once_val_op_r fallback triggered at index %d", i, exc_info=True
)
dst[i] = float("nan")
[docs]
class LineOwnOperation(LineActions):
    """Element-wise unary operation on a single line operand.

    The resulting line holds ``operation(a)`` for each bar (negation,
    absolute value, etc.).

    Attributes:
        operation: The unary function to apply (e.g., operator.neg).
        a: The operand (line object).
        _parent_a: Parent indicator for the operand.

    Example:
        >>> result = LineOwnOperation(indicator, operator.neg)
        >>> # result[0] = -indicator[0]
    """

    def __init__(self, a, operation, parent_a=None):
        """Set up a unary operation on a line object.

        Args:
            a: The operand (line object).
            operation: The unary function to apply (e.g., operator.neg).
            parent_a: Parent indicator for the operand; looked up when None.
        """
        super().__init__()
        self.operation = operation
        self.a = a

        # An explicit parent reference wins; otherwise it is looked up.
        if parent_a is None:
            parent_a = self._find_parent_indicator(a)
        self._parent_a = parent_a

        # The operand's minimum period carries over (1 when it has none).
        self.updateminperiod(getattr(a, "_minperiod", 1))
def _find_parent_indicator(self, operand):
    """Find the LineActions object that owns this operand, if any.

    Only LineActions objects are returned. Full Indicators are never returned
    to prevent premature once_via_next() calls (see
    LinesOperation._find_parent_indicator).

    Returns:
        The operand itself when it is a LineActions; otherwise its owner's
        ``_owner_ref`` or its owner when either is a LineActions; else None.
    """
    if isinstance(operand, LineActions):
        return operand

    owner = getattr(operand, "_owner", None)
    if owner is None:
        return None

    # The owner's back-reference takes precedence over the owner itself.
    ref = getattr(owner, "_owner_ref", None)
    if ref is not None and isinstance(ref, LineActions):
        return ref

    return owner if isinstance(owner, LineActions) else None
[docs]
def __getitem__(self, ago):
"""CRITICAL FIX: Override __getitem__ to compute value dynamically from source operand."""
try:
a_val = self.a[ago] if hasattr(self.a, "__getitem__") else self.a
if a_val is None or (isinstance(a_val, float) and a_val != a_val):
return float("nan")
if isinstance(a_val, float) and not math.isfinite(a_val):
a_val = 0.0
result = self.operation(a_val)
if isinstance(result, float) and not math.isfinite(result):
return 0.0
return result
except (IndexError, TypeError):
return float("nan")
[docs]
def next(self):
    """Calculate and set the unary operation result for the current bar.

    Reads the operand's current value, applies the operation and stores the
    result at position 0.

    A None/NaN operand (e.g. during indicator warmup) is stored as NaN
    instead of being rewritten to ``0.0`` and fed to the operation. This
    matches what ``__getitem__`` of this class returns for the same operand
    and keeps missing data from turning into a valid-looking value. An
    infinite operand is still treated as ``0.0``, and a None or non-finite
    result is still stored as ``0.0``.
    """
    operand = self.a[0]

    # Missing input stays missing.
    if operand is None or (isinstance(operand, float) and operand != operand):
        self[0] = float("nan")
        return

    # Only +/-inf can reach this branch; NaN was handled above.
    if isinstance(operand, float) and not math.isfinite(operand):
        operand = 0.0

    result = self.operation(operand)
    if result is None or (isinstance(result, float) and not math.isfinite(result)):
        result = 0.0
    self[0] = result
[docs]
def once(self, start, end):
    """Calculate unary operation results in batch mode (runonce).

    The parent of the operand (when it exposes ``_once``) is processed first;
    a failure there is only logged. The destination array is padded with NaN
    up to ``end`` and the range is clipped to the length of the source array.

    A None/NaN source value is stored as NaN instead of being rewritten to
    ``0.0`` and fed to the operation. This matches what ``__getitem__`` of
    this class returns for the same operand, so consumers that read this
    line's ``array`` directly see the same missing values as consumers that
    index the line. An infinite source value is still treated as ``0.0``, a
    None or non-finite result is still stored as ``0.0``, and an element
    whose computation raises is still stored as ``0.0``.

    Args:
        start: Starting index.
        end: Ending index (exclusive).
    """
    # Ensure the source operand is processed first.
    if self._parent_a is not None and hasattr(self._parent_a, "_once"):
        try:
            self._parent_a._once(start, end)
        except Exception as e:
            logger.debug("parent_a._once() in LineOwnOperation failed: %s", e)

    # cache python dictionary lookups
    dst = self.array
    srca = self.a.array
    op = self.operation

    # The destination must be large enough for direct index assignment.
    while len(dst) < end:
        dst.append(float("nan"))

    # Only the part of the range backed by source data is processed.
    end = min(end, len(srca))

    def _compute(i):
        """Compute and store the result for one index."""
        a_val = srca[i] if i < len(srca) else 0.0
        # Missing input stays missing.
        if a_val is None or (isinstance(a_val, float) and a_val != a_val):
            dst[i] = float("nan")
            return
        # Only +/-inf can reach this branch; NaN was handled above.
        if isinstance(a_val, float) and not math.isfinite(a_val):
            a_val = 0.0
        result = op(a_val)
        if result is None or (isinstance(result, float) and not math.isfinite(result)):
            result = 0.0
        dst[i] = result

    # Fast path: one handler around the whole range. Only when something
    # raises is the range redone element by element below.
    try:
        for i in range(start, end):
            _compute(i)
    except Exception:
        logger.debug("unary once fast path failed; per-element fallback", exc_info=True)
    else:
        return

    for i in range(start, end):
        try:
            _compute(i)
        except Exception:
            logger.debug(
                "LineOwnOperation.once fallback triggered at index %d", i, exc_info=True
            )
            # If the operation fails, store 0.0
            dst[i] = 0.0
[docs]
def size(self):
    """Return the number of lines in this LineActions object.

    Uses ``self.lines.size()`` when available, then ``len(self.lines)``, and
    falls back to 1 when neither can be used.
    """
    if hasattr(self, "lines"):
        lines = self.lines
        if hasattr(lines, "size"):
            return lines.size()
        if hasattr(lines, "__len__"):
            return len(lines)
    return 1  # Default to 1 line if no lines object available