Source code for py_alpaca_api.stock.latest_quote

import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from py_alpaca_api.http.requests import Requests
from py_alpaca_api.models.quote_model import QuoteModel, quote_class_from_dict


class LatestQuote:
    """Retrieve the latest quote for one or more stock symbols.

    Requests are sent to the Alpaca ``/v2/stocks/quotes/latest`` endpoint using
    the headers supplied at construction time. Symbol lists longer than
    ``BATCH_SIZE`` are split into several requests that run concurrently.
    """

    BATCH_SIZE = 200  # Alpaca API limit for multi-symbol requests

    def __init__(self, headers: dict[str, str]) -> None:
        """Store the HTTP headers sent with every quote request.

        Args:
            headers: Headers passed unchanged to each outgoing request.
        """
        self.headers = headers

    def get(
        self,
        symbol: list[str] | str | None,
        feed: str = "iex",
        currency: str = "USD",
    ) -> list[QuoteModel] | QuoteModel:
        """Get latest quotes for one or more symbols.

        Symbols are upper-cased and stripped of surrounding whitespace before
        being sent; entries that are blank after stripping are dropped.

        Args:
            symbol: A string or list (or tuple) of strings representing the
                stock symbol(s).
            feed: The data feed source. Default is "iex".
            currency: The currency for the quotes. Default is "USD".

        Returns:
            A single QuoteModel when ``symbol`` is a string and a quote was
            returned; otherwise a list of QuoteModel objects. Note that a
            string ``symbol`` for which no quote came back yields an empty
            list.

        Raises:
            ValueError: If symbol is None/empty, is not a string or list of
                strings, or if feed is invalid.
        """
        symbol_error = "Symbol is required. Must be a string or list of strings."
        if symbol is None or symbol == "":
            raise ValueError(symbol_error)

        valid_feeds = ("iex", "sip", "otc")
        if feed not in valid_feeds:
            raise ValueError("Invalid feed, must be one of: 'iex', 'sip', 'otc'")

        # Real isinstance branching instead of `assert`, which is stripped
        # when Python runs with -O and would let bad input through.
        if isinstance(symbol, str):
            is_single = True
            raw_symbols = [symbol]
        elif isinstance(symbol, (list, tuple)):
            is_single = False
            raw_symbols = list(symbol)
        else:
            raise ValueError(symbol_error)

        normalized = (s.upper().strip() for s in raw_symbols)
        symbols = [s for s in normalized if s]
        if not symbols:
            # Covers [], "   " and lists containing only blank entries, which
            # would otherwise produce a request with an empty symbols parameter.
            raise ValueError(symbol_error)

        # If more than BATCH_SIZE symbols, need to batch the requests
        if len(symbols) > self.BATCH_SIZE:
            quotes = self._get_batched_quotes(symbols, feed, currency)
        else:
            quotes = self._fetch_quotes(symbols, feed, currency)

        # Return single quote if single symbol requested
        if is_single and quotes:
            return quotes[0]
        return quotes

    def _fetch_quotes(
        self, symbols: list[str], feed: str, currency: str
    ) -> list[QuoteModel]:
        """Fetch quotes for a list of symbols with a single request.

        Args:
            symbols: List of stock symbols.
            feed: The data feed source.
            currency: The currency for the quotes.

        Returns:
            List of QuoteModel objects, one per entry in the response's
            "quotes" mapping (empty if that mapping is missing or null).
        """
        url = "https://data.alpaca.markets/v2/stocks/quotes/latest"
        params: dict[str, str | bool | float | int] = {
            "symbols": ",".join(symbols),
            "feed": feed,
            "currency": currency,
        }

        response = json.loads(
            Requests()
            .request(method="GET", url=url, headers=self.headers, params=params)
            .text
        )

        # `or {}` also tolerates an explicit `"quotes": null` in the payload.
        quotes_by_symbol = response.get("quotes") or {}
        return [
            quote_class_from_dict(
                {
                    "symbol": key,
                    "timestamp": value["t"],
                    "ask": value["ap"],
                    "ask_size": value["as"],
                    "bid": value["bp"],
                    "bid_size": value["bs"],
                }
            )
            for key, value in quotes_by_symbol.items()
        ]

    def _get_batched_quotes(
        self, symbols: list[str], feed: str, currency: str
    ) -> list[QuoteModel]:
        """Handle large symbol lists by batching requests.

        Batches of at most ``BATCH_SIZE`` symbols are fetched concurrently.
        A batch that fails is logged and skipped so the remaining batches are
        still returned.

        Args:
            symbols: List of stock symbols.
            feed: The data feed source.
            currency: The currency for the quotes.

        Returns:
            List of QuoteModel objects from every batch that succeeded, in
            the order the batches were submitted.
        """
        # Split symbols into batches
        batches = [
            symbols[i : i + self.BATCH_SIZE]
            for i in range(0, len(symbols), self.BATCH_SIZE)
        ]

        all_quotes: list[QuoteModel] = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(self._fetch_quotes, batch, feed, currency)
                for batch in batches
            ]

            # All work is already submitted, so reading results in submission
            # order costs no extra wall time and keeps the output order
            # deterministic instead of depending on thread timing.
            for future in futures:
                try:
                    all_quotes.extend(future.result())
                except Exception as exc:
                    # Best effort: record the failure but continue with the
                    # other batches.
                    logging.getLogger(__name__).exception(
                        "Error fetching batch: %s", exc
                    )

        return all_quotes