Source code for py_alpaca_api.stock.history

import json
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd

from py_alpaca_api.http.requests import Requests
from py_alpaca_api.models.asset_model import AssetModel
from py_alpaca_api.stock.assets import Assets


[docs] class History:
[docs] BATCH_SIZE = 200 # Alpaca API limit for multi-symbol requests
def __init__(self, data_url: str, headers: dict[str, str], asset: Assets) -> None: """Initializes an instance of the History class. Args: data_url: A string representing the URL of the data. headers: A dictionary containing the headers to be included in the request. asset: An instance of the Asset class representing the asset. """ self.data_url = data_url self.headers = headers self.asset = asset ########################################### # /////// Check if Asset is Stock \\\\\\\ # ###########################################
[docs] def check_if_stock(self, symbol: str) -> AssetModel: """Check if the asset corresponding to the symbol is a stock. Args: symbol (str): The symbol of the asset to be checked. Returns: AssetModel: The asset information for the given symbol. Raises: ValueError: If there is an error getting the asset information or if the asset is not a stock. """ try: asset = self.asset.get(symbol) except Exception as e: raise ValueError(str(e)) from e if asset.asset_class != "us_equity": raise ValueError(f"{symbol} is not a stock.") return asset
########################################### # ////// Get Stock Historical Data \\\\\\ # ###########################################
[docs] def get_stock_data( self, symbol: str | list[str], start: str, end: str, timeframe: str = "1d", feed: str = "sip", currency: str = "USD", limit: int = 1000, sort: str = "asc", adjustment: str = "raw", ) -> pd.DataFrame: """Retrieves historical stock data for one or more symbols within a specified date range and timeframe. Args: symbol: The stock symbol(s) to fetch data for. Can be a single symbol string or list of symbols. start: The start date for historical data in the format "YYYY-MM-DD". end: The end date for historical data in the format "YYYY-MM-DD". timeframe: The timeframe for the historical data. Default is "1d". feed: The data feed source. Default is "sip". currency: The currency for historical data. Default is "USD". limit: The number of data points to fetch per symbol. Default is 1000. sort: The sort order for the data. Default is "asc". adjustment: The adjustment for historical data. Default is "raw". Returns: A pandas DataFrame containing the historical stock data for the given symbol(s) and time range. Raises: ValueError: If the given timeframe is not one of the allowed values. """ # Handle single symbol or list of symbols is_single = isinstance(symbol, str) if is_single: assert isinstance(symbol, str) # Type guard for mypy symbols_list: list[str] = [symbol] single_symbol: str = symbol else: assert isinstance(symbol, list) # Type guard for mypy symbols_list = symbol single_symbol = "" # Won't be used in multi-symbol case # Validate symbols are stocks for sym in symbols_list: self.check_if_stock(sym) # If more than BATCH_SIZE symbols, need to batch the requests if not is_single and len(symbols_list) > self.BATCH_SIZE: return self._get_batched_stock_data( symbols_list, start, end, timeframe, feed, currency, limit, sort, adjustment, ) # Determine if using single or multi-symbol endpoint if is_single: url = f"{self.data_url}/stocks/{single_symbol}/bars" else: url = f"{self.data_url}/stocks/bars" timeframe_mapping: dict = { "1m": "1Min", "5m": "5Min", "15m": "15Min", "30m": "30Min", "1h": "1Hour", "4h": "4Hour", "1d": "1Day", "1w": "1Week", "1M": "1Month", } if timeframe not in timeframe_mapping: raise ValueError( 'Invalid timeframe. Must be "1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w", or "1M"' ) params: dict = { "timeframe": timeframe_mapping[timeframe], "start": start, "end": end, "currency": currency, "limit": limit, "adjustment": adjustment, "feed": feed, "sort": sort, } # Add symbols parameter for multi-symbol request if not is_single: params["symbols"] = ",".join(symbols_list) symbol_data = self.get_historical_data(symbols_list, url, params, is_single) # Process data based on single or multi-symbol if is_single: return self.preprocess_data(symbol_data[single_symbol], single_symbol) return self.preprocess_multi_data(symbol_data)
def _get_batched_stock_data( self, symbols: list[str], start: str, end: str, timeframe: str, feed: str, currency: str, limit: int, sort: str, adjustment: str, ) -> pd.DataFrame: """Handle large symbol lists by batching requests. Args: symbols: List of symbols to fetch data for. start: The start date for historical data. end: The end date for historical data. timeframe: The timeframe for the historical data. feed: The data feed source. currency: The currency for historical data. limit: The number of data points to fetch per symbol. sort: The sort order for the data. adjustment: The adjustment for historical data. Returns: A pandas DataFrame containing the historical stock data for all symbols. """ # Split symbols into batches batches = [ symbols[i : i + self.BATCH_SIZE] for i in range(0, len(symbols), self.BATCH_SIZE) ] # Use ThreadPoolExecutor for concurrent batch requests all_dfs = [] with ThreadPoolExecutor(max_workers=5) as executor: futures = [] for batch in batches: future = executor.submit( self.get_stock_data, batch, start, end, timeframe, feed, currency, limit, sort, adjustment, ) futures.append(future) for future in as_completed(futures): try: df = future.result() if not df.empty: all_dfs.append(df) except Exception as e: # Log error but continue with other batches print(f"Error fetching batch: {e}") if all_dfs: return pd.concat(all_dfs, ignore_index=True).sort_values(["symbol", "date"]) return pd.DataFrame() @staticmethod
[docs] def preprocess_multi_data( symbols_data: dict[str, list[defaultdict]], ) -> pd.DataFrame: """Preprocess data for multiple symbols. Args: symbols_data: A dictionary mapping symbols to their bar data. Returns: A pandas DataFrame containing the preprocessed historical stock data for all symbols. """ all_dfs = [] for symbol, data in symbols_data.items(): if data: # Only process if data exists df = History.preprocess_data(data, symbol) all_dfs.append(df) if all_dfs: return pd.concat(all_dfs, ignore_index=True).sort_values(["symbol", "date"]) return pd.DataFrame()
########################################### # /////////// PreProcess Data \\\\\\\\\\\ # ########################################### @staticmethod
[docs] def preprocess_data(symbol_data: list[defaultdict], symbol: str) -> pd.DataFrame: """Prepross data Preprocesses the given symbol data by converting it to a pandas DataFrame and performing various data transformations. Args: symbol_data: A list of defaultdict objects representing the JSON response data. symbol: A string representing the symbol or ticker for the stock data. Returns: A pandas DataFrame containing the preprocessed historical stock data. """ bar_data_df = pd.DataFrame(symbol_data) bar_data_df.insert(0, "symbol", symbol) bar_data_df["t"] = pd.to_datetime( bar_data_df["t"].replace("[A-Za-z]", " ", regex=True) ) bar_data_df.rename( columns={ "t": "date", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume", "n": "trade_count", "vw": "vwap", }, inplace=True, ) return bar_data_df.astype( { "open": "float", "high": "float", "low": "float", "close": "float", "symbol": "str", "date": "datetime64[ns]", "vwap": "float", "trade_count": "int", "volume": "int", } )
########################################### # ///////// Get Historical Data \\\\\\\\\ # ###########################################
[docs] def get_historical_data( self, symbols: list[str], url: str, params: dict, is_single: bool ) -> dict[str, list[defaultdict]]: """Retrieves historical data for given symbol(s). Args: symbols: List of symbols for which to retrieve historical data. url: The URL to send the request to. params: Additional parameters to include in the request. is_single: Whether this is a single-symbol request. Returns: dict[str, list[defaultdict]]: A dictionary mapping symbols to their historical data. """ page_token: str | None = None symbols_data = defaultdict(list) while True: if page_token is not None: params["page_token"] = page_token response = json.loads( Requests() .request(method="GET", url=url, headers=self.headers, params=params) .text ) # Handle single vs multi-symbol response format if is_single: if not response.get("bars"): raise Exception( f"No historical data found for {symbols[0]}, with the given parameters." ) symbols_data[symbols[0]].extend(response.get("bars", [])) else: # Multi-symbol response has bars nested under symbol keys bars = response.get("bars", {}) if not bars: raise Exception( f"No historical data found for symbols: {', '.join(symbols)}, with the given parameters." ) for symbol, symbol_bars in bars.items(): symbols_data[symbol].extend(symbol_bars) page_token = response.get("next_page_token") if not page_token: break return symbols_data
########################################### # ///////// Get Latest Bars \\\\\\\\\ # ###########################################
[docs] def get_latest_bars( self, symbols: str | list[str], feed: str = "iex", currency: str = "USD", ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Get the latest bars for one or more symbols. The latest bars endpoint returns the most recent minute bar for each requested symbol. Args: symbols: Symbol(s) to get latest bars for. Can be a string for single symbol or list of strings for multiple symbols. feed: The data feed to use ("iex", "sip", or "otc"). Defaults to "iex". currency: The currency for the returned prices. Defaults to "USD". Returns: For single symbol: pd.DataFrame with the latest bar data. For multiple symbols: dict mapping symbols to DataFrames with latest bar data. Raises: ValueError: If feed is invalid or symbols is empty. Exception: If the API request fails or returns no data. """ # Validate feed valid_feeds = ["iex", "sip", "otc"] if feed not in valid_feeds: raise ValueError(f"Invalid feed. Must be one of: {', '.join(valid_feeds)}") # Normalize symbols to list is_single = isinstance(symbols, str) symbols_list: list[str] if is_single: assert isinstance(symbols, str) # Type narrowing for mypy symbols_list = [symbols.upper()] else: assert isinstance(symbols, list) # Type narrowing for mypy symbols_list = [s.upper() for s in symbols] if not symbols_list: raise ValueError("At least one symbol is required") # Check if all symbols are valid stocks for symbol in symbols_list: self.check_if_stock(symbol) # Build URL url = f"{self.data_url}/stocks/bars/latest" # Build parameters params: dict = { "symbols": ",".join(symbols_list), "feed": feed, "currency": currency, } # Make request response = json.loads( Requests() .request(method="GET", url=url, headers=self.headers, params=params) .text ) # Process response bars_data = response.get("bars", {}) if not bars_data: raise Exception( f"No latest bar data found for symbols: {', '.join(symbols_list)}" ) # Convert to DataFrames result = {} for symbol, bar_data in bars_data.items(): if bar_data: # Convert single bar to list for DataFrame df = pd.DataFrame([bar_data]) # Convert timestamp if "t" in df.columns: df["t"] = pd.to_datetime(df["t"]) df.rename(columns={"t": "timestamp"}, inplace=True) # Set timestamp as index if "timestamp" in df.columns: df.set_index("timestamp", inplace=True) # Rename columns to match existing pattern column_mapping = { "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume", "n": "trade_count", "vw": "vwap", } df.rename(columns=column_mapping, inplace=True) result[symbol] = df # Return single DataFrame for single symbol, dict for multiple if is_single and symbols_list[0] in result: return result[symbols_list[0]] return result