#!/usr/bin/env python
"""Functions Module - Common operations on line objects.
This module provides utility functions and classes for performing
operations on line objects. It includes arithmetic operations with
zero-division protection, logical operations, comparison operations,
and mathematical functions.
Classes:
Logic: Base class for logical operations on lines.
DivByZero: Division with zero-division protection.
DivZeroByZero: Division with zero/zero indetermination protection.
And/Or/Not/If/Max/Min/MinN/MaxN: Logical and comparison operations.
Sum/Average/StdDev/TSMean: Statistical operations.
Example:
Using indicator functions:
>>> from backtrader.functions import And, Or
>>> condition = And(indicator1 > indicator2, indicator3 > 0)
"""
import functools
import math
from .linebuffer import LineActions
from .utils.log_message import get_logger
from .utils.py3 import cmp, range
logger = get_logger(__name__)
def _sanitize_cmp_value(value):
if value is None:
return 0.0
if isinstance(value, float) and not math.isfinite(value):
return 0.0
return value
def _sanitize_div_value(value):
if value is None:
return 0.0
if isinstance(value, float) and not math.isfinite(value):
return 0.0
return value
def _sanitize_numeric_values(values):
return [_sanitize_div_value(value) for value in values]
def _value_at(array, index, default=0.0):
try:
return array[index]
except (IndexError, TypeError):
try:
return array[-1]
except (IndexError, TypeError):
return default
def _maxlogic(values):
return max(_sanitize_numeric_values(values))
def _minlogic(values):
return min(_sanitize_numeric_values(values))
def _sumlogic(values):
return math.fsum(_sanitize_numeric_values(values))
# Generate a List equivalent which uses "is" for contains
# Create a new List class, overriding __contains__ method, if any element in list has hash value equal to other's hash value, return True
[docs]
class List(list):
"""List subclass that uses hash equality for contains checks.
This class overrides __contains__ to check if any element has
the same hash value as the target, rather than using identity comparison.
"""
def __contains__(self, other):
return any(x is other for x in self)
# Create a class to serialize elements within it
[docs]
class Logic(LineActions):
"""Base class for logical operations on line objects.
Handles argument conversion to arrays and manages minperiod
propagation from operands.
"""
def __init__(self, *args):
"""Initialize the Logic operation.
Converts all arguments to arrays and propagates minperiod
from operands to ensure proper synchronization.
Args:
*args: Line objects or values to operate on.
"""
super().__init__()
self.args = [self.arrayize(arg) for arg in args]
# CRITICAL FIX: Collect minperiods from args and update own minperiod
# This ensures functions like And, Or, etc. inherit the max minperiod from their operands
_minperiods = []
for arg in self.args:
mp = getattr(arg, "_minperiod", 1)
_minperiods.append(mp)
if _minperiods:
max_minperiod = max(_minperiods)
self.updateminperiod(max_minperiod)
def _next(self):
clock = getattr(self, "_clock", None)
if clock is not None and clock.__class__.__name__ != "MinimalClock":
try:
if len(clock) <= len(self):
return
except Exception: # nosec B110
# Clock without a comparable length; proceed to advance below.
pass
target_len = len(self) + 1
for arg in getattr(self, "args", ()):
if isinstance(arg, LineActions) and hasattr(arg, "_next"):
try:
if len(arg) < target_len:
arg._next()
except Exception as e:
logger.debug("Logic operand _next() failed: %s", e)
self.advance()
self.next()
for binding in self.bindings:
binding[0] = self[0]
# Avoid division by zero when dividing two lines, if denominator is 0, division result is 0
[docs]
class DivByZero(Logic):
"""This operation is a Lines object and fills it values by executing a
division on the numerator / denominator arguments and avoiding a division
by zero exception by checking the denominator
Params:
- a: numerator (numeric or iterable object ... mostly a Lines object)
- b: denominator (numeric or iterable object ... mostly a Lines object)
- zero (def: 0.0): value to apply if division by zero is raised
"""
def __init__(self, a, b, zero=0.0):
"""Initialize the DivByZero operation.
Args:
a: Numerator line or value.
b: Denominator line or value.
zero: Value to return when division by zero occurs.
"""
super().__init__(a, b)
self.a = self.args[0]
self.b = self.args[1]
self.zero = zero
[docs]
def next(self):
"""Calculate the next value with zero-division protection."""
a = _sanitize_div_value(self.a[0])
b = _sanitize_div_value(self.b[0])
self[0] = a / b if b else self.zero
[docs]
def once(self, start, end):
"""Calculate all values at once with zero-division protection.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b.array
zero = self.zero
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
for i in range(start, end):
a = _sanitize_div_value(_value_at(srca, i))
b = _sanitize_div_value(_value_at(srcb, i))
dst[i] = a / b if b else zero
# Division operation for two lines considering both numerator and denominator may be 0
[docs]
class DivZeroByZero(Logic):
"""This operation is a Lines object and fills it values by executing a
division on the numerator / denominator arguments and avoiding a division
by zero exception or an indetermination by checking the
denominator/numerator pair
Params:
- a: numerator (numeric or iterable object ... mostly a Lines object)
- b: denominator (numeric or iterable object ... mostly a Lines object)
- single (def: +inf): value to apply if division is x / 0
- dual (def: 0.0): value to apply if division is 0 / 0
"""
def __init__(self, a, b, single=float("inf"), dual=0.0):
"""Initialize the DivZeroByZero operation.
Args:
a: Numerator line or value.
b: Denominator line or value.
single: Value to return when numerator is non-zero and denominator is zero.
dual: Value to return when both numerator and denominator are zero.
"""
super().__init__(a, b)
self.a = self.args[0]
self.b = self.args[1]
self.single = single
self.dual = dual
[docs]
def next(self):
"""Calculate the next value with zero/zero indetermination protection."""
b = _sanitize_div_value(self.b[0])
a = _sanitize_div_value(self.a[0])
if b == 0.0:
self[0] = self.dual if a == 0.0 else self.single
else:
self[0] = a / b
[docs]
def once(self, start, end):
"""Calculate all values at once with zero/zero indetermination protection.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b.array
single = self.single
dual = self.dual
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
for i in range(start, end):
b = _sanitize_div_value(_value_at(srcb, i))
a = _sanitize_div_value(_value_at(srca, i))
if b == 0.0:
dst[i] = dual if a == 0.0 else single
else:
dst[i] = a / b
# Compare a and b, a and b are likely lines
[docs]
class Cmp(Logic):
"""Comparison operation that returns comparison results.
Compares two line objects and returns standard comparison values:
-1 if a < b, 0 if a == b, 1 if a > b.
"""
def __init__(self, a, b):
"""Initialize the comparison operation.
Args:
a: First line or value to compare.
b: Second line or value to compare.
"""
super().__init__(a, b)
self.a = self.args[0]
self.b = self.args[1]
[docs]
def next(self):
"""Calculate the next comparison value."""
self[0] = cmp(_sanitize_cmp_value(self.a[0]), _sanitize_cmp_value(self.b[0]))
[docs]
def once(self, start, end):
"""Calculate all comparison values at once.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b.array
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
for i in range(start, end):
dst[i] = cmp(
_sanitize_cmp_value(_value_at(srca, i)),
_sanitize_cmp_value(_value_at(srcb, i)),
)
# Compare two lines, a and b, return corresponding r1 value when a<b, return r2 value when a=b, return r3 value when a>b
[docs]
class CmpEx(Logic):
"""Extended comparison operation with three possible return values.
Compares two line objects and returns one of three values based on
the comparison result:
- r1 if a < b
- r2 if a == b
- r3 if a > b
"""
def __init__(self, a, b, r1, r2, r3):
"""Initialize the extended comparison operation.
Args:
a: First line or value to compare.
b: Second line or value to compare.
r1: Value to return when a < b.
r2: Value to return when a == b.
r3: Value to return when a > b.
"""
super().__init__(a, b, r1, r2, r3)
self.a = self.args[0]
self.b = self.args[1]
self.r1 = self.args[2]
self.r2 = self.args[3]
self.r3 = self.args[4]
[docs]
def next(self):
"""Calculate the next extended comparison value."""
# self[0] = cmp(self.a[0], self.b[0])
a0 = _sanitize_cmp_value(self.a[0])
b0 = _sanitize_cmp_value(self.b[0])
if a0 < b0:
self[0] = _sanitize_div_value(self.r1[0])
elif a0 > b0:
self[0] = _sanitize_div_value(self.r3[0])
else:
self[0] = _sanitize_div_value(self.r2[0])
[docs]
def once(self, start, end):
"""Calculate all extended comparison values at once.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
# cache python dictionary lookups
dst = self.array
srca = self.a.array
srcb = self.b.array
r1 = self.r1.array
r2 = self.r2.array
r3 = self.r3.array
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
for i in range(start, end):
ai = _sanitize_cmp_value(_value_at(srca, i))
bi = _sanitize_cmp_value(_value_at(srcb, i))
if ai < bi:
dst[i] = _sanitize_div_value(_value_at(r1, i))
elif ai > bi:
dst[i] = _sanitize_div_value(_value_at(r3, i))
else:
dst[i] = _sanitize_div_value(_value_at(r2, i))
# If statement, return corresponding a value when cond is satisfied, return b value when not satisfied
[docs]
class If(Logic):
"""Conditional selection operation.
Returns a value from a or b based on a condition:
- Returns a if condition is True
- Returns b if condition is False
"""
def __init__(self, cond, a, b):
"""Initialize the conditional operation.
Args:
cond: Condition line - must evaluate to boolean.
a: Value to return when condition is True.
b: Value to return when condition is False.
"""
super().__init__(cond, a, b)
self.cond = self.args[0]
self.a = self.args[1]
self.b = self.args[2]
[docs]
def next(self):
"""Calculate the next conditional value."""
cond_val = _sanitize_div_value(self.cond[0])
value = self.a[0] if cond_val else self.b[0]
self[0] = _sanitize_div_value(value)
def _has_self_reference(self):
"""Check if this If operation has a self-referencing pattern.
Detects patterns like: self.lines.direction = bt.If(..., direction(-1))
where the output line appears as an input via _LineDelay.
"""
if not self.bindings:
return False
# Get the bound line(s)
bound_lines = {id(b) for b in self.bindings}
# Check if any operand references a bound line (via _LineDelay)
def _check_ref(obj, depth=0):
if depth > 10:
return False
if hasattr(obj, "a"):
# _LineDelay: check if obj.a is one of our bound lines
if id(getattr(obj, "a", None)) in bound_lines:
return True
# Check if obj.a's array is the same as a bound line's array
obj_a = getattr(obj, "a", None)
if obj_a is not None:
for binding in self.bindings:
if hasattr(obj_a, "array") and hasattr(binding, "array"):
if obj_a.array is binding.array:
return True
if _check_ref(obj_a, depth + 1):
return True
if hasattr(obj, "b"):
obj_b = getattr(obj, "b", None)
if obj_b is not None:
if id(obj_b) in bound_lines:
return True
if hasattr(obj_b, "array"):
for binding in self.bindings:
if hasattr(binding, "array") and obj_b.array is binding.array:
return True
if _check_ref(obj_b, depth + 1):
return True
if hasattr(obj, "args"):
for arg in getattr(obj, "args", []):
if _check_ref(arg, depth + 1):
return True
if hasattr(obj, "cond"):
if _check_ref(getattr(obj, "cond", None), depth + 1):
return True
return False
return _check_ref(self.a) or _check_ref(self.b) or _check_ref(self.cond)
[docs]
def once(self, start, end):
"""Calculate all conditional values at once.
Supports self-referencing patterns like:
self.lines.direction = bt.If(cond, 1, bt.If(cond2, -1, self.lines.direction(-1)))
For self-referencing patterns, processes bar-by-bar using next() semantics
to ensure previously computed values are available for the next bar.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
dst = self.array
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
# Also ensure bound line arrays are sized
for binding in self.bindings:
while len(binding.array) < end:
binding.array.append(0.0)
# For self-referencing patterns, use bar-by-bar processing
# This ensures _LineDelay can read previously computed values
if self.bindings and self._has_self_reference():
self._once_sequential(start, end)
return
# Standard If expressions can contain nested LinesOperation/_LineDelay
# operands which are not always scheduled separately by LineIterator.
# Compute them explicitly before reading their arrays below.
for operand in (self.cond, self.a, self.b):
if hasattr(operand, "once") and len(getattr(operand, "array", [])) < end:
try:
operand.once(0, end)
except Exception as e:
logger.debug("If operand once() failed: %s", e)
# Standard batch processing for non-self-referencing patterns
self._once_batch(start, end)
def _once_sequential(self, start, end):
"""Process bar-by-bar for self-referencing patterns.
Ensures all operand arrays are computed first, then processes
sequentially with immediate binding propagation so _LineDelay
can read previously written values.
"""
dst = self.array
has_bindings = bool(self.bindings)
# Ensure operand arrays are computed first
# The condition (LinesOperation) needs its once() called
if hasattr(self.cond, "once") and len(getattr(self.cond, "array", [])) < end:
try:
self.cond.once(start, end)
except Exception: # nosec B110
# Operand already (partially) computed or not once()-able; continue.
pass
# The 'a' operand (could be constant or LinesOperation)
if hasattr(self.a, "once") and len(getattr(self.a, "array", [])) < end:
try:
self.a.once(start, end)
except Exception: # nosec B110
# Operand already (partially) computed or not once()-able; continue.
pass
# The 'b' operand - for self-referencing, this is typically another bt.If
# We need to compute it BUT it contains the self-reference, so we handle it specially
if hasattr(self.b, "once") and len(getattr(self.b, "array", [])) < end:
# Check if b itself has self-reference (nested bt.If with direction(-1))
# If so, we need to compute b bar-by-bar too
if hasattr(self.b, "_has_self_reference") and self.b._has_self_reference():
# Don't call b.once() - we'll compute b[i] dynamically
pass
else:
try:
self.b.once(start, end)
except Exception: # nosec B110
# Operand already (partially) computed or not once()-able; continue.
pass
# Get arrays for direct access where possible
cond_array = getattr(self.cond, "array", [])
cond_has_array = len(cond_array) >= end
a_array = getattr(self.a, "array", [])
a_has_array = len(a_array) >= end
# Check if a is a constant (_LineDelay wrapping PseudoArray)
a_is_constant = False
a_constant_val = None
if not a_has_array:
try:
a_constant_val = self.a[0]
a_is_constant = True
except Exception: # nosec B110
# 'a' is neither array-backed nor a constant scalar; leave defaults.
pass
b_array = getattr(self.b, "array", [])
b_has_array = len(b_array) >= end
for i in range(start, end):
# Get condition value
if cond_has_array:
cond_val = cond_array[i]
else:
try:
cond_val = (
self.cond.array[i] if i < len(getattr(self.cond, "array", [])) else 0.0
)
except (IndexError, TypeError):
cond_val = 0.0
cond_bool = (cond_val != 0.0) and (
not (isinstance(cond_val, float) and math.isnan(cond_val))
)
if cond_bool:
# Get a value
if a_is_constant:
val = a_constant_val
elif a_has_array:
val = a_array[i]
else:
try:
val = self.a.array[i] if i < len(getattr(self.a, "array", [])) else 0.0
except (IndexError, TypeError):
val = 0.0
else:
# Get b value - for self-referencing, b is the inner bt.If
# which reads from _LineDelay(direction, -1)
if b_has_array:
val = b_array[i]
else:
# b's array isn't fully computed - compute dynamically
# For nested bt.If with self-reference, we need to evaluate it
try:
val = self._eval_operand_at(self.b, i)
except Exception:
val = 0.0
val = _sanitize_div_value(val)
dst[i] = val
# Propagate to bindings immediately for self-referencing
if has_bindings:
for binding in self.bindings:
binding.array[i] = val
def _eval_operand_at(self, operand, i):
"""Evaluate an operand at absolute index i.
For nested bt.If with self-reference, recursively evaluates
the condition and branches at the given index.
"""
if isinstance(operand, If):
# Recursively evaluate the nested If
# Get condition
cond_arr = getattr(operand.cond, "array", [])
if i < len(cond_arr):
cond_val = cond_arr[i]
else:
cond_val = 0.0
cond_bool = (cond_val != 0.0) and (
not (isinstance(cond_val, float) and math.isnan(cond_val))
)
if cond_bool:
return self._eval_operand_at(operand.a, i)
return self._eval_operand_at(operand.b, i)
# For _LineDelay, read from its source array at offset
if hasattr(operand, "ago") and hasattr(operand, "a"):
src_array = getattr(operand.a, "array", [])
src_idx = i + operand.ago
if 0 <= src_idx < len(src_array):
return src_array[src_idx]
return 0.0
# For arrays, direct access
arr = getattr(operand, "array", [])
if i < len(arr):
return arr[i]
# Constant
try:
return operand[0]
except Exception:
return 0.0
def _once_batch(self, start, end):
"""Standard batch processing for non-self-referencing patterns."""
dst = self.array
# Detect constants
a_is_constant = False
a_constant_val = None
try:
srca = self.a.array
a_has_array = len(srca) > 0
if not a_has_array:
try:
a_constant_val = self.a[0]
a_is_constant = True
except Exception: # nosec B110
# 'a' has an empty array and no scalar value; not a constant.
pass
except (AttributeError, TypeError):
srca = []
a_has_array = False
try:
a_constant_val = self.a[0]
a_is_constant = True
except Exception: # nosec B110
# 'a' is neither array-backed nor a scalar constant.
pass
b_is_constant = False
b_constant_val = None
try:
srcb = self.b.array
b_has_array = len(srcb) > 0
if not b_has_array:
try:
b_constant_val = self.b[0]
b_is_constant = True
except Exception: # nosec B110
# 'b' has an empty array and no scalar value; not a constant.
pass
except (AttributeError, TypeError):
srcb = []
b_has_array = False
try:
b_constant_val = self.b[0]
b_is_constant = True
except Exception: # nosec B110
# 'b' is neither array-backed nor a scalar constant.
pass
try:
cond = self.cond.array
cond_has_array = len(cond) > 0
except (AttributeError, TypeError):
cond = []
cond_has_array = False
a_use_dynamic = not a_is_constant and not a_has_array and hasattr(self.a, "__getitem__")
b_use_dynamic = not b_is_constant and not b_has_array and hasattr(self.b, "__getitem__")
has_bindings = bool(self.bindings)
for i in range(start, end):
if cond_has_array:
try:
cond_val = cond[i] if i < len(cond) else (cond[-1] if cond else 0.0)
except (IndexError, TypeError):
cond_val = 0.0
else:
try:
cond_val = self.cond[i] if hasattr(self.cond, "__getitem__") else 0.0
except Exception:
cond_val = 0.0
cond_bool = (cond_val != 0.0) and (
not (isinstance(cond_val, float) and math.isnan(cond_val))
)
if a_is_constant:
a_val = a_constant_val
elif a_has_array:
try:
a_val = srca[i] if i < len(srca) else (srca[-1] if srca else 0.0)
except (IndexError, TypeError):
a_val = 0.0
elif a_use_dynamic:
try:
a_val = self.a[i]
except Exception:
a_val = 0.0
else:
a_val = 0.0
if b_is_constant:
b_val = b_constant_val
elif b_has_array:
try:
b_val = srcb[i] if i < len(srcb) else (srcb[-1] if srcb else 0.0)
except (IndexError, TypeError):
b_val = 0.0
elif b_use_dynamic:
try:
b_val = self.b[i]
except Exception:
b_val = 0.0
else:
b_val = 0.0
a_val = _sanitize_div_value(a_val)
b_val = _sanitize_div_value(b_val)
val = a_val if cond_bool else b_val
dst[i] = val
# Propagate to bindings for consistency
if has_bindings:
for binding in self.bindings:
binding.array[i] = val
# Apply one logic to multiple elements
[docs]
class MultiLogic(Logic):
"""Base class for operations that apply a function to multiple arguments.
The flogic attribute should be set to a callable that takes
an iterable of values and returns a single result.
"""
[docs]
def next(self):
"""Apply the logic function to current values from all arguments."""
self[0] = self.flogic([arg[0] for arg in self.args])
[docs]
def once(self, start, end):
"""Apply the logic function to all values across the specified range.
Args:
start: Starting index for calculation.
end: Ending index for calculation.
"""
# cache python dictionary lookups
dst = self.array
# Ensure destination array is properly sized
while len(dst) < end:
dst.append(0.0)
for arg in self.args:
if isinstance(arg, LineActions) and hasattr(arg, "once"):
try:
if len(getattr(arg, "array", [])) < end:
arg.once(0, end)
except Exception as e:
logger.debug("MultiLogic operand once() failed: %s", e)
arrays = [arg.array for arg in self.args]
flogic = self.flogic
for i in range(start, end):
dst[i] = flogic([_value_at(arr, i) for arr in arrays])
# Mainly uses functools.partial to generate partial function, functools.reduce, iterates function on a sequence
[docs]
class MultiLogicReduce(MultiLogic):
"""MultiLogic that uses functools.reduce for cumulative operations.
This class applies a reduction function cumulatively to all arguments,
combining them into a single result.
"""
def __init__(self, *args, **kwargs):
"""Initialize the reduction operation.
Args:
*args: Line objects or values to reduce.
**kwargs: Optional keyword arguments including 'initializer'.
"""
super().__init__(*args)
if "initializer" not in kwargs:
self.flogic = functools.partial(functools.reduce, self.flogic)
else:
self.flogic = functools.partial(
functools.reduce, self.flogic, initializer=kwargs["initializer"]
)
# Inheritance class, process flogic
[docs]
class Reduce(MultiLogicReduce):
"""Generic reduction operation with a custom function.
Allows any reduction function to be applied to the arguments.
"""
def __init__(self, flogic, *args, **kwargs):
"""Initialize the custom reduction operation.
Args:
flogic: Function to use for reduction.
*args: Line objects or values to reduce.
**kwargs: Optional keyword arguments.
"""
self.flogic = flogic
super().__init__(*args, **kwargs)
# The _xxxlogic functions are defined at module scope to make them
# pickable and therefore compatible with multiprocessing
# Determine if both x and y are True
def _andlogic(x, y):
"""Logical AND operation for reduction."""
return bool(x and y)
# Determine if all elements are True
[docs]
class And(MultiLogicReduce):
"""Logical AND operation across all arguments.
Returns True only if all input values are truthy.
"""
flogic = staticmethod(_andlogic)
# Determine if either x or y is true
def _orlogic(x, y):
"""Logical OR operation for reduction."""
return bool(x or y)
# Determine if any element in the sequence is true
[docs]
class Or(MultiLogicReduce):
"""Logical OR operation across all arguments.
Returns True if any input value is truthy.
"""
flogic = staticmethod(_orlogic)
# Find maximum value
[docs]
class Max(MultiLogic):
"""Maximum operation across all arguments.
Returns the maximum value from all input lines.
"""
flogic = staticmethod(_maxlogic)
# Find minimum value
[docs]
class Min(MultiLogic):
"""Minimum operation across all arguments.
Returns the minimum value from all input lines.
"""
flogic = staticmethod(_minlogic)
# Calculate sum
[docs]
class Sum(MultiLogic):
"""Sum operation across all arguments.
Returns the sum of all input values using math.fsum
for better floating point precision.
"""
flogic = staticmethod(_sumlogic)
# Check if any exists
[docs]
class Any(MultiLogic):
"""Any operation across all arguments.
Returns True if any input value is truthy.
"""
flogic = any
# Check if all
[docs]
class All(MultiLogic):
"""All operation across all arguments.
Returns True only if all input values are truthy.
"""
flogic = all