Source code for py_alpaca_api.stock.auctions

"""Historical auctions functionality for Alpaca Market Data API."""

import json
from collections import defaultdict
from datetime import datetime

import pandas as pd

from py_alpaca_api.exceptions import ValidationError
from py_alpaca_api.http.requests import Requests


[docs] class Auctions: """Handles historical auction data retrieval from Alpaca API.""" def __init__(self, headers: dict[str, str]) -> None: """Initialize the Auctions class. Args: headers: Dictionary containing authentication headers. """ self.headers = headers self.base_url = "https://data.alpaca.markets/v2/stocks"
[docs] def get_auctions( self, symbols: str | list[str], start: str, end: str, limit: int = 10000, asof: str | None = None, feed: str = "iex", page_token: str | None = None, sort: str = "asc", ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Get historical auction data for one or more symbols. Retrieves auction prices (opening and closing auctions) between specified dates. Args: symbols: Symbol(s) to get auction data for. Can be a string for single symbol or list of strings for multiple symbols. start: Start date/time in ISO 8601 format (e.g., "2021-01-01" or "2021-01-01T00:00:00Z"). end: End date/time in ISO 8601 format. limit: Maximum number of auctions to return per symbol. Defaults to 10000. asof: As-of date for corporate actions adjustments in YYYY-MM-DD format. feed: The data feed to use ("iex", "sip", or "otc"). Defaults to "iex". page_token: Pagination token from previous request. sort: Sort order for results ("asc" or "desc"). Defaults to "asc". Returns: For single symbol: pd.DataFrame with auction data. For multiple symbols: dict mapping symbols to DataFrames with auction data. Raises: ValidationError: If parameters are invalid. Exception: If the API request fails or returns no data. """ # Validate parameters self._validate_parameters(symbols, start, end, limit, feed, sort) # Normalize symbols to list is_single = isinstance(symbols, str) symbols_list: list[str] if is_single: assert isinstance(symbols, str) # Type narrowing for mypy symbols_list = [symbols.upper()] else: assert isinstance(symbols, list) # Type narrowing for mypy symbols_list = [s.upper() for s in symbols] # Determine endpoint URL if is_single: url = f"{self.base_url}/{symbols_list[0]}/auctions" else: url = f"{self.base_url}/auctions" # Build parameters params: dict[str, str | int] = { "start": start, "end": end, "limit": limit, "feed": feed, "sort": sort, } # Add optional parameters if asof: params["asof"] = asof if page_token: params["page_token"] = page_token if not is_single: params["symbols"] = ",".join(symbols_list) # Fetch all data with pagination all_auctions = self._fetch_paginated_auctions( url, params, symbols_list, is_single ) # Convert to DataFrames result = self._convert_to_dataframes(all_auctions) # Return single DataFrame for single symbol, dict for multiple if is_single and symbols_list[0] in result: return result[symbols_list[0]] return result
[docs] def get_daily_auctions( self, symbols: str | list[str], start: str, end: str, feed: str = "iex", ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Get daily auction summary for one or more symbols. Retrieves opening and closing auction prices aggregated by day. Args: symbols: Symbol(s) to get auction data for. start: Start date in YYYY-MM-DD format. end: End date in YYYY-MM-DD format. feed: The data feed to use. Defaults to "iex". Returns: DataFrame or dict of DataFrames with daily auction summaries. """ # Get all auction data auctions_data = self.get_auctions(symbols, start, end, feed=feed) # Process based on single or multiple symbols is_single = isinstance(symbols, str) if is_single: # auctions_data is a DataFrame for single symbol if isinstance(auctions_data, pd.DataFrame): return self._aggregate_daily_auctions(auctions_data) return pd.DataFrame() # Return empty if no data # auctions_data is a dict for multiple symbols result = {} if isinstance(auctions_data, dict): for symbol, df in auctions_data.items(): if isinstance(df, pd.DataFrame): result[symbol] = self._aggregate_daily_auctions(df) return result
def _validate_parameters( self, symbols: str | list[str], start: str, end: str, limit: int, feed: str, sort: str, ) -> None: """Validate input parameters. Args: symbols: Symbol(s) to validate. start: Start date/time to validate. end: End date/time to validate. limit: Limit to validate. feed: Feed to validate. sort: Sort order to validate. Raises: ValidationError: If any parameter is invalid. """ if not symbols: raise ValidationError("At least one symbol is required") valid_feeds = ["iex", "sip", "otc"] if feed not in valid_feeds: raise ValidationError( f"Invalid feed. Must be one of: {', '.join(valid_feeds)}" ) valid_sorts = ["asc", "desc"] if sort not in valid_sorts: raise ValidationError( f"Invalid sort. Must be one of: {', '.join(valid_sorts)}" ) if limit < 1: raise ValidationError("Limit must be at least 1") # Validate date format try: datetime.fromisoformat(start.replace("Z", "+00:00")) datetime.fromisoformat(end.replace("Z", "+00:00")) except ValueError as e: raise ValidationError(f"Invalid date format: {e}") from e def _fetch_paginated_auctions( self, url: str, params: dict, symbols_list: list[str], is_single: bool, ) -> dict[str, list[dict]]: """Fetch auction data with pagination support. Args: url: API endpoint URL. params: Request parameters. symbols_list: List of symbols being requested. is_single: Whether this is a single-symbol request. Returns: Dictionary mapping symbols to lists of auction dictionaries. Raises: Exception: If the API request fails or returns no data. """ all_auctions = defaultdict(list) page_token = params.get("page_token") while True: if page_token: params["page_token"] = page_token response = json.loads( Requests() .request(method="GET", url=url, headers=self.headers, params=params) .text ) # Handle single vs multi-symbol response format if is_single: auctions_data = response.get("auctions", []) if auctions_data: all_auctions[symbols_list[0]].extend(auctions_data) else: # Multi-symbol response has auctions nested under symbol keys auctions_data = response.get("auctions", {}) for symbol, symbol_auctions in auctions_data.items(): all_auctions[symbol].extend(symbol_auctions) # Check for next page page_token = response.get("next_page_token") if not page_token: break # Remove page_token from params if it was there params.pop("page_token", None) if not all_auctions: raise Exception( f"No auction data found for symbols: {', '.join(symbols_list)}" ) return all_auctions def _convert_to_dataframes( self, auctions_data: dict[str, list[dict]] ) -> dict[str, pd.DataFrame]: """Convert auction data to pandas DataFrames. Args: auctions_data: Dictionary mapping symbols to lists of auction dictionaries. Returns: Dictionary mapping symbols to DataFrames with auction data. """ result = {} for symbol, auctions in auctions_data.items(): if auctions: df = pd.DataFrame(auctions) # Convert timestamp if "t" in df.columns: df["t"] = pd.to_datetime(df["t"]) df.rename(columns={"t": "timestamp"}, inplace=True) # Set timestamp as index if "timestamp" in df.columns: df.set_index("timestamp", inplace=True) # Rename columns to be more descriptive column_mapping = { "d": "date", "o": "opening_auction", "op": "opening_price", "oh": "opening_high", "ol": "opening_low", "ov": "opening_volume", "c": "closing_auction", "cp": "closing_price", "ch": "closing_high", "cl": "closing_low", "cv": "closing_volume", "x": "exchange", } df.rename(columns=column_mapping, inplace=True) # Add derived metrics if "opening_price" in df.columns and "closing_price" in df.columns: df["intraday_return"] = ( (df["closing_price"] - df["opening_price"]) / df["opening_price"] * 100 ).round(4) result[symbol] = df return result def _aggregate_daily_auctions(self, df: pd.DataFrame) -> pd.DataFrame: """Aggregate auction data by day. Args: df: DataFrame with auction data. Returns: DataFrame with daily aggregated auction data. """ if df.empty: return df # Ensure we have a datetime index if not isinstance(df.index, pd.DatetimeIndex) and "timestamp" in df.columns: df.set_index("timestamp", inplace=True) # Group by date - use date property for datetime index if isinstance(df.index, pd.DatetimeIndex): # Group by date (removing time component) daily = df.groupby(pd.Grouper(freq="D")).agg( { col: "first" if "opening" in col else "last" for col in df.columns if col in ["opening_price", "closing_price"] } ) # Remove empty rows daily = daily.dropna(how="all") # Calculate daily metrics if "opening_price" in daily.columns and "closing_price" in daily.columns: daily["daily_return"] = ( (daily["closing_price"] - daily["opening_price"]) / daily["opening_price"] * 100 ).round(4) # Ensure we return a DataFrame assert isinstance(daily, pd.DataFrame) return daily return df