Source code for backtrader.feed

#!/usr/bin/env python
"""Data Feed Module - Financial data feed implementations.

This module provides the base classes and implementations for data feeds
in backtrader. Data feeds are the source of price/volume data for strategies
and indicators.

Key Classes:
    AbstractDataBase: Base class for all data feeds with core functionality.
    DataBase: Full-featured data feed with replay/resample support.
    CSVDataBase: Base class for CSV file data feeds.
    FeedBase: Base for live/real-time data feeds.

Data feeds provide:
    - OHLCV (Open, High, Low, Close, Volume) data
    - Timeline management and session handling
    - Replay and resampling capabilities
    - Live data support for trading

Example:
    Creating a custom data feed:
    >>> class MyDataFeed(CSVDataBase):
    ...     params = (('dataname', 'data.csv'),)
"""

import collections
import datetime
import inspect
import os.path

from . import dataseries, metabase
from .dataseries import SimpleFilterWrapper, TimeFrame
from .resamplerfilter import Replayer, Resampler
from .tradingcal import PandasMarketCalendar
from .utils import date2num, num2date, time2num, tzparse
from .utils.date import Localizer
from .utils.log_message import get_logger
from .utils.py3 import range, string_types, zip

logger = get_logger(__name__)


# Refactor: Remove metaclass, use normal class and initialization method
[docs] class AbstractDataBase(dataseries.OHLCDateTime): """Base class for all data feed implementations. Provides the core functionality for data feeds including: - Data loading and preprocessing - Timeline management - Session handling - Live data support - Notification system for data status changes States: CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED, LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN Params: dataname: Data source identifier (filename, URL, etc.). name: Display name for the data feed. compression: Timeframe compression factor. timeframe: TimeFrame period (Days, Minutes, etc.). fromdate: Start date for data filtering. todate: End date for data filtering. sessionstart: Session start time. sessionend: Session end time. filters: List of data filters to apply. tz: Output timezone. tzinput: Input timezone. qcheck: Timeout in seconds for live event checking. calendar: Trading calendar to use. Example: >>> data = AbstractDataBase(dataname='data.csv') >>> cerebro.adddata(data) """ # Class-level registry dictionary, replacing metaclass _indcol functionality _registry: dict = {} # Parameter initialization settings - use _params_tuple to save original definition _params_tuple: tuple = ( ("dataname", None), ("name", ""), ("compression", 1), ("timeframe", TimeFrame.Days), ("fromdate", None), ("todate", None), ("sessionstart", None), ("sessionend", None), ("filters", []), ("tz", None), ("tzinput", None), ("qcheck", 0.0), # timeout in seconds (float) to check for events ("calendar", None), ) # Keep original params definition for compatibility with metaclass system params = _params_tuple # Eight different states of data ( CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED, LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN, ) = range(8) # Notification names _NOTIFNAMES = [ "CONNECTED", "DISCONNECTED", "CONNBROKEN", "DELAYED", "LIVE", "NOTSUBSCRIBED", "NOTSUPPORTED_TIMEFRAME", "UNKNOWN", ] def __init__(self, *args, **kwargs): """Initialize the data feed. Args: *args: Positional arguments. **kwargs: Keyword arguments for data feed parameters. """ # Execute the original metaclass dopreinit functionality self._init_preinit(*args, **kwargs) # Call parent class initialization super().__init__(*args, **kwargs) # Execute the original metaclass dopostinit functionality self._init_postinit(*args, **kwargs) # CRITICAL FIX: Mark all lines as belonging to a data feed # This must be done AFTER _init_postinit to ensure lines are fully initialized # This allows LineSeries.__getitem__ and LineBuffer.__getitem__ to correctly # raise IndexError when accessing out-of-range indices # This is essential for expire_order_close() to detect insufficient data if hasattr(self, "lines") and self.lines is not None: try: for line in self.lines: if hasattr(line, "__dict__"): line._is_data_feed_line = True except Exception as e: logger.debug("Failed to mark data feed lines: %s", e) # CRITICAL FIX: Also explicitly mark the datetime line # The datetime line might be accessed separately (e.g., self.datas[0].datetime) # and needs to raise IndexError when accessing out of bounds if hasattr(self, "datetime") and self.datetime is not None: try: if hasattr(self.datetime, "__dict__"): self.datetime._is_data_feed_line = True except Exception as e: logger.debug("Failed to mark datetime line: %s", e) # Original content from __init__ self._env = None self._barstash: collections.deque = collections.deque() self._barstack: collections.deque = collections.deque() self._laststatus = None def _init_preinit(self, *args, **kwargs): """Replace the original MetaAbstractDataBase.dopreinit""" # Find the owner and store it self._feed = self._find_feed_owner() # Initialize a queue to store notifications from cerebro self.notifs: collections.deque = collections.deque() # store notifications for cerebro # Get _dataname value from parameters self._dataname = getattr(self.p, "dataname", None) # Default _name attribute is empty self._name = "" def _init_postinit(self, *args, **kwargs): """Replace the original MetaAbstractDataBase.dopostinit""" # Either set by subclass or the parameter or use the dataname (ticker) # Reset _name attribute, if _name is not empty, keep it; if empty, set it to name parameter value self._name = self._name or getattr(self.p, "name", "") # If _name attribute value is still empty and dataname parameter value is string, set _name to dataname value if not self._name and isinstance(getattr(self.p, "dataname", None), string_types): self._name = self.p.dataname # _compression value equals the compression parameter value self._compression = getattr(self.p, "compression", 1) # _timeframe value equals the timeframe parameter value self._timeframe = getattr(self.p, "timeframe", TimeFrame.Days) # Only set sessionstart/sessionend defaults if they weren't explicitly passed # If start time is datetime format, equals specific time from sessionstart; if None, equals minimum time sessionstart = getattr(self.p, "sessionstart", None) if isinstance(sessionstart, datetime.datetime): self.p.sessionstart = sessionstart.time() elif sessionstart is None: # CRITICAL FIX: Always set default if None (kwargs check was unreliable) self.p.sessionstart = datetime.time.min # If end time is datetime format, equals specific time from sessionend; if None, equals 23:59:59.999990 sessionend = getattr(self.p, "sessionend", None) if isinstance(sessionend, datetime.datetime): self.p.sessionend = sessionend.time() elif sessionend is None: # CRITICAL FIX: Always set default if None (kwargs check was unreliable) # remove 9 to avoid precision rounding errors self.p.sessionend = datetime.time(23, 59, 59, 999990) # If start date is date format and has no hour attribute, add sessionstart time to convert start date to date+time format fromdate = getattr(self.p, "fromdate", None) if isinstance(fromdate, datetime.date): # push it to the end of the day, or else intraday # values before the end of the day would be gone if not hasattr(fromdate, "hour"): self.p.fromdate = datetime.datetime.combine(fromdate, self.p.sessionstart) # If end date is date format and has no hour attribute, add sessionend time to convert start date to date+time format todate = getattr(self.p, "todate", None) if isinstance(todate, datetime.date): # push it to the end of the day, or else intraday # values before the end of the day would be gone if not hasattr(todate, "hour"): self.p.todate = datetime.datetime.combine(todate, self.p.sessionend) # Set _barstack and _barstash as queues for filter operations self._barstack = collections.deque() # for filter operations self._barstash = collections.deque() # for filter operations # Set _filters and _ffilters as empty lists self._filters: list = [] self._ffilters: list = [] # Iterate through filters in parameters, first check if it's a class; if class, instantiate first; if instance has last attribute, add filter to _ffilters # If not a class, directly add filter to _filters filters = getattr(self.p, "filters", []) for fp in filters: if inspect.isclass(fp): fp = fp(self) if hasattr(fp, "last"): self._ffilters.append((fp, [], {})) self._filters.append((fp, [], {})) def _find_feed_owner(self): """Find the feed owner using metabase.findowner. This method delegates to metabase.findowner which uses OwnerContext for explicit owner management. """ # Use findowner which checks OwnerContext for owner lookup return metabase.findowner(self, FeedBase) @classmethod def _getstatusname(cls, status): return cls._NOTIFNAMES[status] # Initialize the following variables, may be used in live trading _compensate = None _feed = None _store = None _clone = False _qcheck = 0.0 # Time offset _tmoffset = datetime.timedelta() # Set to non 0 if resampling/replaying # Whether resampling or replaying, if not, set to 0 resampling = 0 replaying = 0 # Whether started _started = False def _start_finish(self): # A live feed (for example) may have learnt something about the # timezones after the start, and that's why the date/time related # parameters are converted at this late stage # Get the output timezone (if any) # Get specific timezone self._tz = self._gettz() # Lines have already been created, set the tz # Set specific timezone for time self.lines.datetime._settz(self._tz) # This should probably be also called from an override-able method # Localize input timezone self._tzinput = Localizer(self._gettzinput()) # Convert user input times to the output timezone (or min/max) # Convert user input start and end times to specific numbers; if None, start time is negative infinity, end time is positive infinity # If specific time, use date2num to convert to specific number if self.p.fromdate is None: self.fromdate = float("-inf") else: self.fromdate = self.date2num(self.p.fromdate) if self.p.todate is None: self.todate = float("inf") else: self.todate = self.date2num(self.p.todate) # Used by resamplerfilter and DataClone self.sessionstart = time2num(self.p.sessionstart) self.sessionend = time2num(self.p.sessionend) # Get calendar from parameters; if calendar is None, look for _tradingcal in local environment; if string, use PandasMarketCalendar self._calendar = cal = self.p.calendar if cal is None: self._calendar = self._env._tradingcal if self._env else None elif isinstance(cal, string_types): self._calendar = PandasMarketCalendar(calendar=cal) # Start state self._started = True def _start(self): self.start() # If not in start state yet, initialize first, then enter start state if not self._started: self._start_finish() def _timeoffset(self): # Time offset return self._tmoffset # Return next trading day end time in datetime format and numeric format def _getnexteos(self): """Returns the next eos using a trading calendar if available""" if self._clone: return self.data._getnexteos() if not len(self): return datetime.datetime.min, 0.0 dt = self.lines.datetime[0] dtime = num2date(dt) if self._calendar is None: nexteos = datetime.datetime.combine(dtime, self.p.sessionend) nextdteos = self.date2num(nexteos) # locl'ed -> utc-like nexteos = num2date(nextdteos) # utc while dtime > nexteos: nexteos += datetime.timedelta(days=1) # already utc-like nextdteos = date2num(nexteos) # -> utc-like else: # returns times in utc _, nexteos = self._calendar.schedule(dtime, self._tz) nextdteos = date2num(nexteos) # nextos is already utc return nexteos, nextdteos # Parse tzinput and return def _gettzinput(self): """Can be overriden by classes to return a timezone for input""" return tzparse(self.p.tzinput) # Parse tz and return def _gettz(self): """To be overriden by subclasses which may auto-calculate the timezone""" return tzparse(self.p.tz) # Convert time to number; if timezone info is not None, localize time first, then convert
[docs] def date2num(self, dt): """Convert datetime to internal numeric format. Args: dt: datetime object to convert. Returns: float: Internal numeric representation of the datetime. """ if self._tz is not None: return date2num(self._tz.localize(dt)) return date2num(dt)
# Convert number to date+time
[docs] def num2date(self, dt=None, tz=None, naive=True): """Convert internal numeric format to datetime. Args: dt: Numeric datetime value (uses current if None). tz: Timezone to use (uses feed tz if None). naive: Return naive datetime if True. Returns: datetime: Converted datetime object. """ if dt is None: return num2date(self.lines.datetime[0], tz or self._tz, naive) return num2date(dt, tz or self._tz, naive)
# Whether has live data; default is False; if has live data, needs override
[docs] def haslivedata(self): """Check if this data feed has live data. Returns: bool: False for base class, override for live data feeds. """ return False # must be overriden for those that can
# Wait interval when resampling live data
[docs] def do_qcheck(self, onoff, qlapse): """Calculate wait interval for queue checking. Args: onoff: Whether queue checking is enabled. qlapse: Time elapsed since last check. """ # if onoff is True, the data will wait p.qcheck for incoming live data # on its queue. if not onoff: self._qcheck = 0.0 return self._qcheck = max(0.0, self.p.qcheck - qlapse)
# Whether is live data; default is False; if True, cerebro will not use preload and runonce, because live data needs # to be fetched tick by tick or bar by bar
[docs] def islive(self): """If this returns True, ``Cerebro`` will deactivate ``preload`` and ``runonce`` because a live data source must be fetched tick by tick (or bar by bar)""" return False
# If latest status differs from current status, need to add info to notifs to update latest status
[docs] def put_notification(self, status, *args, **kwargs): """Add arguments to notification queue""" if self._laststatus != status: self.notifs.append((status, args, kwargs)) self._laststatus = status
# Get notification info, save to notifs and return as result
[docs] def get_notifications(self): """Return the pending "store" notifications""" # The background thread could keep on adding notifications. The None # mark allows to identify which is the last notification to deliver # Add a None, when None is retrieved, it means the queue is empty and all info has been retrieved self.notifs.append(None) # put a mark notifs = [] while True: notif = self.notifs.popleft() if notif is None: # mark is reached break notifs.append(notif) return notifs
# Get feed
[docs] def getfeed(self): """Get the parent feed object. Returns: FeedBase or None: The parent feed instance. """ return self._feed
# Amount of cached data
[docs] def qbuffer(self, savemem=0, replaying=False): """Apply queued buffering to all lines. Args: savemem: Memory saving mode. replaying: Whether replaying is active. """ extrasize = self.resampling or replaying for line in self.lines: line.qbuffer(savemem=savemem, extrasize=extrasize)
# Start, reset _barstack and _barstash
[docs] def start(self): """Start the data feed. Resets internal queues and sets initial status to CONNECTED. """ self._barstack = collections.deque() self._barstash = collections.deque() self._laststatus = self.CONNECTED
# End
[docs] def stop(self): """Stop the data feed. Override in subclasses for cleanup. """
# Clone data
[docs] def clone(self, **kwargs): """Create a clone of this data feed. Args: **kwargs: Additional keyword arguments for the clone. Returns: DataClone: A cloned data feed. """ return DataClone(dataname=self, **kwargs)
# Copy data and give it a different name
[docs] def copyas(self, _dataname, **kwargs): """Copy the data feed with a different name. Args: _dataname: New name for the data feed. **kwargs: Additional keyword arguments. Returns: DataClone: A cloned data feed with the new name. """ d = DataClone(dataname=self, **kwargs) d._dataname = _dataname d._name = _dataname return d
# Set environment
[docs] def setenvironment(self, env): """Keep a reference to the environment""" self._env = env
# Get environment
[docs] def getenvironment(self): """Get the cerebro environment reference. Returns: The cerebro environment instance. """ return self._env
# Add simple filter
[docs] def addfilter_simple(self, f, *args, **kwargs): """Add a simple filter wrapper to this data feed. Args: f: Filter function to apply. *args: Positional arguments for the filter. **kwargs: Keyword arguments for the filter. """ fp = SimpleFilterWrapper(self, f, *args, **kwargs) self._filters.append((fp, fp.args, fp.kwargs))
# Add filter
[docs] def addfilter(self, p, *args, **kwargs): """Add a filter to this data feed. Args: p: Filter class or instance. *args: Positional arguments for filter creation. **kwargs: Keyword arguments for filter creation. """ if inspect.isclass(p): pobj = p(self, *args, **kwargs) self._filters.append((pobj, [], {})) if hasattr(pobj, "last"): self._ffilters.append((pobj, [], {})) else: self._filters.append((p, args, kwargs))
# Compensate
[docs] def compensate(self, other): """Call it to let the broker know that actions on this asset will compensate open positions in another""" self._compensate = other
# Set tick_+name attribute to None for non-datetime names, mainly used when synthesizing low-frequency data from high-frequency data def _tick_nullify(self): # These are the updating prices in case the new bar is "updated" # and the length doesn't change like if a replay is happening or # a real-time data feed is in use and 1-minute bars are being # constructed with 5-second updates # PERFORMANCE OPTIMIZATION: Cache tick attribute names to avoid repeated string concat tick_cache = getattr(self, "_tick_cache", None) if tick_cache is None: tick_cache = ["tick_" + alias for alias in self.getlinealiases() if alias != "datetime"] self._tick_cache = tick_cache for tick_name in tick_cache: setattr(self, tick_name, None) self.tick_last = None # If tick_xxx related attribute value is None, need to consider using bar data to fill def _tick_fill(self, force=False): # If nothing filled the tick_xxx attributes, the bar is the tick # PERFORMANCE OPTIMIZATION: Cache tick name/line pairs for faster access tick_line_cache = getattr(self, "_tick_line_cache", None) if tick_line_cache is None: tick_line_cache = [] alias0 = self._getlinealias(0) self._tick_alias0 = "tick_" + alias0 self._line_alias0 = getattr(self.lines, alias0) for lalias in self.getlinealiases(): if lalias != "datetime": tick_line_cache.append(("tick_" + lalias, getattr(self.lines, lalias))) self._tick_line_cache = tick_line_cache if force or getattr(self, self._tick_alias0, None) is None: for tick_name, line in self._tick_line_cache: setattr(self, tick_name, line[0]) self.tick_last = self._line_alias0[0] # Get time of next bar # PERFORMANCE OPTIMIZATION: Cache float("inf") as module-level constant _INF = float("inf")
[docs] def advance_peek(self): """Peek at the datetime of the next bar. Returns: float: Numeric datetime of next bar, or inf if unavailable. """ # PERFORMANCE OPTIMIZATION: Use cached _INF, avoid repeated float("inf") creation _inf = self._INF try: if len(self) < self.buflen(): # CRITICAL FIX: Check if datetime[1] is valid before returning try: next_dt = self.lines.datetime[1] # If next_dt is 0 or invalid, return inf if next_dt is None or next_dt <= 0: return _inf return next_dt except (IndexError, KeyError): # If accessing datetime[1] fails, we're at the end return _inf return _inf # max date else except Exception as e: logger.debug("Exception in _gettz for %s: %s", getattr(self, "_name", ""), e) return _inf
# Move data forward by size
[docs] def advance(self, size=1, datamaster=None, ticks=True): """Advance the data feed by the specified size. Args: size: Number of bars to advance (default: 1). datamaster: Master data feed for synchronization. ticks: Whether to process tick data. """ if ticks: self._tick_nullify() # Need intercepting this call to support datas with # different lengths (timeframes) self.lines.advance(size) if datamaster is not None: if len(self) > self.buflen(): # if no bar can be delivered, fill with an empty bar self.rewind() self.lines.forward() return if self.lines.datetime[0] > datamaster.lines.datetime[0]: self.lines.rewind() else: if ticks: self._tick_fill() elif len(self) < self.buflen(): # a resampler may have advance us past the last point if ticks: self._tick_fill()
# What happens on data when next is called
[docs] def next(self, datamaster=None, ticks=True): """Move to the next bar. Args: datamaster: Master data feed for synchronization. ticks: Whether to process tick data. Returns: bool: True if a bar is available, False otherwise. """ # If data length is greater than cached data length, if it's ticks data, call _tick_nullify to generate tick_xxx attributes, then call load to try getting next bar; if ret is empty # return ret. If master data is None, if it's ticks data, need to call _tick_fill. # If own length is less than cached data length, move forward if len(self) >= self.buflen(): if ticks: self._tick_nullify() # not preloaded - request next bar ret = self.load() if not ret: # if the load cannot produce bars - forward the result return ret if datamaster is None: # bar is there and no master ... return load's result if ticks: self._tick_fill() return ret else: self.advance(ticks=ticks) # If master data is not None, if current data time is greater than master data time, need to adjust backward; # If current data time is not greater than master data time and data is ticks data, need to fill current data # If master data is None and data is ticks data, need to fill current day data # a bar is "loaded" or was preloaded - index has been moved to it if datamaster is not None: # there is a time reference to check against if self.lines.datetime[0] > datamaster.lines.datetime[0]: # can't deliver new bar, too early, go back self.rewind() return False if ticks: self._tick_fill() else: if ticks: self._tick_fill() # tell the world there is a bar (either the new or the previous # Indicate current bar exists return True
# Preload data
[docs] def preload(self): """Preload all available data from the data feed. Loads all bars and resets position to the beginning. """ # Load data while self.load(): pass self._last() self.home()
# Last chance to use filters def _last(self, datamaster=None): # A last chance for filters to deliver something ret = 0 for ff, fargs, fkwargs in self._ffilters: ret += ff.last(self, *fargs, **fkwargs) doticks = False if datamaster is not None and self._barstack: doticks = True while self._fromstack(forward=True): # consume bar(s) produced by "last"s - adding room pass if doticks: self._tick_fill() return bool(ret) # Check if verification is needed def _check(self, forcedata=None): for ff, fargs, fkwargs in self._filters: if not hasattr(ff, "check"): continue ff.check(self, _forcedata=forcedata, *fargs, **fkwargs) # Load data
[docs] def load(self): """Load the next bar from the data feed. Returns: bool: True if a bar was loaded, False if no more data. This method handles: - Forwarding the data pointer - Processing filters - Checking date boundaries """ while True: # move a data pointer forward for new bar # Move data pointer forward by one self.forward() # If data has been retrieved from self._barstack and saved to line, directly return True if self._fromstack(): # bar is available return True # If data cannot be retrieved from self._barstash, run the following code if not self._fromstack(stash=True): # _load() returns False, following code must run, but seems unnecessary to call this function or check following result, these two statements seem redundant ### Cannot be 100% certain for now, will review after code comments are completed #fix _loadret = self._load() if not _loadret: # no bar use force to make sure in exactbars # the pointer is undone this covers especially (but not # uniquely) the case in which the last bar has been seen # and a backwards would ruin pointer accounting in the # "stop" method of the strategy self.backwards(force=True) # undo data pointer # Return the actual returned value which may be None to # signal no bar is available, but the data feed is not # done. False means game over return _loadret # If bar was not retrieved from self._barstack but bar was retrieved from self._barstash, need to process bar # Get a reference to current loaded time # Get current time dt = self.lines.datetime[0] # A bar has been loaded, adapt the time # If timezone processing is needed for input time, convert number to time, localize time, convert time to number, update current time if self._tzinput: # Input has been converted at face value, but it's not UTC in # the input stream dtime = num2date(dt) # get it in a naive datetime # localize it dtime = self._tzinput.localize(dtime) # pytz compatible-ized self.lines.datetime[0] = dt = date2num(dtime) # keep UTC val # Check standard date from/to filters # If current time is less than start time, move backward to discard bar and continue if dt < self.fromdate: # discard loaded bar and carry on self.backwards() continue # If time is greater than end time, move backward and undo data pointer, then break if dt > self.todate: # discard loaded bar and break out self.backwards(force=True) break # Pass through filters # Iterate through each filter retff = False for ff, fargs, fkwargs in self._filters: # previous filter may have put things onto the stack # If self._barstack is not empty if self._barstack: # Perform self._barstack number of _fromstack function calls, call filter ff for i in range(len(self._barstack)): self._fromstack(forward=True) retff = ff(self, *fargs, **fkwargs) # If self._barstack is empty, call filter once else: retff = ff(self, *fargs, **fkwargs) # If retff is True, break out of filter loop if retff: # bar removed from systemn break # out of the inner loop # If True, continue if retff: # bar removed from system - loop to get new bar continue # in the greater loop # Checks let the bar through ... notify it return True # End loop, return False, no more bars or reached end date # Out of the loop ... no more bars or past todate return False
# Function that returns False def _load(self): return False # Add bar data to self._barstack or self._barstash def _add2stack(self, bar, stash=False): """Saves given bar (list of values) to the stack for later retrieval""" if not stash: self._barstack.append(bar) else: self._barstash.append(bar) # Get bar data and save to self._barstack or self._barstash, provides parameter to delete bar def _save2stack(self, erase=False, force=False, stash=False): """Saves current bar to the bar stack for later retrieval Parameter ``erase`` determines removal from the data stream """ bar = [line[0] for line in self.itersize()] if not stash: self._barstack.append(bar) else: self._barstash.append(bar) if erase: # remove bar if requested self.backwards(force=force) # This comment has issues, this function is used to update bar data to specific lines def _updatebar(self, bar, forward=False, ago=0): """Load a value from the stack onto the lines to form the new bar Returns True if values are present, False otherwise """ if forward: self.forward() for line, val in zip(self.itersize(), bar): line[0 + ago] = val # Get data from self._barstack or self._barstash, then save to line; if successful return True, if not return False def _fromstack(self, forward=False, stash=False): """Load a value from the stack onto the lines to form the new bar Returns True if values are present, False otherwise """ # When stash is False, coll equals self._barstack, otherwise it's self._barstash coll = self._barstack if not stash else self._barstash # If coll has data if coll: # If forward is True, call forward if forward: self.forward() # Add data to line for line, val in zip(self.itersize(), coll.popleft()): line[0] = val self._tick_fill(force=True) return True return False # Add resample filter
[docs] def resample(self, **kwargs): """Add a resampling filter to this data feed. Resampling converts data to a different timeframe (e.g., minutes to days). Args: **kwargs: Arguments for the Resampler filter. """ self.addfilter(Resampler, **kwargs)
# Add replay filter
[docs] def replay(self, **kwargs): """Add a replay filter to this data feed. Replay filters process tick data into bars with precise control. Args: **kwargs: Arguments for the Replayer filter. """ self.addfilter(Replayer, **kwargs)
@classmethod def _gettuple(cls): """For compatibility, provide _gettuple method""" return cls._params_tuple if hasattr(cls, "_params_tuple") else cls.params
# DataBase class, directly inherits from abstract DataBase
[docs] class DataBase(AbstractDataBase): """Full-featured data feed class. Inherits all functionality from AbstractDataBase. This is the standard data feed class for most use cases. """
# Refactor: Remove MetaParams metaclass, use normal parameter processing
[docs] class FeedBase: """Base class for feed containers. Manages multiple data feeds and provides parameter processing without using metaclasses. """ # Parameter processing, originally merged parameters automatically via metaclass, now manual processing
[docs] def __init__(self, **kwargs): """Initialize the feed base. Args: **kwargs: Keyword arguments for parameters. """ # Manually set parameters, replacing original metaclass functionality self.p = self._create_params(**kwargs) self.datas = []
def _create_params(self, **kwargs): """Manually create parameter object, replacing metaclass parameter processing""" # Create a simple parameter object class Params: """Parameter container for FeedBase. Stores parameter values and provides access via _getitems. """ def _getitems(self): """Simulate original _getitems method. Returns: list: List of (attribute_name, value) tuples for non-private attributes. """ # OPTIMIZED: Use __dict__ instead of dir() for better performance items = [] for attr_name, value in self.__dict__.items(): if not attr_name.startswith("_") and not callable(value): items.append((attr_name, value)) return items params_obj = Params() # Get default parameters from DataBase if hasattr(DataBase, "params"): base_params = DataBase.params if isinstance(base_params, (tuple, list)): for param_tuple in base_params: if isinstance(param_tuple, (tuple, list)) and len(param_tuple) >= 2: param_name, param_default = param_tuple[0], param_tuple[1] setattr(params_obj, param_name, kwargs.get(param_name, param_default)) # Set other passed parameters for key, value in kwargs.items(): if not hasattr(params_obj, key): setattr(params_obj, key, value) return params_obj # Data start
[docs] def start(self): """Start all managed data feeds.""" for data in self.datas: data.start()
# Data end
[docs] def stop(self): """Stop all managed data feeds.""" for data in self.datas: data.stop()
# Get data based on dataname and add data to self.datas
[docs] def getdata(self, dataname, name=None, **kwargs): """Get or create a data feed and add it to the managed datas. Args: dataname: Data source identifier (filename, URL, etc.). name: Display name for the data feed. **kwargs: Additional parameters for the data feed. Returns: DataBase: The created or retrieved data feed instance. """ # Merge parameters final_kwargs = {} if hasattr(self.p, "_getitems"): for pname, pvalue in self.p._getitems(): final_kwargs[pname] = pvalue elif hasattr(self.p, "__dict__"): final_kwargs.update(self.p.__dict__) final_kwargs.update(kwargs) final_kwargs["dataname"] = dataname data = self._getdata(**final_kwargs) data._name = name self.datas.append(data) return data
def _getdata(self, dataname, **kwargs): # Set keyword arguments final_kwargs = {} if hasattr(self.p, "_getitems"): for pname, pvalue in self.p._getitems(): final_kwargs[pname] = pvalue elif hasattr(self.p, "__dict__"): final_kwargs.update(self.p.__dict__) final_kwargs.update(kwargs) final_kwargs["dataname"] = dataname return self.DataCls(**final_kwargs)
# Refactor: Remove MetaCSVDataBase metaclass, use normal initialization method
[docs] class CSVDataBase(DataBase): """ Base class for classes implementing CSV DataFeeds The class takes care of opening the file, reading the lines and tokenizing them. Subclasses do only need to override: - _loadline(tokens) The return value of ``_loadline`` (True/False) will be the return value of ``_load`` which has been overriden by this base class """ # Data defaults to None f = None # Set specific parameters, merge parent class parameters - use _params_tuple to save original definition _params_tuple = ( ("headers", True), ("separator", ","), ) # Keep original params definition for compatibility with metaclass system params = _params_tuple # Get data and simple processing def __init__(self, *args, **kwargs): """Initialize the CSV data base. Args: *args: Positional arguments. **kwargs: Keyword arguments for parameters. """ # Execute original metaclass MetaCSVDataBase.dopostinit functionality self._csv_postinit(**kwargs) # Call parent class initialization super().__init__(*args, **kwargs) self.separator = None def _csv_postinit(self, **kwargs): """Replace original MetaCSVDataBase.dopostinit""" # If parameter has no name and _name attribute is empty, get specific name from data file name # Use existing parameter system dataname = getattr(self, "p", None) and getattr(self.p, "dataname", None) if not dataname: dataname = kwargs.get("dataname", "") name = getattr(self, "p", None) and getattr(self.p, "name", None) if not name: name = kwargs.get("name", "") if not name and not getattr(self, "_name", ""): if isinstance(dataname, string_types): self._name, _ = os.path.splitext(os.path.basename(dataname))
[docs] def start(self): """Start the CSV data feed. Opens the CSV file and optionally skips headers. """ super().start() # If data is None if self.f is None: # If dataname parameter has readline attribute, it means dataname is a data source, directly set f to data in parameter if hasattr(self.p.dataname, "readline"): self.f = self.p.dataname # If no readline attribute, it means dataname is a path, open file based on path to get data else: # Let an exception propagate to let the caller know self.f = open(self.p.dataname) # If there are headers, read a line and skip headers if self.p.headers: self.f.readline() # skip the headers # Separator for each line of data self.separator = self.p.separator
# Stop
[docs] def stop(self): """Stop the CSV data feed. Closes the CSV file if open. """ super().stop() # If data file is not None, close file and set to None if self.f is not None: self.f.close() self.f = None
# Preload data
[docs] def preload(self): """Preload all data from the CSV file. Loads all available data and closes the file handle. """ # Load data while self.load(): pass # Settings after load is finished self._last() self.home() # preloaded - no need to keep the object around - breaks multip in 3.x # Close data file and set to None if self.f is not None: self.f.close() self.f = None
# Load a line of data def _load(self): # If data file is None, return False; if line cannot be read, return False; process line, call _loadline to load if self.f is None: return False # Let an exception propagate to let the caller know line = self.f.readline() if not line: return False line = line.rstrip("\n") linetokens = line.split(self.separator) return self._loadline(linetokens) # Get next line of data def _getnextline(self): # This function is very similar to previous one, just previous one gets linetokens with additional _loadline call if self.f is None: return None # Let an exception propagate to let the caller know line = self.f.readline() if not line: return None line = line.rstrip("\n") linetokens = line.split(self.separator) return linetokens
[docs] class CSVFeedBase(FeedBase): """Base class for CSV feed containers. Manages CSV data feeds with support for base path prefixing. """ # Set parameters
[docs] def __init__(self, basepath="", **kwargs): """Initialize the CSV feed base. Args: basepath: Base path to prepend to data file names. **kwargs: Additional keyword arguments for parameters. """ self.basepath = basepath # Merge CSVDataBase parameters csv_params = {} if hasattr(CSVDataBase, "params"): csv_base_params = CSVDataBase.params if isinstance(csv_base_params, (tuple, list)): for param_tuple in csv_base_params: if isinstance(param_tuple, (tuple, list)) and len(param_tuple) >= 2: param_name, param_default = param_tuple[0], param_tuple[1] csv_params[param_name] = kwargs.get(param_name, param_default) kwargs.update(csv_params) super().__init__(**kwargs)
# Get data def _getdata(self, dataname, **kwargs): final_kwargs = {} if hasattr(self.p, "_getitems"): for pname, pvalue in self.p._getitems(): final_kwargs[pname] = pvalue elif hasattr(self.p, "__dict__"): final_kwargs.update(self.p.__dict__) final_kwargs.update(kwargs) return self.DataCls(dataname=self.basepath + dataname, **final_kwargs)
# Data clone
[docs] class DataClone(AbstractDataBase): """Clones an existing data feed. Creates a new data feed that references an existing data feed. Useful for creating multiple views of the same data with different parameters or filters. """ # Set _clone attribute to True _clone = True # Initialize, data equals dataname parameter value, _datename equals data's _dataname attribute value # Then copy date, time, trading interval, compression parameters def __init__(self, *args, **kwargs): """Initialize the data clone. Args: *args: Positional arguments. **kwargs: Keyword arguments, must include 'dataname' (the source data feed). Raises: ValueError: If 'dataname' parameter is not provided. """ # CRITICAL FIX: Initialize these attributes BEFORE calling super().__init__ # to ensure they exist when parent class methods access them self._dlen = 0 self._preloading = None # Get dataname and set it as self.data dataname = kwargs.get("dataname") if dataname is None and hasattr(self, "p"): dataname = getattr(self.p, "dataname", None) if dataname is None: raise ValueError("DataClone requires 'dataname' parameter") # CRITICAL FIX: Store data reference using object.__setattr__ to bypass # any custom __setattr__ that might interfere object.__setattr__(self, "data", dataname) self._dataname = getattr(self.data, "_dataname", None) # Copy date/session parameters from source data if hasattr(self.data, "p"): kwargs.setdefault("fromdate", getattr(self.data.p, "fromdate", None)) kwargs.setdefault("todate", getattr(self.data.p, "todate", None)) kwargs.setdefault("sessionstart", getattr(self.data.p, "sessionstart", None)) kwargs.setdefault("sessionend", getattr(self.data.p, "sessionend", None)) kwargs.setdefault("timeframe", getattr(self.data.p, "timeframe", None)) kwargs.setdefault("compression", getattr(self.data.p, "compression", None)) super().__init__(*args, **kwargs) # CRITICAL FIX: Ensure self.data is still set after parent init # Re-set it to be safe, in case parent class __init__ cleared attributes if not hasattr(self, "data") or object.__getattribute__(self, "data") is None: object.__setattr__(self, "data", dataname) def _start(self): # redefine to copy data bits from guest data self.start() # Copy tz infos if hasattr(self.data, "_tz"): self._tz = self.data._tz self.lines.datetime._settz(self._tz) if hasattr(self.data, "_calendar"): self._calendar = self.data._calendar # guest data have already converted input self._tzinput = None # no need to further converr # Copy dates/session infos if hasattr(self.data, "fromdate"): self.fromdate = self.data.fromdate if hasattr(self.data, "todate"): self.todate = self.data.todate if hasattr(self.data, "sessionstart"): self.sessionstart = self.data.sessionstart if hasattr(self.data, "sessionend"): self.sessionend = self.data.sessionend # Start
[docs] def start(self): """Start the data clone. Initializes internal tracking variables. """ super().start() self._dlen = 0 self._preloading = False
# Preload data
[docs] def preload(self): """Preload data from the source data feed. After preloading, resets the source data's position. """ self._preloading = True super().preload() if hasattr(self.data, "home"): self.data.home() # preloading data was pushed forward self._preloading = False
# Load data def _load(self): """Load data from the source data feed. Returns: bool: True if data was loaded, False otherwise. """ # assumption: the data is in the system # copy the lines # If preparing to preload, run following code to copy specific data bit by bit if self._preloading: # data is preloaded, we are preloading too, can move # forward until have full bar or a data source is exhausted # Move data forward if hasattr(self.data, "advance"): self.data.advance() # If current data is greater than data buffer length, return False if len(self.data) > self.data.buflen(): return False # If current data length is not greater than buffered data length, set line data to dline data for line, dline in zip(self.lines, self.data.lines): line[0] = dline[0] # Return True after successful setting return True # Not preloading # This syntax is not very efficient, changing to len(self.data)<=self._dlen might save one comparison if len(self.data) <= self._dlen: # if not (len(self.data) > self._dlen): # backtrader built-in # Data not beyond last seen bar return False # Increase data length by 1 self._dlen += 1 # Set line data to dline data for line, dline in zip(self.lines, self.data.lines): line[0] = dline[0] return True # Move forward by size
[docs] def advance(self, size=1, datamaster=None, ticks=True): """Advance the data clone by the specified size. Args: size: Number of bars to advance. datamaster: Master data feed for synchronization (unused). ticks: Whether to process tick data. """ self._dlen += size super().advance(size, datamaster, ticks=ticks)