Skip to content

Simple Usage Examples

This document provides examples of common usage patterns for the Candles Feed package.

Basic REST API Example

The following example demonstrates how to fetch historical candle data using the REST API:

import asyncio
import time
from candles_feed.core.candles_feed import CandlesFeed

async def main():
    """Fetch 24 hours of hourly BTC-USDT candles over REST and print a summary.

    Prints the number of candles fetched, the OHLCV fields of the first and
    last candle, and ``describe()`` statistics of the feed's DataFrame. The
    feed is stopped even if fetching or printing fails.
    """
    # Create a feed for Binance BTC-USDT with 1-hour candles
    feed = CandlesFeed(
        exchange="binance_spot",
        trading_pair="BTC-USDT",
        interval="1h",
        max_records=24,  # Keep last 24 hours
    )

    try:
        # Calculate time range for the last 24 hours (time.time() is in seconds)
        end_time = int(time.time())
        start_time = end_time - (24 * 60 * 60)  # 24 hours ago

        # Fetch historical candles
        candles = await feed.fetch_candles(start_time=start_time, end_time=end_time)
        print(f"Fetched {len(candles)} candles")

        # Print details of the first and last candle
        if candles:
            for label, candle in (("First", candles[0]), ("Last", candles[-1])):
                print(f"\n{label} candle:")
                print(f"Timestamp: {candle.timestamp}")
                print(f"Open: {candle.open}")
                print(f"High: {candle.high}")
                print(f"Low: {candle.low}")
                print(f"Close: {candle.close}")
                print(f"Volume: {candle.volume}")

        # Get data as pandas DataFrame
        df = feed.get_candles_df()
        print("\nDataFrame summary:")
        print(df.describe())
    finally:
        # Always clean up resources, even when a request or print step raised
        await feed.stop()

if __name__ == "__main__":
    asyncio.run(main())

WebSocket Streaming Example

This example shows how to stream real-time candle data using WebSockets:

import asyncio
import logging
from candles_feed.core.candles_feed import CandlesFeed

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    """Stream 1-minute BTC-USDT candles over WebSocket for about one minute.

    Every 5 seconds the most recent candle held by the feed (if any) is
    logged. The feed is always stopped on the way out.
    """
    # Create a feed with 1-minute candles
    feed = CandlesFeed(
        exchange="binance_spot",
        trading_pair="BTC-USDT",
        interval="1m",
        max_records=60,  # Keep last 60 minutes
    )

    # Start WebSocket streaming
    await feed.start(strategy="websocket")
    logger.info("WebSocket feed started")

    # Monitor for updates
    try:
        # Inside a coroutine the running loop is the one to use; its clock is
        # monotonic, so the deadline is unaffected by wall-clock changes.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 60  # Run for 1 minute
        while loop.time() < deadline:
            candles = feed.get_candles()
            if candles:
                latest = candles[-1]
                logger.info(
                    f"Latest candle: timestamp={latest.timestamp}, "
                    f"open={latest.open}, high={latest.high}, "
                    f"low={latest.low}, close={latest.close}, "
                    f"volume={latest.volume}"
                )
            await asyncio.sleep(5)  # Check every 5 seconds
    finally:
        # Always clean up resources
        await feed.stop()
        logger.info("WebSocket feed stopped")

if __name__ == "__main__":
    asyncio.run(main())

Multi-Exchange Example

This example demonstrates how to work with multiple exchanges simultaneously:

import asyncio
import logging
from candles_feed.core.candles_feed import CandlesFeed
from candles_feed.core.exchange_registry import ExchangeRegistry

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    """Run feeds for several exchanges at once and compare their latest close.

    Feeds are created only for exchanges reported by the registry. A feed
    that fails to start is logged and left out of the comparison instead of
    aborting the whole run, and every created feed is stopped at the end.
    """
    # List available exchange adapters
    ExchangeRegistry.discover_adapters()
    exchanges = ExchangeRegistry.list_available_adapters()
    logger.info(f"Available exchanges: {exchanges}")

    # Create feeds for different exchanges with the same trading pair
    feeds = []
    exchange_names = ["binance_spot", "kucoin_spot", "coinbase_advanced_trade"]

    for exchange in exchange_names:
        if exchange not in exchanges:
            continue
        try:
            feed = CandlesFeed(
                exchange=exchange,
                trading_pair="BTC-USDT",  # Note: May be BTC-USD on some exchanges
                interval="1m",
                max_records=10,
            )
        except Exception as e:
            logger.error(f"Error creating feed for {exchange}: {e}")
        else:
            feeds.append((exchange, feed))
            logger.info(f"Created feed for {exchange}")

    try:
        # Start all feeds concurrently. return_exceptions=True keeps one
        # failing exchange from cancelling the overall start-up.
        outcomes = await asyncio.gather(
            *(feed.start() for _, feed in feeds),
            return_exceptions=True,
        )

        active = []
        for (exchange, feed), outcome in zip(feeds, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(f"Error starting feed for {exchange}: {outcome}")
            else:
                active.append((exchange, feed))

        if len(active) == len(feeds):
            logger.info("All feeds started")
        else:
            logger.warning(f"Started {len(active)} of {len(feeds)} feeds")

        # Wait for data to arrive
        await asyncio.sleep(10)

        # Compare prices across exchanges
        logger.info("\nCurrent BTC prices across exchanges:")
        for exchange, feed in active:
            candles = feed.get_candles()
            if candles:
                latest = candles[-1]
                logger.info(f"{exchange}: close={latest.close}, volume={latest.volume}")
            else:
                logger.warning(f"{exchange}: No data received")
    finally:
        # Stop every created feed, including ones whose start failed part-way;
        # a failure to stop one feed must not prevent stopping the others.
        await asyncio.gather(
            *(feed.stop() for _, feed in feeds),
            return_exceptions=True,
        )
        logger.info("All feeds stopped")

if __name__ == "__main__":
    asyncio.run(main())

Data Analysis Example

This example shows how to perform basic analysis on candle data:

import asyncio
import pandas as pd
import numpy as np
from candles_feed.core.candles_feed import CandlesFeed

def _add_indicators(df):
    """Add SMA, EMA, Bollinger Band, RSI and MACD columns to ``df`` in place.

    Every indicator is derived from the ``close`` column. Rolling-window
    columns are NaN until their window has enough rows (pandas' default
    ``min_periods`` for a fixed window).
    """
    close = df['close']

    # 1. Simple Moving Averages
    df['sma_20'] = close.rolling(window=20).mean()
    df['sma_50'] = close.rolling(window=50).mean()

    # 2. Exponential Moving Average
    df['ema_20'] = close.ewm(span=20, adjust=False).mean()

    # 3. Bollinger Bands: 20-period mean +/- 2 rolling standard deviations
    window = 20
    df['middle_band'] = close.rolling(window=window).mean()
    df['std'] = close.rolling(window=window).std()
    df['upper_band'] = df['middle_band'] + 2 * df['std']
    df['lower_band'] = df['middle_band'] - 2 * df['std']

    # 4. Relative Strength Index (RSI), using plain 14-period rolling means
    #    of the positive and negative price changes
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()

    rs = avg_gain / avg_loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # 5. MACD: EMA(12) - EMA(26), with a 9-period EMA of that as the signal line
    df['ema_12'] = close.ewm(span=12, adjust=False).mean()
    df['ema_26'] = close.ewm(span=26, adjust=False).mean()
    df['macd'] = df['ema_12'] - df['ema_26']
    df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_histogram'] = df['macd'] - df['signal']


async def main():
    """Fetch hourly BTC-USDT candles, compute indicators and print a report.

    Prints "No data received" and returns early when the feed's DataFrame is
    empty. The feed is stopped on every exit path, including errors raised
    while computing or printing the indicators.
    """
    # Create a feed with hourly candles for the last week
    feed = CandlesFeed(
        exchange="binance_spot",
        trading_pair="BTC-USDT",
        interval="1h",
        max_records=168,  # 24 hours * 7 days
    )

    try:
        # Fetch historical data
        await feed.fetch_candles()

        # Get data as DataFrame
        df = feed.get_candles_df()
        if df.empty:
            print("No data received")
            return

        # Convert timestamp to datetime for better readability
        # NOTE(review): unit='s' assumes the timestamp column is in seconds - confirm
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')

        # Calculate basic technical indicators
        _add_indicators(df)

        # Display analysis results
        print("\nAnalysis of BTC-USDT price data:")
        print(f"Period: {df['datetime'].iloc[0]} to {df['datetime'].iloc[-1]}")
        print(f"Number of candles: {len(df)}")
        print(f"Price range: {df['low'].min()} - {df['high'].max()}")
        print(f"Average volume: {df['volume'].mean():.2f}")

        print("\nLatest indicators:")
        latest = df.iloc[-1]
        print(f"Price: {latest['close']}")
        print(f"SMA (20): {latest['sma_20']}")
        print(f"EMA (20): {latest['ema_20']}")
        print(f"Bollinger Bands: {latest['lower_band']:.2f} - {latest['middle_band']:.2f} - {latest['upper_band']:.2f}")
        print(f"RSI: {latest['rsi']:.2f}")
        print(f"MACD: {latest['macd']:.2f}, Signal: {latest['signal']:.2f}, Histogram: {latest['macd_histogram']:.2f}")
    finally:
        # Clean up (single exit point instead of one stop() call per return path)
        await feed.stop()

if __name__ == "__main__":
    asyncio.run(main())

Error Handling Example

This example demonstrates robust error handling:

import asyncio
import logging
import time
from candles_feed.core.candles_feed import CandlesFeed
from candles_feed.core.exchange_registry import ExchangeRegistry

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_with_retry(exchange, trading_pair, interval, retries=3):
    """Fetch candle data with retry logic.

    Creates a feed and calls ``fetch_candles()`` up to ``retries`` times,
    sleeping ``2 ** attempt`` seconds between attempts (exponential backoff).
    No sleep follows the final attempt.

    Args:
        exchange: Exchange name passed to ``CandlesFeed``.
        trading_pair: Trading pair passed to ``CandlesFeed``.
        interval: Candle interval passed to ``CandlesFeed``.
        retries: Maximum number of fetch attempts.

    Returns:
        The first non-empty result of ``fetch_candles()``, or ``[]`` when the
        feed could not be created or every attempt failed or came back empty.
    """
    feed = None
    try:
        feed = CandlesFeed(
            exchange=exchange,
            trading_pair=trading_pair,
            interval=interval,
            max_records=100,
        )

        # Try to fetch data with retries
        for attempt in range(1, retries + 1):
            try:
                logger.info(f"Attempt {attempt} to fetch candles for {trading_pair} on {exchange}")
                candles = await feed.fetch_candles()
            except Exception as e:
                logger.error(f"Error fetching candles (attempt {attempt}): {e}")
            else:
                if candles:
                    logger.info(f"Successfully fetched {len(candles)} candles")
                    return candles
                logger.warning(f"No candles received on attempt {attempt}")

            # Back off only when another attempt will actually follow
            if attempt < retries:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        logger.error(f"Failed to fetch candles after {retries} attempts")
        return []

    except Exception as e:
        logger.error(f"Error creating feed: {e}")
        return []
    finally:
        # Identity check: the feed must be stopped even if the object would
        # evaluate as falsy (e.g. a container-like feed holding no candles).
        if feed is not None:
            await feed.stop()

async def main():
    """Fetch candles from several exchanges concurrently and report per-exchange results.

    Returns early if adapter discovery fails. "non_existent_exchange" is
    always scheduled, even though the registry does not list it, so that the
    failure path is exercised.
    """
    # Get available exchanges
    try:
        ExchangeRegistry.discover_adapters()
        exchanges = ExchangeRegistry.list_available_adapters()
        logger.info(f"Available exchanges: {exchanges}")
    except Exception as e:
        logger.error(f"Error discovering adapters: {e}")
        return

    # Schedule one fetch task per exchange; tasks start running concurrently
    candidates = ("binance_spot", "kraken_spot", "non_existent_exchange")
    pending = [
        (
            name,
            asyncio.create_task(
                fetch_with_retry(
                    exchange=name,
                    trading_pair="BTC-USDT",
                    interval="1h",
                )
            ),
        )
        for name in candidates
        if name in exchanges or name == "non_existent_exchange"
    ]

    # Collect each task's outcome; a task that raises counts as "no candles"
    results = {}
    for name, task in pending:
        try:
            results[name] = await task
        except Exception as e:
            logger.error(f"Error in task for {name}: {e}")
            results[name] = []

    # Display results
    for name, fetched in results.items():
        if not fetched:
            logger.warning(f"{name}: No candles fetched")
            continue
        logger.info(f"{name}: Successfully fetched {len(fetched)} candles")

if __name__ == "__main__":
    asyncio.run(main())

Real-time Trading Signal Example

This example demonstrates how to generate simple trading signals from real-time data:

import asyncio
import logging
from candles_feed.core.candles_feed import CandlesFeed

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SimpleStrategy:
    """A simple trading strategy using moving averages.

    Signals "BUY" when the short moving average of ``close`` crosses above
    the long one and "SELL" when it crosses below. The position after each
    evaluated call is appended to ``self.positions``.
    """

    def __init__(self, short_window=5, long_window=20):
        """Store the two rolling-window lengths and start with no position history."""
        self.short_window = short_window
        self.long_window = long_window
        self.positions = []  # Track positions: 1 for long, -1 for short, 0 for neutral

    def calculate_signals(self, df):
        """Calculate trading signals based on moving average crossover.

        The last row of ``df`` is treated as the current candle and the row
        before it as the previous one. ``df`` itself is not modified.

        Args:
            df: DataFrame with a ``close`` column.

        Returns:
            "INSUFFICIENT_DATA" when ``df`` has fewer than ``long_window``
            rows (no position is recorded), otherwise "BUY", "SELL" or
            "HOLD". "HOLD" is also returned when either moving average is
            still undefined (NaN) for the last two rows, because a crossover
            cannot be confirmed without a valid previous value.
        """
        if len(df) < self.long_window:
            return "INSUFFICIENT_DATA"

        # Work on derived Series so the caller's DataFrame gains no columns
        close = df['close']
        short_tail = close.rolling(window=self.short_window).mean().iloc[-2:]
        long_tail = close.rolling(window=self.long_window).mean().iloc[-2:]

        # Current position (neutral until a first signal has been recorded)
        current_position = self.positions[-1] if self.positions else 0

        # A comparison against NaN is always False, which would look like
        # "short below long" and fake a crossover; require four real values.
        have_both_rows = len(short_tail) == 2
        averages_defined = not (short_tail.isna().any() or long_tail.isna().any())

        if have_both_rows and averages_defined:
            prev_above = short_tail.iloc[0] > long_tail.iloc[0]
            curr_above = short_tail.iloc[1] > long_tail.iloc[1]

            if curr_above and not prev_above:  # Bullish crossover
                if current_position <= 0:
                    self.positions.append(1)
                    return "BUY"
            elif prev_above and not curr_above:  # Bearish crossover
                if current_position >= 0:
                    self.positions.append(-1)
                    return "SELL"

        # No actionable crossover: carry the current position forward
        self.positions.append(current_position)
        return "HOLD"

async def main():
    """Stream 1-minute BTC-USDT candles for five minutes and log crossover signals.

    Every 10 seconds the feed's DataFrame is passed to ``SimpleStrategy``
    once it holds at least ``long_window`` rows; the resulting signal and the
    latest close are logged. The feed is always stopped on the way out.
    """
    # Create a feed for real-time data
    feed = CandlesFeed(
        exchange="binance_spot",
        trading_pair="BTC-USDT",
        interval="1m",
        max_records=30,  # Keep enough for our strategy
    )

    # Initialize our strategy
    strategy = SimpleStrategy(short_window=5, long_window=20)

    # Start the feed
    await feed.start(strategy="websocket")
    logger.info("Started real-time feed")

    try:
        # Run for a limited time (5 minutes), measured on the running loop's
        # monotonic clock rather than re-fetching the loop on every iteration
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 300

        while loop.time() < deadline:
            # Get latest data
            df = feed.get_candles_df()

            if not df.empty and len(df) >= strategy.long_window:
                # Calculate signals
                signal = strategy.calculate_signals(df)

                # Log signal and current price
                latest_price = df.iloc[-1]['close']
                logger.info(f"Signal: {signal}, Price: {latest_price}")

                if signal == "BUY":
                    logger.info(f"BUY SIGNAL at {latest_price}")
                elif signal == "SELL":
                    logger.info(f"SELL SIGNAL at {latest_price}")

            # Wait for next update
            await asyncio.sleep(10)

    finally:
        # Clean up
        await feed.stop()
        logger.info("Stopped feed")

if __name__ == "__main__":
    asyncio.run(main())

These examples demonstrate the core functionality of the Candles Feed package. You can adapt and extend them based on your specific requirements.