Mock Server Example¶
This example demonstrates how to use the mock server functionality for testing the Candles Feed framework without connecting to real exchanges. This is particularly useful for development, testing, and CI/CD pipelines.
Basic Server Setup¶
import asyncio
import logging
from candles_feed.testing_resources.mocked_candle_feed_server import MockedCandlesFeedServer
from candles_feed.testing_resources.mocks.core.exchange_type import ExchangeType
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
"""Example of setting up and using a mock exchange server."""
# Create a mock server for Binance Spot
server = MockedCandlesFeedServer(
exchange_type=ExchangeType.BINANCE_SPOT,
host="127.0.0.1",
port=8080
)
# Configure custom trading pairs
custom_pairs = [
("BTCUSDT", "1m", 50000.0), # BTC with initial price of $50,000
("ETHUSDT", "1m", 3000.0), # ETH with initial price of $3,000
("SOLUSDT", "1m", 100.0), # SOL with initial price of $100
("BTCUSDT", "5m", 50000.0), # BTC with 5m interval
("BTCUSDT", "1h", 50000.0), # BTC with 1h interval
]
try:
# Start the server with our custom pairs
await server.start(trading_pairs=custom_pairs)
logger.info(f"Mock exchange server started at {server.url}")
# Now you can use the server for testing
# ...
# Let it run for a while to see the logs
await asyncio.sleep(5)
finally:
# Always stop the server when done
await server.stop()
logger.info("Server stopped")
if __name__ == "__main__":
asyncio.run(main())
Using the Mock Server with CandlesFeed¶
This example shows how to connect CandlesFeed to the mock server:
import asyncio
import logging
from candles_feed.core.candles_feed import CandlesFeed
from candles_feed.testing_resources.mocked_candle_feed_server import MockedCandlesFeedServer
from candles_feed.testing_resources.mocks.core.exchange_type import ExchangeType
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
"""Example of using CandlesFeed with a mock server."""
# Create and start a mock server
server = MockedCandlesFeedServer(
exchange_type=ExchangeType.BINANCE_SPOT,
host="127.0.0.1",
port=8080
)
# Start with standard trading pairs
await server.start()
logger.info(f"Mock server started at {server.url}")
# Create a CandlesFeed instance
feed = CandlesFeed(
exchange="binance_spot",
trading_pair="BTC-USDT",
interval="1m",
max_records=100
)
# Override the adapter's URL methods to point to our mock server
feed._adapter.get_rest_url = lambda: f"{server.url}/api/v3/klines"
feed._adapter.get_ws_url = lambda: f"ws://{server.host}:{server.port}/ws"
try:
# Try both REST and WebSocket strategies
# 1. First, use REST polling
logger.info("Testing REST polling strategy...")
await feed.start(strategy="polling")
# Wait a bit for data to accumulate
await asyncio.sleep(2)
# Check the results
candles = feed.get_candles()
logger.info(f"Received {len(candles)} candles via REST")
if candles:
logger.info(f"Latest BTC price: {candles[-1].close}")
# Stop the feed
await feed.stop()
# 2. Now, use WebSocket
logger.info("\nTesting WebSocket strategy...")
await feed.start(strategy="websocket")
# Wait for some updates
for i in range(5):
await asyncio.sleep(1)
candles = feed.get_candles()
if candles:
logger.info(f"[WS Update {i+1}] BTC price: {candles[-1].close}")
finally:
# Clean up
await feed.stop()
await server.stop()
logger.info("Test completed")
if __name__ == "__main__":
asyncio.run(main())
Testing Network Conditions¶
This example demonstrates how to simulate different network conditions to test error handling:
import asyncio
import logging
from candles_feed.core.candles_feed import CandlesFeed
from candles_feed.testing_resources.mocked_candle_feed_server import MockedCandlesFeedServer
from candles_feed.testing_resources.mocks.core.exchange_type import ExchangeType
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
"""Example of testing with difficult network conditions."""
# Create and start a mock server
server = MockedCandlesFeedServer(
exchange_type=ExchangeType.BINANCE_SPOT,
host="127.0.0.1",
port=8080
)
await server.start()
logger.info(f"Mock server started at {server.url}")
# Create a CandlesFeed instance
feed = CandlesFeed(
exchange="binance_spot",
trading_pair="BTC-USDT",
interval="1m",
max_records=100
)
# Override the adapter's URL methods
feed._adapter.get_rest_url = lambda: f"{server.url}/api/v3/klines"
feed._adapter.get_ws_url = lambda: f"ws://{server.host}:{server.port}/ws"
try:
# First, get data with normal conditions
logger.info("Fetching data with normal network conditions...")
await feed.fetch_candles()
normal_candles = len(feed.get_candles())
logger.info(f"Received {normal_candles} candles")
# Now simulate difficult network conditions
logger.info("\nSetting difficult network conditions...")
server.set_network_conditions(
latency_ms=500, # 500ms latency
packet_loss_rate=0.3, # 30% packet loss
error_rate=0.3 # 30% error responses
)
# Try to fetch data under these conditions
logger.info("Attempting to fetch data with difficult conditions...")
success_count = 0
error_count = 0
for i in range(5):
try:
logger.info(f"Attempt {i+1}...")
await feed.fetch_candles()
success_count += 1
logger.info("Success!")
except Exception as e:
error_count += 1
logger.warning(f"Failed: {str(e)}")
await asyncio.sleep(1)
logger.info(f"\nResults: {success_count} successes, {error_count} failures")
# Reset to normal conditions
logger.info("\nResetting to normal network conditions...")
server.set_network_conditions(
latency_ms=0,
packet_loss_rate=0.0,
error_rate=0.0
)
# Try one more fetch
logger.info("Fetching data with normal conditions again...")
await feed.fetch_candles()
new_count = len(feed.get_candles())
logger.info(f"Received {new_count} candles")
finally:
# Clean up
await feed.stop()
await server.stop()
logger.info("Test completed")
if __name__ == "__main__":
asyncio.run(main())
Testing Rate Limiting¶
This example shows how to test rate limiting behavior:
import asyncio
import logging
import time
from candles_feed.testing_resources.mocked_candle_feed_server import MockedCandlesFeedServer
from candles_feed.testing_resources.mocks.core.exchange_type import ExchangeType
import aiohttp
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
"""Example of testing rate limiting."""
# Create and start a mock server
server = MockedCandlesFeedServer(
exchange_type=ExchangeType.BINANCE_SPOT,
host="127.0.0.1",
port=8080
)
await server.start()
logger.info(f"Mock server started at {server.url}")
# Set custom rate limits
server.set_rate_limits(
rest_limit=5, # Only 5 requests
rest_period_ms=5000, # Per 5 seconds
ws_limit=2, # 2 messages
ws_burst=3 # Burst of 3 allowed
)
try:
# Create an HTTP session for testing
async with aiohttp.ClientSession() as session:
# Make requests in a loop to hit the rate limit
logger.info("Making rapid requests to test rate limiting...")
success_count = 0
rate_limited_count = 0
# Try to make 10 requests quickly
for i in range(10):
try:
start_time = time.time()
# Make a request to the klines endpoint
url = f"{server.url}/api/v3/klines?symbol=BTCUSDT&interval=1m&limit=10"
async with session.get(url) as response:
elapsed = time.time() - start_time
if response.status == 200:
success_count += 1
logger.info(f"Request {i+1}: Success ({elapsed:.2f}s)")
elif response.status == 429: # Too Many Requests
rate_limited_count += 1
logger.warning(f"Request {i+1}: Rate limited! ({elapsed:.2f}s)")
retry_after = response.headers.get('Retry-After', 'unknown')
logger.warning(f"Retry-After: {retry_after}s")
else:
logger.error(f"Request {i+1}: Unexpected status {response.status}")
except Exception as e:
logger.error(f"Request {i+1}: Error - {str(e)}")
# Small delay to make the output readable
await asyncio.sleep(0.2)
logger.info(f"\nRate limit test results:")
logger.info(f"Successful requests: {success_count}")
logger.info(f"Rate limited requests: {rate_limited_count}")
# Wait for rate limit to reset
logger.info("\nWaiting for rate limit to reset...")
await asyncio.sleep(5)
# Try one more request
logger.info("Making one more request after waiting...")
async with session.get(f"{server.url}/api/v3/klines?symbol=BTCUSDT&interval=1m&limit=10") as response:
if response.status == 200:
logger.info("Success! Rate limit has reset.")
else:
logger.warning(f"Still rate limited: {response.status}")
finally:
# Clean up
await server.stop()
logger.info("Test completed")
if __name__ == "__main__":
asyncio.run(main())
See Also¶
For more information about the mock server functionality, refer to: